dataclasses per upsmon

This commit is contained in:
2025-06-12 00:55:50 +02:00
parent 530755db90
commit 438523a811
2 changed files with 137 additions and 51 deletions

View File

@@ -1,44 +1,89 @@
import os import os
import sys import sys
import time import time
import string
import serial import serial
import logging import logging
import json
from pyutils.utils import *
from utils import *
from copy import deepcopy
from dataclasses import dataclass
from influxdb_client_3 import InfluxDBClient3 from influxdb_client_3 import InfluxDBClient3
# Get environment variables # Get environment variables
env = dict(os.environ) env = dict(os.environ)
LOGGER: logging.Logger LOGGER: logging.Logger
GIT_HASH: str = "HHHHHHHHH"
def send(port: serial.Serial, d: str): class UPScommand:
port.write((d+'\r').encode('ascii')) port: serial.Serial
port.flush()
def receive(port: serial.Serial, d: str) -> str: def __init__(self, p: serial.Serial):
# Expected response, 46chrs (231.5 230.8 229.2 012 50.0 2.27 27.0 00000001 self.port = p
r = port.read_until(expected=b'\r').decode('ascii').rstrip() return
LOGGER.debug(f"{d} : {r}")
return r
def bruteforceCommands(port: serial.Serial): def send(self, cmd: str):
# T and S cause unwanted shutdown self.port.write((cmd+'\r').encode('ascii'))
letters = string.ascii_uppercase.replace('T','').replace('S','') self.port.flush()
LOGGER.debug(f"Test commands: {letters}")
for c in letters: def receive(self, cmd: str) -> str:
send(port, c) resp = self.port.read_until(expected=b'\r').decode('ascii').rstrip()
receive(port, c) LOGGER.debug(f"{cmd} : {resp}")
for n in range(10): return resp
d = c+f"{n:1d}"
send(port, d) def request(self, cmd: str) -> list[str]:
receive(port, d) self.send(cmd)
for n in range(100): return self.receive(cmd).lstrip('(').rstrip().split()
d = c+f"{n:02d}"
send(port, d) def asDict(self):
receive(port, d) return deepcopy(self.__dict__)
@dataclass
class UPSstatus(UPScommand):
inV: float
inF: float
outV: float
outF: float
current: float
loadPct: int
battV: float
temp: float
onLine: bool
onBatt: bool
ecoMode: bool
def __init__(self, port: serial.Serial):
self.port = port
self.update()
def update(self):
data = self.request(cmd="QGS")
self.inV = float(data[0])
self.inF = float(data[1])
self.outV = float(data[1])
self.outF = float(data[3])
self.current = float(data[4])
self.loadPct = int(data[5])
self.battV = float(data[8])
self.temp = float(data[10])
self.onLine = True if data[11].startswith('1') else False
self.onBatt = True if data[11].endswith('0') else False
self.ecoMode = True if data[11][4] == '1' else False
@dataclass
class UPSbattery(UPScommand):
battV: float
battPct: int
timeLeft: float
def __init__(self, port: serial.Serial):
self.port = port
self.update()
def update(self):
data = self.request(cmd="QBV")
self.battV = float(data[0])
self.battPct = int(data[3])
self.timeLeft = int(data[4]) / 60.0
################## ##################
###### MAIN ###### ###### MAIN ######
@@ -58,6 +103,11 @@ def main() -> int:
# Init Serial port # Init Serial port
port = serial.Serial(port=env['PORT'], baudrate=int(env['BAUD']), bytesize=8, parity='N', stopbits=1) port = serial.Serial(port=env['PORT'], baudrate=int(env['BAUD']), bytesize=8, parity='N', stopbits=1)
port.flush() port.flush()
# Init Dataclasses
run: SignalHandler = SignalHandler(LOGGER)
status = UPSstatus(port=port)
battery = UPSbattery(port=port)
except Exception as e: except Exception as e:
LOGGER.error(e) LOGGER.error(e)
return 1 return 1
@@ -68,27 +118,26 @@ def main() -> int:
############################## ##############################
########## MAIN LOOP ######### ########## MAIN LOOP #########
############################## ##############################
run: SignalHandler = SignalHandler(LOGGER)
while run.running: while run.running:
try: try:
send(port, UPS_COMMAND) # Update data
raw_data = receive(port, UPS_COMMAND).lstrip('(').rstrip().split() status.update()
if len(raw_data) < 8: LOGGER.debug(f"{repr(status)}")
LOGGER.error(f"Incomplete data: {raw_data}") battery.update()
break LOGGER.debug(f"{repr(battery)}")
values = {
'inV': float(raw_data[0]), # Debug status Information when running on batteries
'outV': float(raw_data[2]), if status.onBatt:
'loadPercent': float(raw_data[3]), LOGGER.info(f" Status:\n{repr(status)}")
'lineFreq': float(raw_data[4]), LOGGER.info(f"Battery:\n{repr(battery)}")
'timeLeft': float(raw_data[5]),
'onBatt': True if str(raw_data[7]).startswith('1') else False, # Write datapoint to Influx merging measurements
'onLine': True if str(raw_data[7]).endswith('1') else False, datapoint = status.asDict() | battery.asDict()
} datapoint.pop('port')
LOGGER.debug(f"UPS Status: \n{json.dumps(values, indent=2)}") write_client.write(record=dict2Point(measurement='ups',
if values['onBatt']: fields=datapoint
LOGGER.info(f"OnBattery: \n{json.dumps(values,indent=2)}") ))
write_client.write(record=dict2Point('ups', values)) # Sleep and repeat
time.sleep(INTERVAL) time.sleep(INTERVAL)
except Exception as e: except Exception as e:
print(f"Unexpected exception: [{e}]") print(f"Unexpected exception: [{e}]")

37
upsmon/utils.py Normal file
View File

@@ -0,0 +1,37 @@
import signal
from logging import Logger
from influxdb_client_3 import Point
class SignalHandler:
running: bool
logger: Logger
def __init__(self, logger):
self.running: bool = True
self.logger: Logger = logger
signal.signal(signal.SIGINT, self._handle_sigint)
signal.signal(signal.SIGTERM, self._handle_sigint)
def _handle_sigint(self, signum, frame):
self.logger.info(f"Received SIGNAL: {signal.strsignal(signum)}")
self.running = False
def dict2Point(measurement: str, fields: dict, tags: dict | None = None) -> Point:
p = Point(measurement)
for k,v in fields.items():
p.field(k,v)
if tags:
for k,v in tags.items():
p.tag(k,v)
return p
def convertInt(d: dict) -> dict:
for k,v in d.items():
if str.isdecimal(v):
d[k] = int(v)
return d
def convertIntList(l: list[dict]) -> list[dict]:
for n,d in enumerate(l):
l[n] = convertInt(d)
return l