Files
NasoScopio/NasoScope/nasomain.py

412 lines
16 KiB
Python

'''
Created on 5 set 2019
@author: Emanuele Trabattoni
'''
import sys
import time
import json
import requests
import xml.etree.ElementTree as xml
import xml.dom.minidom as mini
import dicttoxml, xmltodict
import logging
import copy
import base64
import struct
import csv
from paho.mqtt import client
from datetime import datetime
import templates
from paho.mqtt.client import connack_string
from influxdb import InfluxDBClient
def buildBody():
    '''Create an empty SOAP envelope.

    Returns a ``(envelope, body)`` tuple: the ``soap:Envelope`` root element
    (built with the attributes in ``templates.XMLNS_ENV``) and its
    ``soap:Body`` child, to which callers append the actual command.
    '''
    envelope = xml.Element("soap:Envelope", templates.XMLNS_ENV)
    return envelope, xml.SubElement(envelope, "soap:Body")
def getInfo():
    '''Send a ``GetHWKey`` request and return the parsed reply.

    Returns the dictionary produced by ``readResponse`` on success, or
    ``False`` when the request could not be sent or the reply has no SOAP
    body.
    '''
    root, body = buildBody()
    xml.SubElement(body, "GetHWKey", templates.COMMANDS["GetHWKey"]["attrib"])
    resp = sendRequest(xml.tostring(root, encoding='utf-8'))
    # Compare against the False sentinel explicitly: the truth value of an
    # ElementTree element depends on its number of children (and testing it
    # is deprecated), so "if resp:" is not a reliable success check.
    if resp is False:
        return False
    return readResponse(resp)
def setupOscope():
    '''Register every configured variable with ``MonAddVariable``.

    For each entry of ``settings["variables"]`` and each of its ``codes`` a
    ``MonAddVariable`` request is sent. Variables accepted by the remote side
    (non-zero ``retval``) are collected as ``(name, code, VariableID)``
    tuples.

    Returns the list of accepted variables, or ``False`` as soon as one
    request gets no usable response.
    '''
    tempVarmap = []
    for var in settings["variables"]:
        # Deep copy: the template contains a nested "VarDescr" dict that is
        # modified below; a shallow copy would write into the shared template
        # stored in templates.COMMANDS.
        tempVar = copy.deepcopy(templates.COMMANDS["MonAddVariable"]["elem"])
        tempVar["VarDescr"]["DeviceID"] = var["deviceID"]
        tempVar["VarDescr"]["SamplingPeriod"] = settings["samplePeriod"]
        for cod in var["codes"]:
            root, body = buildBody()
            addVariable = xml.SubElement(body, "MonAddVariable", templates.COMMANDS["MonAddVariable"]["attrib"])
            tempVar["VarDescr"]["Code"] = cod
            # Serialise the descriptor dict to XML, then graft its two
            # children into the request element.
            tempVarDescr = str(dicttoxml.dicttoxml(tempVar, attr_type=False), encoding='utf-8')
            tempxml = xml.fromstring(tempVarDescr)
            addVariable.append(tempxml.find("ChannelID"))
            addVariable.append(tempxml.find("VarDescr"))
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            # Explicit sentinel check; Element truthiness is unreliable.
            if resp is False:
                return False
            parsed = readResponse(resp)
            if parsed is False:
                # Reply without a SOAP body: treat it like a failed request
                # instead of crashing on False["MonAddVariable-R"].
                LOGGER.error("Non posso aggiungere la variabile-> {}"\
                             .format(var))
                return False
            result = parsed["MonAddVariable-R"]
            if int(result["retval"]):
                newvar = (var["name"], cod, int(result["VariableID"]))
                tempVarmap.append(newvar)
                LOGGER.debug("Aggingo la variabile-> {}"\
                             .format(newvar))
            else:
                LOGGER.error("Non posso aggiungere la variabile-> {}"\
                             .format(var))
    return tempVarmap
def openChannel():
    '''Send a ``MonOpenChannel`` request with ``Synchronized`` set to "false".

    Returns the parsed reply from ``readResponse``, or ``False`` when
    ``sendRequest`` reports a failure.
    '''
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonOpenChannel", templates.COMMANDS["MonOpenChannel"]["attrib"])
    syncFlag = xml.SubElement(command, "Synchronized")
    syncFlag.text = "false"
    reply = sendRequest(xml.tostring(envelope, encoding='utf-8'))
    if reply is False:
        return False
    return readResponse(reply)
