'''
Created on 5 Sep 2019

@author: Emanuele Trabattoni

Polls a SOAP service for sampled variables and, depending on the JSON
configuration file given as first command-line argument, keeps them in a
buffer that is saved to a CSV/JSON file, publishes them over MQTT or writes
them to InfluxDB. The acquisition is driven by commands received over MQTT.
'''
import sys
import time
import json
import logging
import copy
import base64
import struct
import csv
import xml.etree.ElementTree as xml
import xml.dom.minidom as mini
from datetime import datetime

import requests
import dicttoxml
import xmltodict
from paho.mqtt import client
from paho.mqtt.client import connack_string
from influxdb import InfluxDBClient

import templates


def buildBody():
    """Create an empty SOAP envelope.

    Returns:
        A ``(root, body)`` tuple: the envelope element and its Body child,
        to which the command element has to be appended.
    """
    root = xml.Element("soap:Envelope", templates.XMLNS_ENV)
    body = xml.SubElement(root, "soap:Body")
    return root, body


def getInfo():
    """Send a ``GetHWKey`` request.

    Returns:
        The parsed response (see ``readResponse``) or ``False`` when the
        request failed or the response has no SOAP body.
    """
    root, body = buildBody()
    xml.SubElement(body, "GetHWKey", templates.COMMANDS["GetHWKey"]["attrib"])
    resp = sendRequest(xml.tostring(root, encoding='utf-8'))
    # Compare against the False sentinel: the truth value of an Element
    # depends on its number of children and testing it is deprecated.
    if resp is not False:
        return readResponse(resp)
    return False


def setupOscope():
    """Register every configured variable code with ``MonAddVariable``.

    Iterates ``settings["variables"]`` and sends one request per code.

    Returns:
        A list of ``(name, code, variableID)`` tuples for the variables that
        were accepted, or ``False`` as soon as one request gets no usable
        response.
    """
    tempVarmap = []
    for var in settings["variables"]:
        # Deep copy: the nested "VarDescr" dict is modified below and must
        # not leak into the shared template.
        tempVar = copy.deepcopy(templates.COMMANDS["MonAddVariable"]["elem"])
        tempVar["VarDescr"]["DeviceID"] = var["deviceID"]
        tempVar["VarDescr"]["SamplingPeriod"] = settings["samplePeriod"]
        for cod in var["codes"]:
            root, body = buildBody()
            addVariable = xml.SubElement(
                body, "MonAddVariable",
                templates.COMMANDS["MonAddVariable"]["attrib"])
            tempVar["VarDescr"]["Code"] = cod
            # Serialize the descriptor dict to XML and graft its two
            # children into the request.
            tempVarDescr = str(dicttoxml.dicttoxml(tempVar, attr_type=False),
                               encoding='utf-8')
            tempxml = xml.fromstring(tempVarDescr)
            addVariable.append(tempxml.find("ChannelID"))
            addVariable.append(tempxml.find("VarDescr"))
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            if resp is False:
                return False
            parsed = readResponse(resp)
            if parsed is False:
                return False
            resp = parsed["MonAddVariable-R"]
            if int(resp["retval"]):
                newvar = (var["name"], cod, int(resp["VariableID"]))
                tempVarmap.append(newvar)
                LOGGER.debug("Aggingo la variabile-> {}".format(newvar))
            else:
                LOGGER.error("Non posso aggiungere la variabile-> {}".format(var))
    return tempVarmap


def openChannel():
    """Send a ``MonOpenChannel`` request with ``Synchronized`` set to false.

    Returns:
        The parsed response or ``False`` when the request failed.
    """
    root, body = buildBody()
    opench = xml.SubElement(body, "MonOpenChannel",
                            templates.COMMANDS["MonOpenChannel"]["attrib"])
    sync = xml.SubElement(opench, "Synchronized")
    sync.text = "false"
    resp = sendRequest(xml.tostring(root, encoding='utf-8'))
    if resp is not False:
        return readResponse(resp)
    return False


