Fix ownership seriale

This commit is contained in:
2025-06-12 01:11:41 +02:00
parent 438523a811
commit d37fbb64a1

View File

@@ -15,24 +15,19 @@ env = dict(os.environ)
LOGGER: logging.Logger
class UPScommand:
port: serial.Serial
def __init__(self, p: serial.Serial):
self.port = p
return
def send(self, cmd: str):
self.port.write((cmd+'\r').encode('ascii'))
self.port.flush()
def send(self, port:serial.Serial, cmd: str):
port.write((cmd+'\r').encode('ascii'))
port.flush()
def receive(self, cmd: str) -> str:
resp = self.port.read_until(expected=b'\r').decode('ascii').rstrip()
def receive(self, port:serial.Serial, cmd: str) -> str:
resp = port.read_until(expected=b'\r').decode('ascii').rstrip()
LOGGER.debug(f"{cmd} : {resp}")
return resp
def request(self, cmd: str) -> list[str]:
self.send(cmd)
return self.receive(cmd).lstrip('(').rstrip().split()
def request(self, port:serial.Serial, cmd: str) -> list[str]:
self.send(port, cmd)
return self.receive(port, cmd).lstrip('(').rstrip().split()
def asDict(self):
return deepcopy(self.__dict__)
@@ -52,14 +47,14 @@ class UPSstatus(UPScommand):
ecoMode: bool
def __init__(self, port: serial.Serial):
self.port = port
self.update()
self.update(port)
return
def update(self):
data = self.request(cmd="QGS")
def update(self, port:serial.Serial):
data = self.request(port=port, cmd="QGS")
self.inV = float(data[0])
self.inF = float(data[1])
self.outV = float(data[1])
self.outV = float(data[2])
self.outF = float(data[3])
self.current = float(data[4])
self.loadPct = int(data[5])
@@ -74,16 +69,16 @@ class UPSbattery(UPScommand):
battV: float
battPct: int
timeLeft: float
def __init__(self, port: serial.Serial):
self.port = port
self.update()
self.update(port)
return
def update(self):
data = self.request(cmd="QBV")
def update(self, port: serial.Serial):
data = self.request(port=port, cmd="QBV")
self.battV = float(data[0])
self.battPct = int(data[3])
self.timeLeft = int(data[4]) / 60.0
self.timeLeft = round(int(data[4]) / 60.0, 2)
##################
###### MAIN ######
@@ -106,8 +101,8 @@ def main() -> int:
# Init Dataclasses
run: SignalHandler = SignalHandler(LOGGER)
status = UPSstatus(port=port)
battery = UPSbattery(port=port)
status = UPSstatus(port)
battery = UPSbattery(port)
except Exception as e:
LOGGER.error(e)
return 1
@@ -121,9 +116,9 @@ def main() -> int:
while run.running:
try:
# Update data
status.update()
status.update(port)
LOGGER.debug(f"{repr(status)}")
battery.update()
battery.update(port)
LOGGER.debug(f"{repr(battery)}")
# Debug status Information when running on batteries
@@ -133,10 +128,10 @@ def main() -> int:
# Write datapoint to Influx merging measurements
datapoint = status.asDict() | battery.asDict()
datapoint.pop('port')
write_client.write(record=dict2Point(measurement='ups',
fields=datapoint
))
LOGGER.debug(f"Influx Write: {datapoint}")
# Sleep and repeat
time.sleep(INTERVAL)
except Exception as e: