''' Created on 5 set 2019 @author: Emanuele Trabattoni ''' import sys import time import json import requests import xml.etree.ElementTree as xml import xml.dom.minidom as mini import dicttoxml, xmltodict import logging import copy import base64 import struct import csv from paho.mqtt import client from NasoScope import templates from paho.mqtt.client import connack_string def buildBody(): root = xml.Element("soap:Envelope", templates.XMLNS_ENV) body = xml.SubElement(root, "soap:Body") return root,body def getInfo(): root,body = buildBody() xml.SubElement(body, "GetHWKey", templates.COMMANDS["GetHWKey"]["attrib"]) resp = sendRequest(xml.tostring(root, encoding='utf-8')) if resp: return readResponse(resp) else: return False pass def setupOscope(): tempVarmap = list() for var in settings["variables"]: tempVar = copy.copy(templates.COMMANDS["MonAddVariable"]["elem"]) tempVar["VarDescr"]["DeviceID"] = var["deviceID"] tempVar["VarDescr"]["SamplingPeriod"] = settings["samplePeriod"] for cod in var["codes"]: root,body = buildBody() addVariable = xml.SubElement(body, "MonAddVariable", templates.COMMANDS["MonAddVariable"]["attrib"]) tempVar["VarDescr"]["Code"] = cod tempVarDescr = str(dicttoxml.dicttoxml(tempVar,attr_type=False), encoding='utf-8') tempxml = xml.fromstring(tempVarDescr) addVariable.append(tempxml.find("ChannelID")) addVariable.append(tempxml.find("VarDescr")) resp = sendRequest(xml.tostring(root, encoding='utf-8')) if resp: resp = readResponse(resp)["MonAddVariable-R"] if int(resp["retval"]): newvar=(var["name"],cod,int(resp["VariableID"])) tempVarmap.append(newvar) LOGGER.debug("Aggingo la variabile-> {}"\ .format(newvar)) pass else: LOGGER.error("Non posso aggiungere la variabile-> {}"\ .format(var)) else: return False return tempVarmap def openChannel(): root,body = buildBody() opench=xml.SubElement(body, "MonOpenChannel", templates.COMMANDS["MonOpenChannel"]["attrib"]) sync=xml.SubElement(opench, "Synchronized") sync.text = "false" resp = sendRequest(xml.tostring(root, encoding='utf-8')) if resp is not False: return readResponse(resp) else: return False pass def closeChannel(): root,body = buildBody() opench=xml.SubElement(body, "MonCloseChannel", templates.COMMANDS["MonCloseChannel"]["attrib"]) sync=xml.SubElement(opench, "Synchronized") sync.text = "false" resp = sendRequest(xml.tostring(root, encoding='utf-8')) if resp is not False: return readResponse(resp) else: return False pass def startSampling(channelID='0'): root,body = buildBody() startS = xml.SubElement(body, "MonStartSampling", templates.COMMANDS["MonStartSampling"]["attrib"]) chid = xml.SubElement(startS, "ChannelID") chid.text = channelID try: return readResponse(sendRequest(xml.tostring(root, encoding='utf-8'))) except Exception as e: LOGGER.error("Non riesco a iniziare la cattura -> {}".format(e)) def stopSampling(channelID='0'): root,body = buildBody() stopS = xml.SubElement(body, "MonStopSampling", templates.COMMANDS["MonStopSampling"]["attrib"]) chid = xml.SubElement(stopS, "ChannelID") chid.text = channelID try: return readResponse(sendRequest(xml.tostring(root, encoding='utf-8'))) except Exception as e: LOGGER.error("Non riesco a fermare la cattura -> {}".format(e)) def collectSamples(varMap, channelID='0'): dataDict = {} for var in varMap: dataDict[var[templates.VAR_NAME]] = dict() for var in varMap: try: root,body = buildBody() getVariable = xml.SubElement(body, "MonGetVariableS", templates.COMMANDS["MonGetVariableS"]["attrib"]) chid = xml.SubElement(getVariable, "ChannelID") chid.text = channelID varid = xml.SubElement(getVariable, "VariableID") varid.text = str(var[templates.VAR_ID]) resp = sendRequest(xml.tostring(root, encoding='utf-8')) if resp is not False: resp = readResponse(resp)["MonGetVariableS-R"] if int(resp["retval"]) >=1 : if len(resp["DataBuffer"]): dataBuffer_raw = base64.b64decode(resp["DataBuffer"]) timeBuffer_raw = base64.b64decode(resp["TimeBuffer"]) nVals = int(len(dataBuffer_raw)/8) dataStruct = struct.Struct("{}d".format(nVals)) timeStruct = struct.Struct("{}q".format(nVals)) dataVal = dataStruct.unpack(dataBuffer_raw) timeVal = timeStruct.unpack(timeBuffer_raw) dataDict[var[templates.VAR_NAME]][var[templates.VAR_REG]]=list(zip(timeVal,dataVal)) pass else: LOGGER.warning("Empty Buffer") return False pass else: return False except Exception as e: LOGGER.error("Eccezione inaspettata -> {}".format(e)) return False return dataDict pass def readResponse(r): responseData = r.find("SOAP-ENV:Body", templates.XMLNS_RESP) if responseData is not None: xmlstr = xml.tostring(responseData, encoding='utf-8') xmldict = xmltodict.parse(xmlstr, encoding='utf-8', process_namespaces=True) return remNameSpace(xmldict)["Body"] else: return False pass def remNameSpace(d, sep=':'): rv=dict() for k,v in d.items(): if isinstance(v, dict): rv[k.split(sep)[len(k.split(sep))-1]]=remNameSpace(v) else: rv[k.split(sep)[len(k.split(sep))-1]]=v return rv def sendRequest(req): req = str(req,encoding='utf-8') pprinted = mini.parseString(req) LOGGER.debug("XML_REQUEST\n"+pprinted.toprettyxml(indent='\t')) try: xmlResponse = requests.post(settings['URL'],templates.XML_HEADER+req,headers=templates.HTTP_HEADER) pprinted = mini.parseString(xmlResponse.text) LOGGER.debug("XML_RESPONSE\n"+pprinted.toprettyxml(indent='\t')) return xml.fromstring(xmlResponse.text) except requests.exceptions.RequestException as e: LOGGER.error("Timeout Richiesta HTTP -> ".format(str(e))) return False def saveSamples(s, fileName = None): if fileName is None: fileName = settings["fileSaveDir"].format(time.strftime("%Y%m%d-%H.%M.%S"), fType=settings["fileType"]) with open(fileName, mode = 'w', newline='') as dataOut: LOGGER.info("Salvo i campioni dei precedenti {}s -> {}".format(settings["fileSaveTime"],fileName)) if settings["fileType"] == "csv": h = ["{0}-{1}_TS;{0}-{1}".format(dev["name"],reg) for dev in settings["variables"] for reg in dev["descr"]] dataOut.write(";".join(h)+'\n') dataWriter = csv.writer(dataOut, delimiter=';', quoting=csv.QUOTE_NONE) dataWriter.writerows([[iii for ii in i for iii in ii] \ for i in zip(*[s[dev["name"]][code] \ for dev in settings["variables"] \ for code in dev["codes"]])]) elif settings["fileType"] == "json": dataOut.newline='\r\n' dataOut.write(json.dumps(s)) pass def onMessage(cli, userdata, msg): global stat msg=str(msg.payload) if "STOP" in msg: stat = "STOP" pass elif "PAUSE" in msg: pass elif "START" in msg: stat = "INI" pass elif "SAVE" in msg: saveSamples(samples) pass elif "EXIT" in msg: pass else: LOGGER.error("Messaggio MQTT sconosciuto") pass def onConnect(cli, userdata, flags, rc): LOGGER.debug("Connessione a MQTT -> {}".format(connack_string(rc))) cli.publish(settings["mqttSend"],"CLIENT_ON") pass def onDisconnect(cli, userdata, rc): LOGGER.warning("Disonnesso da MQTT -> {}".format(connack_string(rc))) cli.reconnect() pass ################################### ############# MAIN ################ ################################### if __name__ == '__main__': isRunning = True firstCap = False firstLoop = True stat = "IDLE" bufferFull = 0 variableMap = list() samples = dict() try: fp = open("config.json") settings = json.load(fp) fp.close() except: print("Non posso aprire il file di configurazione!") # Setup Logger LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) LOGGER.propagate = False FORMATTER = logging.Formatter((settings["logFormat"]), (settings["logTimeFormat"])) # File Logging fh = logging.FileHandler((settings["logFile"])) fh.setLevel(logging.WARNING) fh.setFormatter(FORMATTER) LOGGER.addHandler(fh) # Console Logging cl= logging.StreamHandler(sys.stdout) cl.setLevel(logging.INFO) cl.setFormatter(FORMATTER) LOGGER.addHandler(cl) LOGGER.warning("NasoScope {} Started!".format(settings["version"])) mqtt = client.Client(protocol=client.MQTTv31) mqtt.on_message = onMessage mqtt.on_connect = onConnect mqtt.on_disconnect = onDisconnect mqtt.connect(host=settings['mqttHost'], port=settings['mqttPort']) mqtt.subscribe(settings['mqttReceive']) mqtt.user_data_set(stat) mqtt.loop_start() xmlReqTempl = xml.fromstring(templates.REQ_TEMPLATE) xmlRespTempl = xml.fromstring(templates.RESP_TEMPLATE) while isRunning: if stat == "INI": r=getInfo() if r: LOGGER.info("Sto Monitorando-> HWid:{0}, rel:{1}"\ .format(r["GetHWKey-R"]["HwKey"],r["GetHWKey-R"]["Release"])) stopSampling() closeChannel() r=openChannel() channelID = r["MonOpenChannel-R"]["ChannelID"] LOGGER.info("Apro il Canale-> {}"\ .format(channelID)) variableMap=setupOscope() LOGGER.info("Aggiungo le Variabili-> {}"\ .format(variableMap)) stat = "STARTCAP" else: LOGGER.error('Nessuna risposta dal CN') isRunning = False pass elif stat == "STARTCAP": r=startSampling() if r: LOGGER.info("Inizio la cattura-> t:{}ms"\ .format(settings["samplePeriod"])) startTime = time.time() for k in settings["variables"]: samples[k["name"]] = {} for kk in k["codes"]: samples[k["name"]][kk] = [] firstCap = True stat = "CAP" else: LOGGER.error("Impossibile Iniziare la Cattura") isRunning = False pass elif stat == "CAP": tempSamples = collectSamples(variableMap,channelID) if tempSamples is not False: if firstCap == True: firstTS = min([z[templates.SAM_TS] for x in tempSamples \ for y in tempSamples[x] for z in tempSamples[x][y]]) firstCap = False pass else: for ax in tempSamples: for reg in tempSamples[ax]: for idx,rec in enumerate(tempSamples[ax][reg]): tempSamples[ax][reg][idx]=((tempSamples[ax][reg][idx][templates.SAM_TS]-firstTS)/10**6, tempSamples[ax][reg][idx][templates.SAM_VAL]) if tempSamples[ax][reg][idx][templates.SAM_TS] > settings["recordTime"]: samples[ax][reg] = samples[ax][reg][-(len(samples)-len(tempSamples)):] if bufferFull < 1: bufferFull = 1 elif bufferFull == 1: LOGGER.warning("Buffer Campioni Pieno, inizio roll") bufferFull = 2 else: pass pass pass samples[ax][reg]+=tempSamples[ax][reg] pass pass pass if time.time() - startTime > settings["fileSaveTime"]: saveSamples(samples) startTime = time.time() LOGGER.info("{}".format(tempSamples)) else: LOGGER.error("Disconnesso dal CN") stat = "STOP" pass elif stat == "STOP": stopSampling() closeChannel() saveSamples(samples) LOGGER.warning("Chiudo il Canale") stat = "IDLE" firstLoop = True pass elif stat == "IDLE" and firstLoop == True: LOGGER.info("Pronto a Iniziare Cattura") firstLoop = False else: pass time.sleep(settings["loopPeriod"]) mqtt.disconnect() sys.exit(0)