"""Poll a serial-attached UPS with the ``Q1`` query and log the parsed reply.

Configuration comes from environment variables: ``PORT``, ``BAUD`` and
``INTERVAL`` drive the polling loop; ``LOG_FILE``, ``LOG_FILE_LVL`` and
``LOG_CLI_LVL`` configure logging when the file is run as a script.
"""
import json
import logging
import os
import signal
import string
import sys
import time

import serial

# Get environment variables
env = dict(os.environ)

# Logger Constants
LOG_FORMAT = '%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s'

# Bound at import time so the helpers below can log even when this file is
# imported as a module; handlers are only attached when run as a script.
LOGGER: logging.Logger = logging.getLogger(__name__)

# Upper bound for one serial read. Without it a silent UPS would block
# read_until() forever and a stop request could never be honoured.
READ_TIMEOUT_S = 5.0
# Pause before main() is restarted after an unexpected failure.
RESTART_DELAY_S = 15
# Granularity at which sleeps re-check the stop flag.
STOP_POLL_S = 0.5


class Runner:
    """Shutdown flag that is cleared when SIGINT or SIGTERM is received.

    Creating an instance installs its ``stop`` method as the handler for both
    signals, so it must be created from the main thread.
    """

    running: bool = True

    def __init__(self):
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    def stop(self, _=None, _frame=None):
        """Request shutdown.

        Python invokes signal handlers with ``(signum, frame)``; both are
        optional here so the method can also be called directly.
        """
        self.running = False


def send(port: serial.Serial, d: str):
    """Write command ``d`` to ``port``, terminated by a carriage return."""
    port.write((d + '\r').encode('ascii'))
    port.flush()


def receive(port: serial.Serial, d: str) -> str:
    """Read one carriage-return-terminated reply from ``port``.

    ``d`` is only used to label the debug log entry. The reply is returned
    ASCII-decoded with trailing whitespace removed.
    """
    r = port.read_until(b'\r').decode('ascii').rstrip()
    LOGGER.debug("%s : %s", d, r)
    return r


def bruteforceCommands(port: serial.Serial):
    """Probe the device with every letter, letter+digit and letter+2-digit command.

    Each command is sent and its reply read back (and logged by ``receive``).
    """
    # T and S cause unwanted shutdown
    letters = string.ascii_uppercase.replace('T', '').replace('S', '')
    LOGGER.debug("Test commands: %s", letters)
    for c in letters:
        # Bare letter, then X0..X9, then X00..X99 - same order as before.
        commands = [c]
        commands.extend(f"{c}{n:1d}" for n in range(10))
        commands.extend(f"{c}{n:02d}" for n in range(100))
        for d in commands:
            send(port, d)
            receive(port, d)


def _parse_status(fields: list) -> dict:
    """Convert the whitespace-split fields of a Q1 reply into a status dict.

    Expects at least 8 fields; raises ``ValueError`` if a numeric field does
    not parse as a float.
    """
    flags = fields[7]
    return {
        'inV': float(fields[0]),
        'outV': float(fields[2]),
        'loadPercent': float(fields[3]),
        'lineFreq': float(fields[4]),
        'timeLeft': float(fields[5]),
        'onBatt': flags.startswith('1'),
        'onLine': flags.endswith('1'),
    }


def _sleep_while_running(runner: Runner, seconds: float) -> None:
    """Sleep up to ``seconds``, returning early once ``runner`` is stopped.

    A single long time.sleep() is resumed after a signal handler returns, so
    the wait is sliced to notice a stop request promptly.
    """
    deadline = time.monotonic() + seconds
    while runner.running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(remaining, STOP_POLL_S))


def main(runner: Runner | None = None) -> int:
    """Poll the UPS every ``INTERVAL`` seconds until a stop is requested.

    Args:
        runner: Shutdown flag to observe. When omitted a new ``Runner`` is
            created, which installs the SIGINT/SIGTERM handlers.

    Returns:
        1 if an unexpected exception ended the loop (the caller may restart),
        0 after a clean stop.
    """
    interval = int(env['INTERVAL'])
    if runner is None:
        runner = Runner()

    # The context manager closes the port on every exit path, including the
    # error return below.
    with serial.Serial(port=env['PORT'], baudrate=int(env['BAUD']),
                       bytesize=8, parity='N', stopbits=1,
                       timeout=READ_TIMEOUT_S) as port:
        while runner.running:
            try:
                send(port, "Q1")
                data = receive(port, "Q1").lstrip('(').split()
                if not runner.running:
                    # Stop arrived while waiting for the reply; whatever was
                    # read may be truncated, so do not report it.
                    break
                if len(data) < 8:
                    LOGGER.error("Incomplete data: %s", data)
                else:
                    values = _parse_status(data)
                    LOGGER.debug("UPS Status: \n%s", json.dumps(values, indent=2))
                # Sleep after every attempt, good or bad, so a misbehaving
                # UPS is not polled in a tight loop.
                _sleep_while_running(runner, interval)
            except KeyboardInterrupt:
                break
            except Exception as e:
                LOGGER.exception("Unexpected exception: [%s]", e)
                return 1
    return 0


def _configure_logging() -> None:
    """Attach the file and console handlers described by the environment."""
    # Enabling Logger
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    formatter = logging.Formatter(LOG_FORMAT, None)
    levels = logging.getLevelNamesMapping()

    # File logging
    file_handler = logging.FileHandler(os.path.abspath(env['LOG_FILE']))
    file_handler.setLevel(levels[env['LOG_FILE_LVL']])
    file_handler.setFormatter(formatter)
    LOGGER.addHandler(file_handler)

    # Console logging
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(levels[env['LOG_CLI_LVL']])
    console_handler.setFormatter(formatter)
    LOGGER.addHandler(console_handler)


if __name__ == "__main__":
    _configure_logging()
    LOGGER.warning(f"UPSmon started on: {time.asctime()}")

    # One flag for the whole process, so a signal received between restarts
    # is not lost.
    shutdown = Runner()
    while main(shutdown):
        LOGGER.error("Main thread exited unexpectedly")
        _sleep_while_running(shutdown, RESTART_DELAY_S)
        if not shutdown.running:
            break
    sys.exit(0)