def closeChannel():
    '''Send a ``MonCloseChannel`` request with ``Synchronized`` set to "false".

    Returns the parsed reply from ``readResponse``, or ``False`` when
    ``sendRequest`` reports a failure.
    '''
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonCloseChannel", templates.COMMANDS["MonCloseChannel"]["attrib"])
    syncFlag = xml.SubElement(command, "Synchronized")
    syncFlag.text = "false"
    reply = sendRequest(xml.tostring(envelope, encoding='utf-8'))
    if reply is False:
        return False
    return readResponse(reply)
def startSampling(channelID='0'):
    '''Send a ``MonStartSampling`` request for ``channelID``.

    channelID -- channel identifier, written as text into the request.

    Returns the parsed reply from ``readResponse``, or ``False`` when the
    request fails for any reason (the failure is logged).
    '''
    root, body = buildBody()
    startS = xml.SubElement(body, "MonStartSampling", templates.COMMANDS["MonStartSampling"]["attrib"])
    chid = xml.SubElement(startS, "ChannelID")
    chid.text = channelID
    try:
        resp = sendRequest(xml.tostring(root, encoding='utf-8'))
        # sendRequest signals failure with False; check it here rather than
        # letting readResponse fail on False.find(...) and logging a
        # misleading AttributeError.
        if resp is False:
            LOGGER.error("Non riesco a iniziare la cattura -> {}".format("nessuna risposta dal CN"))
            return False
        return readResponse(resp)
    except Exception as e:
        LOGGER.error("Non riesco a iniziare la cattura -> {}".format(e))
        # Explicit falsy result, consistent with the other request helpers.
        return False
def stopSampling(channelID='0'):
    '''Send a ``MonStopSampling`` request for ``channelID``.

    channelID -- channel identifier, written as text into the request.

    Returns the parsed reply from ``readResponse``, or ``False`` when the
    request fails for any reason (the failure is logged).
    '''
    root, body = buildBody()
    stopS = xml.SubElement(body, "MonStopSampling", templates.COMMANDS["MonStopSampling"]["attrib"])
    chid = xml.SubElement(stopS, "ChannelID")
    chid.text = channelID
    try:
        resp = sendRequest(xml.tostring(root, encoding='utf-8'))
        # sendRequest signals failure with False; check it here rather than
        # letting readResponse fail on False.find(...) and logging a
        # misleading AttributeError.
        if resp is False:
            LOGGER.error("Non riesco a fermare la cattura -> {}".format("nessuna risposta dal CN"))
            return False
        return readResponse(resp)
    except Exception as e:
        LOGGER.error("Non riesco a fermare la cattura -> {}".format(e))
        # Explicit falsy result, consistent with the other request helpers.
        return False
def collectSamples(varMap, channelID='0'):
    '''Fetch the buffered samples of every variable in ``varMap``.

    varMap    -- sequence of variable records, indexed with
                 ``templates.VAR_NAME``, ``templates.VAR_REG`` and
                 ``templates.VAR_ID``.
    channelID -- channel identifier, written as text into each request.

    Returns ``{name: {reg: [(time, value), ...]}}`` on success, or ``False``
    when a request gets no response, a buffer is empty, or any exception is
    raised while processing a variable.

    NOTE(review): the indentation of this file was lost; the nesting below
    (in particular which ``if`` the ``else: return False`` after the
    ``retval`` check belongs to) is a reconstruction -- confirm against the
    original source.
    '''
    dataDict = {}
    # First pass: one (initially empty) inner dict per variable name, so that
    # several records sharing a name end up in the same inner dict.
    for var in varMap:
        dataDict[var[templates.VAR_NAME]] = dict()
    # Second pass: one MonGetVariableS request per record.
    for var in varMap:
        try:
            root,body = buildBody()
            getVariable = xml.SubElement(body, "MonGetVariableS", templates.COMMANDS["MonGetVariableS"]["attrib"])
            chid = xml.SubElement(getVariable, "ChannelID")
            chid.text = channelID
            varid = xml.SubElement(getVariable, "VariableID")
            varid.text = str(var[templates.VAR_ID])
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            if resp is not False:
                resp = readResponse(resp)["MonGetVariableS-R"]
                if int(resp["retval"]) >=1 :
                    if len(resp["DataBuffer"]):
                        # Both buffers arrive base64-encoded.
                        dataBuffer_raw = base64.b64decode(resp["DataBuffer"])
                        timeBuffer_raw = base64.b64decode(resp["TimeBuffer"])
                        # Sample count derived from the data buffer, 8 bytes per value.
                        nVals = int(len(dataBuffer_raw)/8)
                        # "d" = double, "q" = signed 64-bit integer; no prefix
                        # means native byte order.
                        dataStruct = struct.Struct("{}d".format(nVals))
                        timeStruct = struct.Struct("{}q".format(nVals))
                        dataVal = dataStruct.unpack(dataBuffer_raw)
                        timeVal = timeStruct.unpack(timeBuffer_raw)
                        # Pair every time value with its data value.
                        dataDict[var[templates.VAR_NAME]][var[templates.VAR_REG]]=list(zip(timeVal,dataVal))
                        pass
                    else:
                        LOGGER.warning("Empty Buffer")
                        return False
                    pass
            else:
                # sendRequest failed.
                return False
        except Exception as e:
            # Any error (malformed reply, buffer size mismatch, ...) aborts
            # the whole collection.
            LOGGER.error("Eccezione inaspettata -> {}".format(e))
            return False
    return dataDict
    pass
