"""Poll a UPS over a serial port and write its readings to InfluxDB.

Configuration is read from environment variables: INTERVAL, PORT, BAUD,
INFLUXDB_URL, INFLUXDB_TOKEN, INFLUXDB_DATABASE, LOG_FILE, LOG_FILE_LVL,
LOG_CLI_LVL and (optionally) VER.
"""
import os
import sys
import time
import serial
import logging
from copy import deepcopy
from pyutils.utils import *
from dataclasses import dataclass
from influxdb_client_3 import InfluxDBClient3, WriteOptions

# Get environment variables
env = dict(os.environ)

# Bound at import time so the classes below can log even when this module is
# imported rather than executed; handlers are attached in the __main__ section.
LOGGER: logging.Logger = logging.getLogger(__name__)

# UPS serial commands and the layout of their replies (as observed).
UPS_COMMAND = "Q1"
UPS_STATUS = "QGS"    # (Vin Fin Vout Fout Aout Load% ?1 ?2 VbatInt VbatExt Temp Flags
UPS_BATTERY = "QBV"   # (Vbat n1 n2 Charge% RunMins
UPS_MODE = "QMOD"     # (Mode [Echo, Line, Battery?]


class UPScommand:
    """Base class implementing the request/response exchange with the UPS.

    Commands are sent as ASCII text terminated by a carriage return, and a
    reply is read up to the next carriage return.
    """

    def send(self, port: serial.Serial, cmd: str) -> None:
        """Write ``cmd`` followed by a carriage return to ``port`` and flush it."""
        port.write((cmd + '\r').encode('ascii'))
        port.flush()

    def receive(self, port: serial.Serial, cmd: str) -> str:
        """Read one carriage-return-terminated reply from ``port``.

        ``cmd`` is only used to label the debug log entry.

        Returns:
            The reply decoded as ASCII with trailing whitespace removed.
        """
        resp = port.read_until(expected=b'\r').decode('ascii').rstrip()
        LOGGER.debug(f"{cmd} : {resp}")
        return resp

    def request(self, port: serial.Serial, cmd: str) -> list[str]:
        """Send ``cmd`` and return the reply split into whitespace-separated fields.

        Leading ``(`` characters are stripped from the reply before splitting.
        """
        self.send(port, cmd)
        return self.receive(port, cmd).lstrip('(').split()

    def asDict(self) -> dict:
        """Return a deep copy of the instance attributes as a dictionary."""
        return deepcopy(self.__dict__)


@dataclass
class UPSstatus(UPScommand):
    """General status readings parsed from the reply to the QGS command."""

    inV: float       # reply field 0 (Vin)
    inF: float       # reply field 1 (Fin)
    outV: float      # reply field 2 (Vout)
    outF: float      # reply field 3 (Fout)
    current: float   # reply field 4 (Aout)
    loadPct: int     # reply field 5 (Load%)
    battV: float     # reply field 8 (VbatInt)
    temp: float      # reply field 10 (Temp)
    onLine: bool     # first flag character is '1'
    onBatt: bool     # last flag character is '0'
    ecoMode: bool    # flag character at index 4 is '1'

    def __init__(self, port: serial.Serial):
        """Populate every field by querying the UPS on ``port``."""
        self.update(port)

    def update(self, port: serial.Serial) -> None:
        """Query the UPS and refresh all fields from the reply.

        Raises:
            ValueError: if the reply has fewer than 12 fields, the flags
                field has fewer than 5 characters, or a numeric field does
                not parse.
        """
        data = self.request(port=port, cmd=UPS_STATUS)
        # Fail with a readable message instead of a bare IndexError when the
        # UPS answers with a short or malformed reply.
        if len(data) < 12 or len(data[11]) < 5:
            raise ValueError(f"Malformed {UPS_STATUS} response: {data}")
        self.inV = float(data[0])
        self.inF = float(data[1])
        self.outV = float(data[2])
        self.outF = float(data[3])
        self.current = float(data[4])
        self.loadPct = int(data[5])
        self.battV = float(data[8])
        self.temp = float(data[10])
        # NOTE(review): flag bit meanings are taken from the existing checks,
        # not from a protocol specification - confirm against the UPS manual.
        flags = data[11]
        self.onLine = flags.startswith('1')
        self.onBatt = flags.endswith('0')
        self.ecoMode = flags[4] == '1'