def closeChannel():
    """Send a ``MonCloseChannel`` request with ``Synchronized`` set to false.

    Returns:
        The parsed response or ``False`` when the request failed.
    """
    root, body = buildBody()
    closech = xml.SubElement(body, "MonCloseChannel",
                             templates.COMMANDS["MonCloseChannel"]["attrib"])
    sync = xml.SubElement(closech, "Synchronized")
    sync.text = "false"
    resp = sendRequest(xml.tostring(root, encoding='utf-8'))
    if resp is not False:
        return readResponse(resp)
    return False


def startSampling(channelID='0'):
    """Send a ``MonStartSampling`` request for ``channelID``.

    Returns:
        The parsed response, ``False`` when the response has no SOAP body,
        or ``None`` when any exception was raised (it is logged).
    """
    root, body = buildBody()
    startS = xml.SubElement(body, "MonStartSampling",
                            templates.COMMANDS["MonStartSampling"]["attrib"])
    chid = xml.SubElement(startS, "ChannelID")
    chid.text = channelID
    try:
        return readResponse(sendRequest(xml.tostring(root, encoding='utf-8')))
    except Exception as e:
        LOGGER.error("Non riesco a iniziare la cattura -> {}".format(e))


def stopSampling(channelID='0'):
    """Send a ``MonStopSampling`` request for ``channelID``.

    Returns:
        The parsed response, ``False`` when the response has no SOAP body,
        or ``None`` when any exception was raised (it is logged).
    """
    root, body = buildBody()
    stopS = xml.SubElement(body, "MonStopSampling",
                           templates.COMMANDS["MonStopSampling"]["attrib"])
    chid = xml.SubElement(stopS, "ChannelID")
    chid.text = channelID
    try:
        return readResponse(sendRequest(xml.tostring(root, encoding='utf-8')))
    except Exception as e:
        LOGGER.error("Non riesco a fermare la cattura -> {}".format(e))


def collectSamples(varMap, channelID='0'):
    """Fetch the buffered samples of every variable with ``MonGetVariableS``.

    Args:
        varMap: iterable of variable tuples as returned by ``setupOscope``;
            they are indexed with ``templates.VAR_NAME``,
            ``templates.VAR_REG`` and ``templates.VAR_ID``.
        channelID: text written in the ``ChannelID`` element of the request.

    Returns:
        ``{name: {register: [(time, value), ...]}}`` or ``False`` when a
        request fails, a data buffer is empty or any exception is raised.
    """
    dataDict = {var[templates.VAR_NAME]: {} for var in varMap}
    for var in varMap:
        try:
            root, body = buildBody()
            getVariable = xml.SubElement(
                body, "MonGetVariableS",
                templates.COMMANDS["MonGetVariableS"]["attrib"])
            chid = xml.SubElement(getVariable, "ChannelID")
            chid.text = channelID
            varid = xml.SubElement(getVariable, "VariableID")
            varid.text = str(var[templates.VAR_ID])
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            if resp is False:
                return False
            resp = readResponse(resp)["MonGetVariableS-R"]
            # NOTE(review): a retval below 1 leaves this variable without
            # samples and moves on to the next one; the original nesting of
            # this branch was ambiguous - confirm this is intended.
            if int(resp["retval"]) >= 1:
                if len(resp["DataBuffer"]):
                    dataBuffer_raw = base64.b64decode(resp["DataBuffer"])
                    timeBuffer_raw = base64.b64decode(resp["TimeBuffer"])
                    # Data is unpacked as 8-byte doubles, time as 8-byte
                    # signed integers, one of each per sample.
                    nVals = len(dataBuffer_raw) // 8
                    dataVal = struct.unpack("{}d".format(nVals), dataBuffer_raw)
                    timeVal = struct.unpack("{}q".format(nVals), timeBuffer_raw)
                    dataDict[var[templates.VAR_NAME]][var[templates.VAR_REG]] = \
                        list(zip(timeVal, dataVal))
                else:
                    LOGGER.warning("Empty Buffer")
                    return False
        except Exception as e:
            LOGGER.error("Eccezione inaspettata -> {}".format(e))
            return False
    return dataDict


