Files
edelweiss/upsmon/ups.py
2025-06-12 11:28:52 +02:00

179 lines
5.4 KiB
Python

import os
import sys
import time
import serial
import logging
from copy import deepcopy
from pyutils.utils import *
from dataclasses import dataclass
from influxdb_client_3 import InfluxDBClient3, WriteOptions
# Get environment variables
env = dict(os.environ)
LOGGER: logging.Logger
class UPScommand:
    """Serial request/response helpers shared by the UPS data classes.

    Commands are sent as ASCII text terminated by a carriage return, and
    replies are read up to the next carriage return.
    """

    def send(self, port:serial.Serial, cmd: str):
        """Write `cmd` followed by a carriage return to `port`, then flush."""
        payload = (cmd + '\r').encode('ascii')
        port.write(payload)
        port.flush()

    def receive(self, port:serial.Serial, cmd: str) -> str:
        """Read one carriage-return-terminated reply from `port`.

        `cmd` is used only to label the debug log line. Returns the reply
        decoded as ASCII with trailing whitespace removed.
        """
        raw = port.read_until(expected=b'\r')
        resp = raw.decode('ascii').rstrip()
        LOGGER.debug(f"{cmd} : {resp}")
        return resp

    def request(self, port:serial.Serial, cmd: str) -> list[str]:
        """Send `cmd` and return the reply split into whitespace-separated fields.

        Any leading '(' characters are stripped from the reply before splitting.
        """
        self.send(port, cmd)
        reply = self.receive(port, cmd)
        return reply.lstrip('(').rstrip().split()

    def asDict(self):
        """Return a deep copy of the instance attributes as a dict."""
        return deepcopy(vars(self))
@dataclass
class UPSstatus(UPScommand):
    """Values parsed from the reply to the "QGS" command.

    Field positions follow the layout noted next to UPS_STATUS in main():
    (Vin Fin Vout Fout Aout Load% ?1 ?2 VbatInt VbatExt Temp Flags.
    """
    inV: float       # reply field 0 (Vin)
    inF: float       # reply field 1 (Fin)
    outV: float      # reply field 2 (Vout)
    outF: float      # reply field 3 (Fout)
    current: float   # reply field 4 (Aout)
    loadPct: int     # reply field 5 (Load%)
    battV: float     # reply field 8 (VbatInt)
    temp: float      # reply field 10 (Temp)
    onLine: bool     # True when the flags field starts with '1'
    onBatt: bool     # True when the flags field ends with '0'
    ecoMode: bool    # True when character 4 of the flags field is '1'

    def __init__(self, port: serial.Serial):
        """Populate every field by querying the UPS once over `port`."""
        self.update(port)

    def update(self, port:serial.Serial):
        """Send "QGS" over `port` and refresh all fields from the reply.

        Fields are assigned in declaration order, so a short or malformed
        reply raises (IndexError / ValueError) after the earlier fields
        have already been updated.
        """
        reply = self.request(port=port, cmd="QGS")
        # (attribute, reply index, converter) for the numeric readings
        numeric_layout = (
            ("inV", 0, float),
            ("inF", 1, float),
            ("outV", 2, float),
            ("outF", 3, float),
            ("current", 4, float),
            ("loadPct", 5, int),
            ("battV", 8, float),
            ("temp", 10, float),
        )
        for attr, idx, convert in numeric_layout:
            setattr(self, attr, convert(reply[idx]))
        # Last field is a string of flag characters
        flags = reply[11]
        self.onLine = flags.startswith('1')
        self.onBatt = flags.endswith('0')
        self.ecoMode = flags[4] == '1'
@dataclass
class UPSbattery(UPScommand):
    """Values parsed from the reply to the "QBV" command.

    Field positions follow the layout noted next to UPS_BATTERY in main():
    (Vbat n1 n2 Charge% RunMins.
    """
    battV: float     # reply field 0 (Vbat)
    battPct: int     # reply field 3 (Charge%)
    timeLeft: float  # reply field 4 (RunMins) divided by 60, one decimal

    def __init__(self, port: serial.Serial):
        """Populate every field by querying the UPS once over `port`."""
        self.update(port)

    def update(self, port: serial.Serial):
        """Send "QBV" over `port` and refresh all fields from the reply."""
        reply = self.request(port=port, cmd="QBV")
        self.battV = float(reply[0])
        self.battPct = int(reply[3])
        run_minutes = int(reply[4])
        # Presumably minutes -> hours, per the "RunMins" note in main()
        self.timeLeft = round(run_minutes / 60.0, 1)
