"""Poll a UPS over a serial port and write the readings to InfluxDB.

Configuration is read from environment variables: INTERVAL, PORT, BAUD,
INFLUXDB_URL, INFLUXDB_TOKEN, INFLUXDB_DATABASE, LOG_FILE, LOG_FILE_LVL,
LOG_CLI_LVL and (optionally) VER.
"""
import os
import sys
import time
import string
import serial
import logging
import json
from pyutils.utils import *
from influxdb_client_3 import InfluxDBClient3

# Get environment variables
env = dict(os.environ)

# NOTE: LOGGER is only bound when the file is run as a script (see the
# ``__main__`` section at the bottom); the functions below rely on it.
LOGGER: logging.Logger
GIT_HASH: str = "HHHHHHHHH"


def send(port: serial.Serial, d: str):
    """Write command *d* followed by a carriage return to *port* and flush."""
    port.write((d + '\r').encode('ascii'))
    port.flush()


def receive(port: serial.Serial, d: str) -> str:
    """Read one carriage-return-terminated reply from *port*.

    Args:
        port: Open serial port to read from.
        d: The command the reply belongs to; used only as a label in the
            debug log line.

    Returns:
        The reply decoded as ASCII with trailing whitespace removed.
    """
    # NOTE(review): how long this waits depends on the timeout configured
    # on *port*; none is set where the port is opened in main().
    r = port.read_until(b'\r').decode('ascii').rstrip()
    LOGGER.debug(f"{d} : {r}")
    return r


def bruteforceCommands(port: serial.Serial):
    """Probe the device with every letter command and its numeric variants.

    For each upper-case letter (except T and S) this sends the bare letter,
    then the letter followed by 0-9, then the letter followed by 00-99,
    reading (and debug-logging) one reply after each command.
    """
    # T and S cause unwanted shutdown
    letters = string.ascii_uppercase.replace('T', '').replace('S', '')
    LOGGER.debug(f"Test commands: {letters}")
    for c in letters:
        send(port, c)
        receive(port, c)
        # One-digit suffixes first, then zero-padded two-digit suffixes.
        for count, width in ((10, 1), (100, 2)):
            for n in range(count):
                d = f"{c}{n:0{width}d}"
                send(port, d)
                receive(port, d)


def _parse_q1(fields: list[str]) -> dict[str, float | bool]:
    """Convert the whitespace-split fields of a status reply into values.

    Args:
        fields: At least eight fields from the reply, leading '(' already
            removed.

    Returns:
        A dict with the numeric readings and the two status booleans.

    Raises:
        ValueError: If a numeric field cannot be parsed as a float.
    """
    # assumes field 7 is a string of '0'/'1' status flags whose first and
    # last characters carry the battery/line state — TODO confirm against
    # the UPS protocol documentation.
    flags = fields[7]
    return {
        'inV': float(fields[0]),
        'outV': float(fields[2]),
        'loadPercent': float(fields[3]),
        'lineFreq': float(fields[4]),
        'timeLeft': float(fields[5]),
        'onBatt': flags.startswith('1'),
        'onLine': flags.endswith('1'),
    }


##################
###### MAIN ######
##################
def main():
    """Run the polling loop until the signal handler stops it.

    Returns:
        0 when the loop ended normally or when initialisation failed (the
        caller then exits without retrying), 1 when an unexpected exception
        interrupted the loop (the caller retries after a delay).
    """
    INTERVAL = int(env['INTERVAL'])
    UPS_COMMAND = "Q1"

    try:
        # Init InfluxDB-v3 Client
        write_client = InfluxDBClient3(host=env['INFLUXDB_URL'],
                                       token=env['INFLUXDB_TOKEN'],
                                       database=env['INFLUXDB_DATABASE'])
        # Init Serial port
        port = serial.Serial(port=env['PORT'],
                             baudrate=int(env['BAUD']),
                             bytesize=8, parity='N', stopbits=1)
    except Exception as e:
        LOGGER.error(e)
        return 0

    # Only reached when both resources were created successfully.
    LOGGER.info(f"InfluxDB Connected: [{env['INFLUXDB_URL']}/{env['INFLUXDB_DATABASE']}]")
    LOGGER.info(f"Serial Port Open: [{env['PORT']}]")

    ##############################
    ########## MAIN LOOP #########
    ##############################
    run = SignalHandler(LOGGER)
    try:
        while run.running:
            send(port, UPS_COMMAND)
            raw_data = receive(port, UPS_COMMAND).lstrip('(').split()
            if len(raw_data) < 8:
                LOGGER.error(f"Incomplete data: {raw_data}")
                # Wait before retrying so a misbehaving device is not
                # polled in a tight loop.
                time.sleep(INTERVAL)
                continue

            values = _parse_q1(raw_data)
            LOGGER.debug(f"UPS Status: \n{json.dumps(values, indent=2)}")
            if values['onBatt']:
                LOGGER.info(f"OnBattery\n{json.dumps(values,indent=2)}")

            write_client.write(record=dict2Point('ups', values))
            time.sleep(INTERVAL)
    except Exception as e:
        # Top-level boundary: record the traceback through the configured
        # handlers and let the caller restart main().
        LOGGER.exception(f"Unexpected exception: [{e}]")
        return 1
    finally:
        # Always release the serial port; main() may be called again and
        # would otherwise reopen a port that is still held.
        port.close()
    ##############################
    ###### END MAIN LOOP #########
    ##############################
    return 0


if __name__ == "__main__":
    # Logger Constants
    LOG_FORMAT = '%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s'

    # Enabling Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    formatter = logging.Formatter(LOG_FORMAT, None)
    levels = logging.getLevelNamesMapping()

    # File logging
    log_name = os.path.abspath(env['LOG_FILE'])
    fh = logging.FileHandler(log_name)
    fh.setLevel(levels[env['LOG_FILE_LVL']])
    fh.setFormatter(formatter)
    LOGGER.addHandler(fh)

    # Console logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(levels[env['LOG_CLI_LVL']])
    cl.setFormatter(formatter)
    LOGGER.addHandler(cl)

    LOGGER.warning(f"UPSmon started on: {time.asctime()}")
    # Single quotes inside the f-string keep this valid before Python 3.12.
    LOGGER.info(f"UPSmon BUILD: {env.get('VER', 'Test')}")

    # main() returns a truthy value when it stopped on an unexpected
    # exception; retry after a short pause in that case.
    while main():
        LOGGER.error("Main thread exited unexpectedly")
        time.sleep(15)
    sys.exit(0)