def readResponse(r):
    """Convert the SOAP body of a response into a plain dict.

    Args:
        r: root element of the response, as returned by ``sendRequest``.

    Returns:
        The content of the ``Body`` element as nested dicts whose keys have
        the namespace prefix removed, or ``False`` when there is no body.
    """
    responseData = r.find("SOAP-ENV:Body", templates.XMLNS_RESP)
    if responseData is None:
        return False
    xmlstr = xml.tostring(responseData, encoding='utf-8')
    xmldict = xmltodict.parse(xmlstr, encoding='utf-8', process_namespaces=True)
    return remNameSpace(xmldict)["Body"]


def remNameSpace(d, sep=':'):
    """Return a copy of ``d`` whose keys keep only the text after the last
    ``sep``; nested dicts are processed recursively with the same ``sep``.
    """
    rv = {}
    for k, v in d.items():
        key = k.rsplit(sep, 1)[-1]
        rv[key] = remNameSpace(v, sep) if isinstance(v, dict) else v
    return rv


def sendRequest(req):
    """POST a serialized SOAP request to ``settings['URL']``.

    Args:
        req: UTF-8 encoded bytes of the request envelope.

    Returns:
        The root element of the parsed response, or ``False`` when
        ``requests`` raises a ``RequestException``. XML parsing errors are
        not handled here and propagate to the caller.
    """
    req = str(req, encoding='utf-8')
    pprinted = mini.parseString(req)
    LOGGER.debug("XML_REQUEST\n" + pprinted.toprettyxml(indent='\t'))
    try:
        xmlResponse = requests.post(settings['URL'],
                                    templates.XML_HEADER + req,
                                    headers=templates.HTTP_HEADER)
        pprinted = mini.parseString(xmlResponse.text)
        LOGGER.debug("XML_RESPONSE\n" + pprinted.toprettyxml(indent='\t'))
        return xml.fromstring(xmlResponse.text)
    except requests.exceptions.RequestException as e:
        # The placeholder was missing, so the cause was never logged.
        LOGGER.error("Timeout Richiesta HTTP -> {}".format(e))
        return False


def saveSamples(s, fileName=None):
    """Write the sample buffer ``s`` to a file.

    Args:
        s: ``{name: {code: [(time, value), ...]}}`` sample buffer.
        fileName: destination path; when ``None`` it is built from
            ``settings["fileSaveDir"]`` with the current time and
            ``settings["fileType"]``.

    With file type ``csv`` one row is written per sample index, holding the
    (time, value) pair of every configured code side by side; rows stop at
    the shortest column. With file type ``json`` the buffer is dumped as is.
    """
    if fileName is None:
        fileName = settings["fileSaveDir"].format(
            time.strftime("%Y%m%d-%H.%M.%S"), fType=settings["fileType"])
    with open(fileName, mode='w', newline='') as dataOut:
        LOGGER.info("Salvo i campioni dei precedenti {}s -> {}".format(
            settings["fileSaveTime"], fileName))
        if settings["fileType"] == "csv":
            # The header is written by hand because every entry contains
            # the delimiter itself (two column titles per variable).
            h = ["{0}-{1}_TS;{0}-{1}".format(dev["name"], reg)
                 for dev in settings["variables"] for reg in dev["descr"]]
            dataOut.write(";".join(h) + '\n')
            dataWriter = csv.writer(dataOut, delimiter=';',
                                    quoting=csv.QUOTE_NONE)
            columns = [s[dev["name"]][code]
                       for dev in settings["variables"]
                       for code in dev["codes"]]
            dataWriter.writerows(
                [value for pair in row for value in pair]
                for row in zip(*columns))
        elif settings["fileType"] == "json":
            dataOut.write(json.dumps(s))


def sendSamples(s):
    """Publish ``s`` as JSON on the ``settings['mqttSendSamples']`` topic."""
    mqtt.publish(settings['mqttSendSamples'], json.dumps(s))


def onMessage(cli, userdata, msg):
    """MQTT message callback: react to the command found in the payload.

    ``STOP`` and ``START`` change the global state ``stat``, ``SAVE`` writes
    the current buffer to file, ``PAUSE`` and ``EXIT`` are accepted but do
    nothing; anything else is logged as an error.
    """
    global stat
    payload = msg.payload
    # Decode instead of str(bytes), which would yield the b'...' repr.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='replace')
    else:
        text = str(payload)
    if "STOP" in text:
        stat = "STOP"
    elif "PAUSE" in text:
        pass
    elif "START" in text:
        stat = "INI"
    elif "SAVE" in text:
        saveSamples(samples)
    elif "EXIT" in text:
        pass
    else:
        LOGGER.error(f"Messaggio MQTT sconosciuto: {text}")