def readResponse(r):
    '''Convert the SOAP body of a parsed response into a plain dictionary.

    r -- root element of the response.

    Looks up ``SOAP-ENV:Body`` using the ``templates.XMLNS_RESP`` namespace
    map. Returns the content of the body as a dictionary with namespace
    prefixes removed from the keys, or ``False`` when no body is found.
    '''
    soapBody = r.find("SOAP-ENV:Body", templates.XMLNS_RESP)
    if soapBody is None:
        return False
    serialized = xml.tostring(soapBody, encoding='utf-8')
    parsed = xmltodict.parse(serialized, encoding='utf-8', process_namespaces=True)
    return remNameSpace(parsed)["Body"]
def remNameSpace(d, sep=':'):
    '''Return a copy of ``d`` whose keys have their namespace part removed.

    d   -- mapping with string keys, possibly nested.
    sep -- separator between namespace and local name; only the text after
           the last ``sep`` of each key is kept.

    Nested dictionaries, including dictionaries contained in lists, are
    processed recursively with the same separator. All other values are
    kept unchanged. If two keys collapse to the same local name, the later
    one wins.
    '''
    def _strip(value):
        # Recurse with the caller's separator (previously dropped).
        if isinstance(value, dict):
            return remNameSpace(value, sep)
        if isinstance(value, list):
            return [_strip(item) for item in value]
        return value

    return {k.rsplit(sep, 1)[-1]: _strip(v) for k, v in d.items()}
def sendRequest(req):
    '''POST a SOAP request and return the parsed response.

    req -- UTF-8 encoded XML request (bytes), without the XML header.

    The request is sent to ``settings['URL']`` prefixed with
    ``templates.XML_HEADER``. Request and response are logged pretty-printed
    at debug level.

    Returns the root element of the response, or ``False`` when the HTTP
    request fails or the response is not well-formed XML.
    '''
    req = str(req, encoding='utf-8')
    pprinted = mini.parseString(req)
    LOGGER.debug("XML_REQUEST\n"+pprinted.toprettyxml(indent='\t'))
    try:
        xmlResponse = requests.post(settings['URL'], templates.XML_HEADER+req, headers=templates.HTTP_HEADER)
    except requests.exceptions.RequestException as e:
        # The placeholder was missing, so the cause was never logged.
        LOGGER.error("Timeout Richiesta HTTP -> {}".format(e))
        return False
    try:
        # Parse first: a malformed reply is reported through the same False
        # sentinel as a transport failure instead of raising.
        parsed = xml.fromstring(xmlResponse.text)
    except xml.ParseError as e:
        LOGGER.error("Risposta XML non valida -> {}".format(e))
        return False
    pprinted = mini.parseString(xmlResponse.text)
    LOGGER.debug("XML_RESPONSE\n"+pprinted.toprettyxml(indent='\t'))
    return parsed
