import logging from subprocess import check_output, TimeoutExpired, CalledProcessError from json import dumps from sys import exit as sysexit, stdout from time import sleep from os import path import paho.mqtt.client as mqtt import paho.mqtt.enums as mqtt_enums # Logger options LOGFILE = '/mnt/logs/UPSmon.log' if not path.exists("/mnt/logs"): LOGFILE = '/tmp/UPSmon.log' #CONSOLE_DEBUG = logging.DEBUG CONSOLE_DEBUG = logging.INFO #FILE_DEBUG = logging.ERROR #FILE_DEBUG = logging.WARNING FILE_DEBUG = logging.INFO #FILE_DEBUG = logging.DEBUG LOG_FORMAT = ('%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s') LOG_TIME_FORMAT = ('%m-%d %H:%M:%S') LOGGER: logging.Logger # UPSmon Config CMD: list[str] = ['apcaccess', '-u'] T_LONG: float = 15.0 T_SHORT: float = 5.0 EXPORT_KEYS: list[str] = [ 'UPSNAME', 'STATUS', 'LINEV', 'LOADPCT', 'BCHARGE', 'TIMELEFT', 'ITEMP', 'BATTV', 'LINEFREQ' ] def clean_data(v: str) -> float | str: if v.isalpha(): return v return float(v) def main() -> int: i: int = 0 client: mqtt.Client = mqtt.Client(callback_api_version=mqtt_enums.CallbackAPIVersion.VERSION2) while client.connect(host='10.0.2.249', port=1883) != mqtt_enums.MQTTErrorCode.MQTT_ERR_SUCCESS: LOGGER.warning("MQTT Client retry connect") sleep(10.0) LOGGER.info("MQTT Client connected") client.loop_start() sleep(1.0) while client.is_connected(): try: rv: bytes = check_output(CMD, timeout=2.0) data_raw: list[list[str]] = [ l.split(':', 1) for l in rv.decode(encoding='ascii').splitlines() ] data = { k.strip().lower():clean_data(v.strip()) for k,v in data_raw if k.strip() in EXPORT_KEYS } LOGGER.debug(f"[{i}] {dumps(data, indent=2)}") tag: str = str(data.pop('upsname')) client.publish(topic='monitoring/ups/status', payload=dumps( obj=(data, {"upsname":tag}) )) sleep(T_LONG if data.get("status", "ONLINE") == "ONLINE" else T_SHORT) i += 1 except KeyboardInterrupt: LOGGER.warning("Stopping...") client.loop_stop() client.disconnect() return 0 except FileNotFoundError as f: LOGGER.error(f"{CMD} executable not found: {f}") return f.errno if f.errno else -1 except CalledProcessError as e: LOGGER.error(f"{CMD} failed with: {e}") return e.returncode except TimeoutExpired as t: LOGGER.warning(f"{CMD} timed out: {t}") return int(t.timeout) except Exception as ee: # default case catch all LOGGER.error(f"Unexpected Exception: {ee}") return -3 LOGGER.warning("MQTT Client Disconnected") return 1 if __name__ == "__main__": # Enabling Logger LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) LOGGER.propagate = False formatter = logging.Formatter(LOG_FORMAT,LOG_TIME_FORMAT) # File logging fh = logging.FileHandler(LOGFILE) fh.setLevel(FILE_DEBUG) fh.setFormatter(formatter) LOGGER.addHandler(fh) # Console logging cl = logging.StreamHandler(stdout) cl.setLevel(CONSOLE_DEBUG) cl.setFormatter(formatter) LOGGER.addHandler(cl) LOGGER.warning("UPSmon started") while main() != 0: LOGGER.warning("Restarting...") sleep(30.0) sysexit(0)