def onConnect(cli, userdata, flags, rc):
    """MQTT connect callback: log the result and announce the client."""
    LOGGER.debug("Connessione a MQTT -> {}".format(connack_string(rc)))
    # Subscribing here renews the subscription after every reconnection.
    cli.subscribe(settings['mqttReceive'])
    cli.publish(settings["mqttSend"], "CLIENT_ON")


def onDisconnect(cli, userdata, rc):
    """MQTT disconnect callback: log the reason and reconnect if needed."""
    # rc is a client error code here, not a CONNACK code.
    LOGGER.warning("Disonnesso da MQTT -> {}".format(client.error_string(rc)))
    if rc == 0:
        # disconnect() was called on purpose: do not reconnect.
        return
    try:
        cli.reconnect()
    except OSError as e:
        LOGGER.error("Riconnessione a MQTT fallita -> {}".format(e))


###################################
############# MAIN ################
###################################
if __name__ == '__main__':
    isRunning = True
    firstCap = False
    firstLoop = True
    stat = "IDLE"
    bufferFull = 0
    loopcount = 0
    variableMap = list()
    samples = dict()

    try:
        print(f'Config: {sys.argv[1]}')
        with open(str(sys.argv[1])) as fp:
            settings = json.load(fp)
    except (IndexError, OSError, ValueError):
        # Nothing below can work without the settings.
        print("Non posso aprire il file di configurazione!")
        sys.exit(1)

    # Setup Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    FORMATTER = logging.Formatter((settings["logFormat"]),
                                  (settings["logTimeFormat"]))
    # File Logging
    fh = logging.FileHandler((settings["logFile"]))
    fh.setLevel(logging.WARNING)
    fh.setFormatter(FORMATTER)
    LOGGER.addHandler(fh)
    # Console Logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(logging.INFO)
    cl.setFormatter(FORMATTER)
    LOGGER.addHandler(cl)

    LOGGER.warning("NasoScope {} Started!".format(settings["version"]))

    mqtt = client.Client()
    mqtt.on_message = onMessage
    mqtt.on_connect = onConnect
    mqtt.on_disconnect = onDisconnect
    mqtt.connect(host=settings['mqttHost'], port=settings['mqttPort'])
    mqtt.subscribe(settings['mqttReceive'])
    mqtt.user_data_set(stat)
    mqtt.loop_start()

    influx = InfluxDBClient(host=settings['influxHost'],
                            port=settings['influxPort'],
                            database=settings['influxDB'])

    xmlReqTempl = xml.fromstring(templates.REQ_TEMPLATE)
    xmlRespTempl = xml.fromstring(templates.RESP_TEMPLATE)

    # State machine: IDLE -> INI -> STARTCAP -> CAP -> STOP -> IDLE.
    # INI and STOP are requested through MQTT (see onMessage).
    while isRunning:
        if stat == "INI":
            r = getInfo()
            if r:
                LOGGER.info("Sto Monitorando-> HWid:{0}, rel:{1}".format(
                    r["GetHWKey-R"]["HwKey"], r["GetHWKey-R"]["Release"]))
                # Stop and close before opening a new channel.
                stopSampling()
                closeChannel()
                r = openChannel()
                variableMap = False
                if r is not False:
                    channelID = r["MonOpenChannel-R"]["ChannelID"]
                    LOGGER.info("Apro il Canale-> {}".format(channelID))
                    variableMap = setupOscope()
                if variableMap is False:
                    # Channel not opened or variables not registered.
                    LOGGER.error('Nessuna risposta dal CN')
                    isRunning = False
                else:
                    LOGGER.info("Aggiungo le Variabili-> {}".format(variableMap))
                    stat = "STARTCAP"
            else:
                LOGGER.error('Nessuna risposta dal CN')
                isRunning = False

        elif stat == "STARTCAP":
            r = startSampling()
            if r:
                LOGGER.info("Inizio la cattura-> t:{}ms".format(
                    settings["samplePeriod"]))
                startTime = time.time()
                # Reset the buffer: one empty list per configured code.
                for k in settings["variables"]:
                    samples[k["name"]] = {kk: [] for kk in k["codes"]}
                firstCap = True
                stat = "CAP"
            else:
                LOGGER.error("Impossibile Iniziare la Cattura")
                isRunning = False

        elif stat == "CAP":
            tempSamples = collectSamples(variableMap, channelID)
            if tempSamples is not False:
                if firstCap:
                    # The first batch only fixes the time origin (wall
                    # clock); its samples are not processed.
                    firstCap = False
                    firstTS = time.time()
                    LOGGER.warning(f'Inizio cattura: {datetime.fromtimestamp(firstTS).strftime("%c")}')
                else:
                    for ax in tempSamples:
                        for reg in tempSamples[ax]:
                            newSamples = tempSamples[ax][reg]
                            if not settings['sendInflux']:
                                # Normalize every record to a (time, value)
                                # tuple, in place.
                                newSamples[:] = [
                                    (rec[templates.SAM_TS], rec[templates.SAM_VAL])
                                    for rec in newSamples]
                                # The buffer is what gets saved to file, and
                                # that only happens when MQTT sending is off;
                                # the original tested the opposite condition.
                                if not settings["sendMqtt"]:
                                    if newSamples and \
                                            newSamples[-1][templates.SAM_TS] > settings["recordTime"]:
                                        # Recording window full: drop as many
                                        # old samples as are about to be added.
                                        keep = len(samples[ax][reg]) - len(newSamples)
                                        samples[ax][reg] = \
                                            samples[ax][reg][-keep:] if keep > 0 else []
                                        if bufferFull < 1:
                                            bufferFull = 1
                                        elif bufferFull == 1:
                                            LOGGER.warning("Buffer Campioni Pieno, inizio roll")
                                            bufferFull = 2
                                    samples[ax][reg] += newSamples
                                else:
                                    # NOTE(review): the whole batch is
                                    # published once per register, as in the
                                    # original - confirm this is intended.
                                    sendSamples(tempSamples)
                            else:
                                influxMeas = []
                                ts = None
                                for record in newSamples:
                                    # Sample time plus the capture start
                                    # converted to microseconds.
                                    ts = record[templates.SAM_TS] + firstTS * 1000000
                                    influxMeas.append({
                                        "measurement": ax,
                                        "tags": {
                                            "parameter": str(settings['variableNames'][str(reg)])
                                        },
                                        "time": int(ts),
                                        "fields": {
                                            "value": record[templates.SAM_VAL]
                                        }
                                    })
                                try:
                                    influx.write_points(
                                        points=influxMeas,
                                        time_precision='u',
                                        retention_policy=settings['influxRetention'])
                                except Exception as e:
                                    LOGGER.error(f'Impossibile scrivere nel database: {e}')
                                    if ts is not None:
                                        # ts is in microseconds, while
                                        # fromtimestamp() expects seconds.
                                        LOGGER.error(f'TS da controllo: {datetime.fromtimestamp(ts / 1000000).strftime("%d/%m/%y_%H:%M:%S,%f")}')
                                    stat = 'STOP'

                if not settings['sendMqtt'] and not settings['sendInflux']:
                    if time.time() - startTime > settings["fileSaveTime"]:
                        saveSamples(samples)
                        startTime = time.time()
                LOGGER.info("{}".format(tempSamples))
                loopcount += 1
                if loopcount % 100 == 0:
                    LOGGER.info(f"Loop Numero, sono vivo: {loopcount}")
            else:
                LOGGER.error("Disconnesso dal CN")
                stat = "STOP"

        elif stat == "STOP":
            stopSampling()
            closeChannel()
            if not settings['sendMqtt']:
                saveSamples(samples)
            LOGGER.warning("Chiudo il Canale")
            firstLoop = True
            stat = "IDLE"

        elif stat == "IDLE":
            if firstLoop:
                LOGGER.info("Pronto a Iniziare Cattura")
                firstLoop = False

        time.sleep(settings["loopPeriod"])

    mqtt.disconnect()
    sys.exit(0)