def saveSamples(s, fileName = None):
    '''Write the collected samples to a file.

    s        -- samples as ``{name: {code: [(time, value), ...]}}``.
    fileName -- destination path; when ``None`` it is built from
                ``settings["fileSaveDir"]``, the current local time and
                ``settings["fileType"]``.

    ``settings["fileType"]`` selects the format:
    * "csv"  -- ';'-separated; a header line with two columns (timestamp,
                value) per configured variable/register, then one row per
                sample index. Rows stop at the shortest sample list.
    * "json" -- ``json.dumps`` of ``s``.
    Any other value is logged as an error and nothing is written.
    '''
    fileType = settings["fileType"]
    if fileType not in ("csv", "json"):
        # Previously an empty file was created silently.
        LOGGER.error("Tipo di file sconosciuto, campioni non salvati -> {}".format(fileType))
        return
    if fileName is None:
        fileName = settings["fileSaveDir"].format(time.strftime("%Y%m%d-%H.%M.%S"), fType=fileType)
    with open(fileName, mode='w', newline='') as dataOut:
        LOGGER.info("Salvo i campioni dei precedenti {}s -> {}".format(settings["fileSaveTime"], fileName))
        if fileType == "csv":
            # Header entries contain the delimiter on purpose (two columns
            # each), so they are written directly, not through csv.writer.
            header = ["{0}-{1}_TS;{0}-{1}".format(dev["name"], reg)
                      for dev in settings["variables"] for reg in dev["descr"]]
            dataOut.write(";".join(header)+'\n')
            # One list of (time, value) tuples per variable/code, in
            # configuration order.
            columns = [s[dev["name"]][code]
                       for dev in settings["variables"]
                       for code in dev["codes"]]
            dataWriter = csv.writer(dataOut, delimiter=';', quoting=csv.QUOTE_NONE)
            # zip(*columns) yields one tuple of samples per row; each row is
            # flattened to time, value, time, value, ...
            dataWriter.writerows([field for sample in row for field in sample]
                                 for row in zip(*columns))
        else:
            # The former "dataOut.newline = ..." assignment had no effect on
            # the written data and has been dropped.
            dataOut.write(json.dumps(s))
def sendSamples(s):
    '''Publish ``s`` as JSON on the ``settings['mqttSendSamples']`` topic.'''
    topic = settings['mqttSendSamples']
    payload = json.dumps(s)
    mqtt.publish(topic, payload)
def onMessage(cli, userdata, msg):
    '''MQTT message callback: map command keywords to state changes.

    The payload is searched for keywords, in this order:
    * "STOP"  -- set the global ``stat`` to "STOP"
    * "PAUSE" -- accepted, no action
    * "START" -- set the global ``stat`` to "INI"
    * "SAVE"  -- call ``saveSamples(samples)``
    * "EXIT"  -- accepted, no action
    Anything else is logged as an unknown message.

    cli and userdata are not used.
    '''
    global stat
    payload = msg.payload
    # Decode the payload instead of taking str() of a bytes object, which
    # yields its "b'...'" repr (and a BytesWarning under "python -b").
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='replace')
    else:
        text = str(payload)
    if "STOP" in text:
        stat = "STOP"
    elif "PAUSE" in text:
        pass
    elif "START" in text:
        stat = "INI"
    elif "SAVE" in text:
        saveSamples(samples)
    elif "EXIT" in text:
        pass
    else:
        LOGGER.error("Messaggio MQTT sconosciuto")
def onConnect(cli, userdata, flags, rc):
    '''MQTT connect callback.

    Logs the connection result, (re)subscribes to the command topic
    ``settings["mqttReceive"]`` and announces the client with a "CLIENT_ON"
    message on ``settings["mqttSend"]``.

    userdata and flags are not used.
    '''
    LOGGER.debug("Connessione a MQTT -> {}".format(connack_string(rc)))
    # Subscribe here so the subscription is renewed on every (re)connection;
    # a subscription made once at start-up is not restored after a reconnect.
    cli.subscribe(settings["mqttReceive"])
    cli.publish(settings["mqttSend"], "CLIENT_ON")
def onDisconnect(cli, userdata, rc):
    '''MQTT disconnect callback.

    Logs the reason and, when the disconnection was not requested by this
    program (``rc`` different from 0), tries to reconnect. A failed
    reconnection attempt is logged rather than raised.

    userdata is not used.
    '''
    # rc is an MQTT_ERR_* code here, not a CONNACK code, so it is decoded
    # with error_string instead of connack_string.
    LOGGER.warning("Disonnesso da MQTT -> {}".format(client.error_string(rc)))
    if rc == 0:
        # Deliberate disconnect() call: do not reconnect.
        return
    try:
        cli.reconnect()
    except OSError as e:
        # Keep the exception from escaping the callback.
        LOGGER.error("Riconnessione a MQTT fallita -> {}".format(e))