@dataclass
class UPSbattery(UPScommand):
    """Battery readings parsed from the reply to the QBV command."""

    battV: float     # reply field 0 (Vbat)
    battPct: int     # reply field 3 (Charge%)
    timeLeft: float  # reply field 4 (RunMins) divided by 60, one decimal

    def __init__(self, port: serial.Serial):
        """Populate every field by querying the UPS on ``port``."""
        self.update(port)

    def update(self, port: serial.Serial) -> None:
        """Query the UPS and refresh all fields from the reply.

        Raises:
            ValueError: if the reply has fewer than 5 fields or a numeric
                field does not parse.
        """
        data = self.request(port=port, cmd=UPS_BATTERY)
        if len(data) < 5:
            raise ValueError(f"Malformed {UPS_BATTERY} response: {data}")
        self.battV = float(data[0])
        self.battPct = int(data[3])
        self.timeLeft = round(int(data[4]) / 60.0, 1)


##################
###### MAIN ######
##################
def main() -> int:
    """Connect to InfluxDB and the UPS, then poll until asked to stop.

    Returns:
        0 when the loop ends because the signal handler cleared ``running``,
        1 when setup or a polling cycle raised an exception.

    Raises:
        KeyError / ValueError: if the INTERVAL environment variable is
            missing or is not an integer (read before any error handling).
    """
    interval = int(env['INTERVAL'])
    write_client = None
    port = None

    try:
        try:
            # Init InfluxDB-v3 Client
            write_client = InfluxDBClient3(host=env['INFLUXDB_URL'],
                                           token=env['INFLUXDB_TOKEN'],
                                           database=env['INFLUXDB_DATABASE'])
            # Init Serial port
            port = serial.Serial(port=env['PORT'],
                                 baudrate=int(env['BAUD']),
                                 bytesize=8,
                                 parity='N',
                                 stopbits=1)
            port.flush()
            # Init Dataclasses
            run = SignalHandler(LOGGER)
            status = UPSstatus(port)
            battery = UPSbattery(port)
        except Exception as e:
            LOGGER.error(e)
            return 1

        # Only reached when setup succeeded, so these messages are truthful.
        LOGGER.info(f"InfluxDB Connected: [{env['INFLUXDB_URL']}/{env['INFLUXDB_DATABASE']}]")
        LOGGER.info(f"Serial Port Open: [{env['PORT']}]")

        ##############################
        ########## MAIN LOOP #########
        ##############################
        while run.running:
            try:
                # Update data
                status.update(port)
                LOGGER.debug(f"{repr(status)}")
                battery.update(port)
                LOGGER.debug(f"{repr(battery)}")

                # Debug status Information when running on batteries
                if status.onBatt:
                    LOGGER.info(f" Status:\n{repr(status)}")
                    LOGGER.info(f"Battery:\n{repr(battery)}")

                # Write datapoint to Influx merging measurements
                # (battery values win for keys present in both, e.g. battV)
                datapoint = status.asDict() | battery.asDict()
                write_client.write(record=dict2Point(measurement='ups', fields=datapoint))
                LOGGER.debug(f"Influx Write: {datapoint}")

                # Sleep and repeat
                time.sleep(interval)
            except Exception as e:
                # Goes through the logger (with traceback) so the failure
                # also reaches the log file, not just stdout.
                LOGGER.exception(f"Unexpected exception: [{e}]")
                return 1
        ##############################
        ###### END MAIN LOOP #########
        ##############################

        LOGGER.warning("Main thread exited normally")
        return 0
    finally:
        # Release whatever was opened on every exit path; the caller retries
        # main() after a failure and would otherwise leak the port and client.
        for resource in (port, write_client):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                LOGGER.error(f"Error while closing resource: [{e}]")


if __name__ == "__main__":
    # Logger Constants
    LOG_FORMAT = '%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s'

    # Enabling Logger
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    formatter = logging.Formatter(LOG_FORMAT, None)
    levels = logging.getLevelNamesMapping()

    # File logging
    log_name = os.path.abspath(env['LOG_FILE'])
    fh = logging.FileHandler(log_name)
    fh.setLevel(levels[env['LOG_FILE_LVL']])
    fh.setFormatter(formatter)
    LOGGER.addHandler(fh)

    # Console logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(levels[env['LOG_CLI_LVL']])
    cl.setFormatter(formatter)
    LOGGER.addHandler(cl)

    LOGGER.warning(f"UPSmon started on: {time.asctime()}")
    # Inner quotes differ from the outer ones so this parses before Python 3.12.
    LOGGER.info(f"UPSmon BUILD: {env.get('VER', 'Test')}")

    # main() returns non-zero on failure: log it, wait, and try again.
    while main():
        LOGGER.error("Main thread exited unexpectedly")
        time.sleep(15)
    sys.exit(0)