##################
###### MAIN ######
##################
def main() -> int:
    """Poll the UPS over serial and write each sample to InfluxDB.

    Configuration is read from the module-level `env` dict: INTERVAL,
    INFLUXDB_URL, INFLUXDB_TOKEN, INFLUXDB_DATABASE, PORT and BAUD.

    Returns:
        0 when the loop ends because `run.running` became false,
        1 when initialisation fails or an exception occurs inside the loop.

    Raises:
        KeyError / ValueError: if INTERVAL is missing or not an integer
            (read before any error handling, as in the original flow).
    """
    INTERVAL = int(env['INTERVAL'])
    # UPS protocol commands, kept for reference (the queries themselves are
    # issued by UPSstatus / UPSbattery):
    #   "Q1"
    #   "QGS"  -> (Vin Fin Vout Fout Aout Load% ?1 ?2 VbatInt VbatExt Temp Flags
    #   "QBV"  -> (Vbat n1 n2 Charge% RunMins
    #   "QMOD" -> (Mode [Echo, Line, Battery?]

    # Pre-declared so the cleanup below knows which resources were opened
    write_client = None
    port = None
    try:
        try:
            # Init InfluxDB-v3 Client
            write_client = InfluxDBClient3(host=env['INFLUXDB_URL'],
                                           token=env['INFLUXDB_TOKEN'],
                                           database=env['INFLUXDB_DATABASE'])
            # Init Serial port
            port = serial.Serial(port=env['PORT'], baudrate=int(env['BAUD']),
                                 bytesize=8, parity='N', stopbits=1)
            port.flush()
            # Init Dataclasses
            run: SignalHandler = SignalHandler(LOGGER)
            status = UPSstatus(port)
            battery = UPSbattery(port)
        except Exception as e:
            LOGGER.error(e)
            return 1

        # Reached only when initialisation succeeded, so these messages
        # are never logged for a failed connection.
        LOGGER.info(f"InfluxDB Connected: [{env['INFLUXDB_URL']}/{env['INFLUXDB_DATABASE']}]")
        LOGGER.info(f"Serial Port Open: [{env['PORT']}]")

        ##############################
        ########## MAIN LOOP #########
        ##############################
        while run.running:
            try:
                # Update data
                status.update(port)
                LOGGER.debug(f"{repr(status)}")
                battery.update(port)
                LOGGER.debug(f"{repr(battery)}")
                # Debug status Information when running on batteries
                if status.onBatt:
                    LOGGER.info(f" Status:\n{repr(status)}")
                    LOGGER.info(f"Battery:\n{repr(battery)}")
                # Write datapoint to Influx merging measurements; on a key
                # present in both (battV) the battery value wins.
                datapoint = status.asDict() | battery.asDict()
                write_client.write(record=dict2Point(measurement='ups',
                                                     fields=datapoint
                                                     ))
                LOGGER.debug(f"Influx Write: {datapoint}")
                # Sleep and repeat
                time.sleep(INTERVAL)
            except Exception as e:
                # Report through the configured logger (with traceback)
                # rather than a bare print, so the failure reaches the log file.
                LOGGER.exception(f"Unexpected exception: [{e}]")
                return 1
        ##############################
        ###### END MAIN LOOP #########
        ##############################
        LOGGER.warning("Main thread exited normally")
        return 0
    finally:
        # Release whatever was opened on every exit path: the caller retries
        # main() after a failure, which would otherwise leak the serial
        # port and the InfluxDB client on each attempt.
        try:
            if port is not None:
                port.close()
        finally:
            if write_client is not None:
                write_client.close()
if __name__ == "__main__":
    # Script entry point: configure logging from the environment
    # (LOG_FILE, LOG_FILE_LVL, LOG_CLI_LVL, optional VER), then run main()
    # repeatedly until it returns 0.

    # Logger Constants
    LOG_FORMAT = '%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s'
    # Enabling Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    formatter = logging.Formatter(LOG_FORMAT, None)
    # Maps level names (as given in the environment) to numeric levels
    levels = logging.getLevelNamesMapping()
    # File logging
    log_name = os.path.abspath(env['LOG_FILE'])
    fh = logging.FileHandler(log_name)
    fh.setLevel(levels[env['LOG_FILE_LVL']])
    fh.setFormatter(formatter)
    LOGGER.addHandler(fh)
    # Console logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(levels[env['LOG_CLI_LVL']])
    cl.setFormatter(formatter)
    LOGGER.addHandler(cl)
    LOGGER.warning(f"UPSmon started on: {time.asctime()}")
    # Single quotes inside the f-string: reusing the outer quote character
    # is a SyntaxError on interpreters older than Python 3.12.
    LOGGER.info(f"UPSmon BUILD: {env.get('VER', 'Test')}")
    # A non-zero return from main() means failure: log, wait, and retry
    while main():
        LOGGER.error("Main thread exited unexpectedly")
        time.sleep(15)
    sys.exit(0)