###################################
############# MAIN ################
###################################
if __name__ == '__main__':
    # Script entry point: load the configuration, set up logging, MQTT and
    # InfluxDB, then run a polling state machine driven by ``stat``
    # ("IDLE" -> "INI" -> "STARTCAP" -> "CAP" -> "STOP" -> "IDLE").
    #
    # NOTE(review): the indentation of this file was lost; the nesting used
    # below (especially inside the "CAP" state) is a reconstruction and must
    # be confirmed against the original source.

    # ---- State ----
    isRunning = True        # main loop flag, cleared on fatal errors
    firstCap = False        # True for the first pass through "CAP" after a start
    firstLoop = True        # makes the "ready" message print once per idle period
    stat = "IDLE"           # current state; also written by onMessage (global)
    bufferFull = 0          # 0/1/2 progress marker for the "buffer full" warning
    loopcount = 0           # number of completed "CAP" iterations
    variableMap = list()    # result of setupOscope()
    samples = dict()        # accumulated samples: {name: {code: [...]}}
    try:
        fp = open("config.json")
        settings = json.load(fp)
        fp.close()
    except:
        # NOTE(review): execution continues after this message, so the first
        # use of ``settings`` below fails with NameError -- confirm intent.
        print("Non posso aprire il file di configurazione!")
    # Setup Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    FORMATTER = logging.Formatter((settings["logFormat"]), (settings["logTimeFormat"]))
    # File Logging (WARNING and above)
    fh = logging.FileHandler((settings["logFile"]))
    fh.setLevel(logging.WARNING)
    fh.setFormatter(FORMATTER)
    LOGGER.addHandler(fh)
    # Console Logging (INFO and above)
    cl= logging.StreamHandler(sys.stdout)
    cl.setLevel(logging.INFO)
    cl.setFormatter(FORMATTER)
    LOGGER.addHandler(cl)
    LOGGER.warning("NasoScope {} Started!".format(settings["version"]))
    # MQTT client: callbacks are defined above; loop_start() runs the network
    # loop in a background thread.
    mqtt = client.Client()
    mqtt.on_message = onMessage
    mqtt.on_connect = onConnect
    mqtt.on_disconnect = onDisconnect
    mqtt.connect(host=settings['mqttHost'], port=settings['mqttPort'])
    mqtt.subscribe(settings['mqttReceive'])
    # Passes the current value of ``stat``; none of the callbacks above read
    # their userdata argument.
    mqtt.user_data_set(stat)
    mqtt.loop_start()
    influx = InfluxDBClient(host= settings['influxHost'], port=settings['influxPort'], database = settings['influxDB'])
    # Not referenced anywhere else in the visible code.
    xmlReqTempl = xml.fromstring(templates.REQ_TEMPLATE)
    xmlRespTempl = xml.fromstring(templates.RESP_TEMPLATE)
    while isRunning:
        if stat == "INI":
            # Initialisation: query the device, reset any previous session,
            # open a channel and register the variables.
            r=getInfo()
            if r:
                LOGGER.info("Sto Monitorando-> HWid:{0}, rel:{1}"\
                            .format(r["GetHWKey-R"]["HwKey"],r["GetHWKey-R"]["Release"]))
                stopSampling()
                closeChannel()
                r=openChannel()
                # NOTE(review): openChannel() may return False, in which case
                # this subscript raises -- confirm.
                channelID = r["MonOpenChannel-R"]["ChannelID"]
                LOGGER.info("Apro il Canale-> {}"\
                            .format(channelID))
                variableMap=setupOscope()
                LOGGER.info("Aggiungo le Variabili-> {}"\
                            .format(variableMap))
                stat = "STARTCAP"
            else:
                LOGGER.error('Nessuna risposta dal CN')
                isRunning = False
            pass
        elif stat == "STARTCAP":
            # NOTE(review): called with the default channel '0' rather than
            # the channelID obtained above -- confirm.
            r=startSampling()
            if r:
                LOGGER.info("Inizio la cattura-> t:{}ms"\
                            .format(settings["samplePeriod"]))
                startTime = time.time()
                # Reset the accumulation buffers: one list per name/code.
                for k in settings["variables"]:
                    samples[k["name"]] = {}
                    for kk in k["codes"]:
                        samples[k["name"]][kk] = []
                firstCap = True
                stat = "CAP"
            else:
                LOGGER.error("Impossibile Iniziare la Cattura")
                isRunning = False
            pass
        elif stat == "CAP":
            tempSamples = collectSamples(variableMap,channelID)
            if tempSamples is not False:
                if firstCap == True:
                    # First batch after a start: only record the wall-clock
                    # reference time; the batch itself is not stored.
                    # firstTS = min([z[templates.SAM_TS] for x in tempSamples \
                    # for y in tempSamples[x] for z in tempSamples[x][y]])
                    firstCap = False
                    firstTS = time.time()
                    LOGGER.info(f'Inizio cattura TS: {datetime.fromtimestamp(firstTS).strftime("%c")}')
                    pass
                else:
                    for ax in tempSamples:
                        for reg in tempSamples[ax]:
                            if not settings['sendInflux']:
                                # Rebuild every sample as a (timestamp, value) tuple.
                                for idx,rec in enumerate(tempSamples[ax][reg]):
                                    tempSamples[ax][reg][idx]=(tempSamples[ax][reg][idx][templates.SAM_TS],
                                                               tempSamples[ax][reg][idx][templates.SAM_VAL])
                                if not settings["sendMqtt"]:
                                    # Local accumulation. ``idx`` is the value
                                    # left by the loop above (last sample).
                                    if tempSamples[ax][reg][idx][templates.SAM_TS] > settings["recordTime"]:
                                        # NOTE(review): len() is taken on the two
                                        # dictionaries, not on the sample lists;
                                        # presumably meant to trim the oldest
                                        # samples -- confirm.
                                        samples[ax][reg] = samples[ax][reg][-(len(samples)-len(tempSamples)):]
                                        if bufferFull < 1:
                                            bufferFull = 1
                                        elif bufferFull == 1:
                                            LOGGER.warning("Buffer Campioni Pieno, inizio roll")
                                            bufferFull = 2
                                        else:
                                            pass
                                        pass
                                    pass
                                    samples[ax][reg]+=tempSamples[ax][reg]
                                else:
                                    sendSamples(tempSamples)
                            else:
                                # InfluxDB output: one point per sample.
                                influxMeas = list()
                                for idx,record in enumerate(tempSamples[ax][reg]):
                                    # Sample timestamp offset by the reference
                                    # time scaled by 1e6; written below with
                                    # time_precision='u'.
                                    ts = tempSamples[ax][reg][idx][templates.SAM_TS]+firstTS*1000000
                                    influxMeas.append({
                                        "measurement" : ax,
                                        "tags": {
                                            "parameter": str(settings['variableNames'][str(reg)])
                                        },
                                        "time": int(ts),
                                        "fields":{
                                            "value": tempSamples[ax][reg][idx][templates.SAM_VAL]
                                        }
                                    })
                                try:
                                    influx.write_points(points=influxMeas, time_precision='u', retention_policy=settings['influxRetention'])
                                except Exception as e:
                                    LOGGER.error(f'Impossibile scrivere nel database: {e}')
                                    # NOTE(review): ``ts`` was scaled by 1e6 above
                                    # but fromtimestamp() expects seconds, and
                                    # ``ts`` is unset if the batch was empty --
                                    # confirm.
                                    LOGGER.error(f'TS da controllo: {datetime.fromtimestamp(ts).strftime("%d/%m/%y_%H:%M:%S,%f")}')
                                    stat='STOP'
                                pass
                            pass
                        pass
                # Periodic save to file when neither MQTT nor InfluxDB is used.
                if not settings['sendMqtt'] and not settings['sendInflux']:
                    if time.time() - startTime > settings["fileSaveTime"]:
                        saveSamples(samples)
                        startTime = time.time()
                LOGGER.info("{}".format(tempSamples))
                loopcount+=1
                if loopcount % 100 == 0:
                    LOGGER.info(f"Loop Numero, sono vivo: {loopcount}")
            else:
                # collectSamples() failed.
                LOGGER.error("Disconnesso dal CN")
                stat = "STOP"
            pass
        elif stat == "STOP":
            # Stop the capture, close the channel and save what was collected.
            stopSampling()
            closeChannel()
            if not settings['sendMqtt']:
                saveSamples(samples)
            LOGGER.warning("Chiudo il Canale")
            stat = "IDLE"
            firstLoop = True
            loopcount = 0
            pass
        elif stat == "IDLE" and firstLoop == True:
            LOGGER.info("Pronto a Iniziare Cattura")
            firstLoop = False
        else:
            pass
        # Polling period of the state machine.
        time.sleep(settings["loopPeriod"])
    mqtt.disconnect()
    sys.exit(0)