# Source file: edelweiss/upsmon/ups.py (129 lines, 4.0 KiB, Python)

import os
import sys
import time
import string
import serial
import logging
import json
from pyutils.utils import *
from influxdb_client_3 import InfluxDBClient3
# Get environment variables
env = dict(os.environ)
LOGGER: logging.Logger
GIT_HASH: str = "HHHHHHHHH"
def send(port: serial.Serial, d: str):
    """Write command *d* to *port*, terminated by a carriage return.

    The command is ASCII-encoded and the port is flushed after the write.
    """
    terminated = d + "\r"
    payload = terminated.encode("ascii")
    port.write(payload)
    port.flush()
def receive(port: serial.Serial, d: str) -> str:
    """Read one reply from *port* and return it as stripped text.

    Bytes are read with ``read_until(b'\\r')``, decoded as ASCII and stripped
    of trailing whitespace. *d* is only used to label the debug log entry
    (callers pass the command the reply belongs to).
    """
    raw_reply = port.read_until(b"\r")
    reply = raw_reply.decode("ascii").rstrip()
    LOGGER.debug(f"{d} : {reply}")
    return reply
def bruteforceCommands(port: serial.Serial):
    """Probe the UPS with every candidate command and log each reply.

    For each upper-case letter (except ``T`` and ``S``) the bare letter is
    sent first, then the letter followed by ``0``-``9``, then the letter
    followed by ``00``-``99``. Replies are logged by ``receive``.
    """
    # T and S cause unwanted shutdown
    letters = "".join(ch for ch in string.ascii_uppercase if ch not in "TS")
    LOGGER.debug(f"Test commands: {letters}")

    # Suffixes in probing order: none, one digit, two zero-padded digits
    suffixes = [""]
    suffixes += [f"{n:1d}" for n in range(10)]
    suffixes += [f"{n:02d}" for n in range(100)]

    for letter in letters:
        for suffix in suffixes:
            command = letter + suffix
            send(port, command)
            receive(port, command)
##################
###### MAIN ######
##################
def main() -> int:
    """Poll the UPS over the serial port and write each reading to InfluxDB.

    Configuration comes from the module-level ``env`` mapping: ``INTERVAL``
    (seconds slept between polls), ``INFLUXDB_URL``, ``INFLUXDB_TOKEN``,
    ``INFLUXDB_DATABASE``, ``PORT`` and ``BAUD``.

    Returns:
        0 when the loop ends because ``SignalHandler.running`` became false,
        1 when a connection could not be set up or a poll raised.

    Raises:
        KeyError, ValueError: if ``INTERVAL`` is missing or not an integer
            (it is read before any error handling starts).
    """
    INTERVAL = int(env['INTERVAL'])
    UPS_COMMAND = "Q1"
    write_client = None
    port = None
    try:
        try:
            # Init InfluxDB-v3 Client
            write_client = InfluxDBClient3(host=env['INFLUXDB_URL'],
                                           token=env['INFLUXDB_TOKEN'],
                                           database=env['INFLUXDB_DATABASE'])
            # Init Serial port
            port = serial.Serial(port=env['PORT'], baudrate=int(env['BAUD']),
                                 bytesize=8, parity='N', stopbits=1)
        except Exception as e:
            LOGGER.error(e)
            return 1
        # Reached only when both connections succeeded (a `finally` here
        # would also announce success after a failure).
        LOGGER.info(f"InfluxDB Connected: [{env['INFLUXDB_URL']}/{env['INFLUXDB_DATABASE']}]")
        LOGGER.info(f"Serial Port Open: [{env['PORT']}]")

        ##############################
        ########## MAIN LOOP #########
        ##############################
        run: SignalHandler = SignalHandler(LOGGER)
        while run.running:
            try:
                send(port, UPS_COMMAND)
                raw_data = receive(port, UPS_COMMAND).lstrip('(').rstrip().split()
                if len(raw_data) < 8:
                    LOGGER.error(f"Incomplete data: {raw_data}")
                    # Pace the retry so a misbehaving UPS is not polled
                    # (and logged) in a tight loop.
                    time.sleep(INTERVAL)
                    continue
                status = raw_data[7]
                # NOTE(review): in the commonly published Megatec Q1 layout
                # field 5 is battery voltage and the last status bit is the
                # beeper flag — confirm 'timeLeft' / 'onLine' for this UPS.
                values = {
                    'inV': float(raw_data[0]),
                    'outV': float(raw_data[2]),
                    'loadPercent': float(raw_data[3]),
                    'lineFreq': float(raw_data[4]),
                    'timeLeft': float(raw_data[5]),
                    'onBatt': status.startswith('1'),
                    'onLine': status.endswith('1'),
                }
                LOGGER.debug(f"UPS Status: \n{json.dumps(values, indent=2)}")
                if values['onBatt']:
                    LOGGER.info(f"OnBattery: \n{json.dumps(values,indent=2)}")
                write_client.write(record=dict2Point('ups', values))
                time.sleep(INTERVAL)
            except Exception as e:
                # Use the logger (not print) so the failure and its traceback
                # reach the log file as well as the console.
                LOGGER.exception(f"Unexpected exception: [{e}]")
                return 1
        ##############################
        ###### END MAIN LOOP #########
        ##############################
        return 0
    finally:
        # Release both connections on every exit path; the caller retries
        # main() after a failure, so leaked handles would pile up.
        if port is not None:
            port.close()
        if write_client is not None:
            write_client.close()
if __name__ == "__main__":
    # Script entry point: configure logging, then run main() until it
    # returns 0. Requires LOG_FILE, LOG_FILE_LVL and LOG_CLI_LVL in the
    # environment (a missing one raises KeyError); VER is optional.

    # Logger Constants
    LOG_FORMAT = '%(asctime)s| %(levelname)-7s|%(funcName)-10s|%(lineno)-3d: %(message)-50s'
    # Enabling Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    formatter = logging.Formatter(LOG_FORMAT, None)
    # Level name -> numeric level (logging.getLevelNamesMapping, Python 3.11+)
    levels = logging.getLevelNamesMapping()
    # File logging
    log_name = os.path.abspath(env['LOG_FILE'])
    fh = logging.FileHandler(log_name)
    fh.setLevel(levels[env['LOG_FILE_LVL']])
    fh.setFormatter(formatter)
    LOGGER.addHandler(fh)
    # Console logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(levels[env['LOG_CLI_LVL']])
    cl.setFormatter(formatter)
    LOGGER.addHandler(cl)
    LOGGER.warning(f"UPSmon started on: {time.asctime()}")
    # Single quotes inside the f-string: reusing the enclosing double quote
    # is a SyntaxError on interpreters older than Python 3.12.
    LOGGER.info(f"UPSmon BUILD: {env.get('VER', 'Test')}")
    # main() returns a truthy value on failure: log it, wait, then retry
    while main():
        LOGGER.error("Main thread exited unexpectedly")
        time.sleep(15)
    sys.exit(0)