'''
Created on 5 Sep 2019

@author: Emanuele Trabattoni
'''
|
|
import sys
|
|
import time
|
|
import json
|
|
import requests
|
|
import xml.etree.ElementTree as xml
|
|
import xml.dom.minidom as mini
|
|
import dicttoxml, xmltodict
|
|
import logging
|
|
import copy
|
|
import base64
|
|
import struct
|
|
import csv
|
|
|
|
from paho.mqtt import client
|
|
from datetime import datetime
|
|
import templates
|
|
from paho.mqtt.client import connack_string
|
|
from influxdb import InfluxDBClient
|
|
|
|
def buildBody():
    """Create an empty SOAP envelope for a new request.

    Returns:
        A ``(root, body)`` tuple: the ``soap:Envelope`` element, created with
        the ``templates.XMLNS_ENV`` attributes, and its ``soap:Body`` child,
        to which callers attach the command element.
    """
    envelope = xml.Element("soap:Envelope", templates.XMLNS_ENV)
    return envelope, xml.SubElement(envelope, "soap:Body")
|
|
|
|
def getInfo():
    """Send a ``GetHWKey`` request and return the decoded reply.

    Returns:
        Whatever ``readResponse`` yields for the reply, or ``False`` when
        ``sendRequest`` reported a failure.
    """
    root, body = buildBody()
    xml.SubElement(body, "GetHWKey", templates.COMMANDS["GetHWKey"]["attrib"])
    resp = sendRequest(xml.tostring(root, encoding='utf-8'))
    # sendRequest signals failure with the literal False.  Compare against it
    # explicitly: the truth value of an Element depends on whether it has
    # children, and testing it is deprecated.
    if resp is False:
        return False
    return readResponse(resp)
|
|
|
|
def setupOscope():
    """Register every configured variable with a ``MonAddVariable`` request.

    One request is sent for each code listed under each entry of
    ``settings["variables"]``.

    Returns:
        A list of ``(name, code, variable_id)`` tuples, one per variable whose
        reply carried a non-zero ``retval``; ``False`` as soon as a request
        fails or its reply has no readable body.
    """
    command = templates.COMMANDS["MonAddVariable"]
    varMap = []
    for var in settings["variables"]:
        # Deep copy: the nested "VarDescr" dict is filled in below, and a
        # shallow copy would write those values into the shared template.
        descr = copy.deepcopy(command["elem"])
        descr["VarDescr"]["DeviceID"] = var["deviceID"]
        descr["VarDescr"]["SamplingPeriod"] = settings["samplePeriod"]
        for cod in var["codes"]:
            root, body = buildBody()
            addVariable = xml.SubElement(body, "MonAddVariable", command["attrib"])
            descr["VarDescr"]["Code"] = cod
            # Serialize the descriptor and move its ChannelID and VarDescr
            # elements into the request.
            descrXml = xml.fromstring(str(dicttoxml.dicttoxml(descr, attr_type=False), encoding='utf-8'))
            addVariable.append(descrXml.find("ChannelID"))
            addVariable.append(descrXml.find("VarDescr"))
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            # sendRequest reports failure with the literal False; an Element
            # must not be truth-tested (its truthiness follows child count).
            if resp is False:
                return False
            decoded = readResponse(resp)
            if decoded is False:
                # No SOAP body in the reply: treat it like a failed request
                # instead of crashing on the subscript below.
                return False
            result = decoded["MonAddVariable-R"]
            if int(result["retval"]):
                newvar = (var["name"], cod, int(result["VariableID"]))
                varMap.append(newvar)
                LOGGER.debug("Aggingo la variabile-> {}".format(newvar))
            else:
                LOGGER.error("Non posso aggiungere la variabile-> {}".format(var))
    return varMap
|
|
|
|
def openChannel():
    """Send a ``MonOpenChannel`` request with ``Synchronized`` set to false.

    Returns:
        The decoded reply from ``readResponse``, or ``False`` when
        ``sendRequest`` returned ``False``.
    """
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonOpenChannel", templates.COMMANDS["MonOpenChannel"]["attrib"])
    xml.SubElement(command, "Synchronized").text = "false"
    reply = sendRequest(xml.tostring(envelope, encoding='utf-8'))
    if reply is False:
        return False
    return readResponse(reply)
|
|
|
|
def closeChannel():
    """Send a ``MonCloseChannel`` request with ``Synchronized`` set to false.

    Returns:
        The decoded reply from ``readResponse``, or ``False`` when
        ``sendRequest`` returned ``False``.
    """
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonCloseChannel", templates.COMMANDS["MonCloseChannel"]["attrib"])
    xml.SubElement(command, "Synchronized").text = "false"
    reply = sendRequest(xml.tostring(envelope, encoding='utf-8'))
    if reply is False:
        return False
    return readResponse(reply)
|
|
|
|
|
|
def startSampling(channelID='0'):
    """Send a ``MonStartSampling`` request for a channel.

    Args:
        channelID: Text placed in the request's ``ChannelID`` element.

    Returns:
        The decoded reply from ``readResponse``. If anything raises while the
        request is serialized, sent or decoded, the error is logged and
        ``None`` is returned.
    """
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonStartSampling", templates.COMMANDS["MonStartSampling"]["attrib"])
    xml.SubElement(command, "ChannelID").text = channelID
    try:
        payload = xml.tostring(envelope, encoding='utf-8')
        reply = sendRequest(payload)
        return readResponse(reply)
    except Exception as e:
        LOGGER.error("Non riesco a iniziare la cattura -> {}".format(e))
    return None
|
|
|
|
def stopSampling(channelID='0'):
    """Send a ``MonStopSampling`` request for a channel.

    Args:
        channelID: Text placed in the request's ``ChannelID`` element.

    Returns:
        The decoded reply from ``readResponse``. If anything raises while the
        request is serialized, sent or decoded, the error is logged and
        ``None`` is returned.
    """
    envelope, soapBody = buildBody()
    command = xml.SubElement(soapBody, "MonStopSampling", templates.COMMANDS["MonStopSampling"]["attrib"])
    xml.SubElement(command, "ChannelID").text = channelID
    try:
        payload = xml.tostring(envelope, encoding='utf-8')
        reply = sendRequest(payload)
        return readResponse(reply)
    except Exception as e:
        LOGGER.error("Non riesco a fermare la cattura -> {}".format(e))
    return None
|
|
|
|
def collectSamples(varMap, channelID='0'):
    """Fetch the buffered samples of every mapped variable.

    A ``MonGetVariableS`` request is sent per entry of *varMap*; the base64
    ``DataBuffer`` / ``TimeBuffer`` of the reply are unpacked as 8-byte
    doubles and 8-byte integers respectively.

    Args:
        varMap: Iterable of variable records, indexed here with
            ``templates.VAR_NAME``, ``templates.VAR_REG`` and
            ``templates.VAR_ID``.
        channelID: Text placed in the request's ``ChannelID`` element.

    Returns:
        ``{name: {reg: [(timestamp, value), ...]}}``. Variables whose reply
        has ``retval`` below 1 are left without a ``reg`` entry. ``False`` is
        returned when a request fails, a data buffer is empty, or any
        exception occurs (the exception is logged).
    """
    dataDict = {var[templates.VAR_NAME]: dict() for var in varMap}
    for var in varMap:
        try:
            root, body = buildBody()
            getVariable = xml.SubElement(body, "MonGetVariableS", templates.COMMANDS["MonGetVariableS"]["attrib"])
            xml.SubElement(getVariable, "ChannelID").text = channelID
            xml.SubElement(getVariable, "VariableID").text = str(var[templates.VAR_ID])
            resp = sendRequest(xml.tostring(root, encoding='utf-8'))
            if resp is False:
                return False
            resp = readResponse(resp)["MonGetVariableS-R"]
            if int(resp["retval"]) < 1:
                continue
            # Truth test rather than len(): a missing/None buffer must take
            # the "Empty Buffer" path, not raise and be logged as unexpected.
            if not resp["DataBuffer"]:
                LOGGER.warning("Empty Buffer")
                return False
            dataBuffer_raw = base64.b64decode(resp["DataBuffer"])
            timeBuffer_raw = base64.b64decode(resp["TimeBuffer"])
            # Both buffers are decoded with the same element count, taken
            # from the data buffer (8 bytes per value).
            nVals = len(dataBuffer_raw) // 8
            dataVal = struct.unpack("{}d".format(nVals), dataBuffer_raw)
            timeVal = struct.unpack("{}q".format(nVals), timeBuffer_raw)
            dataDict[var[templates.VAR_NAME]][var[templates.VAR_REG]] = list(zip(timeVal, dataVal))
        except Exception as e:
            LOGGER.error("Eccezione inaspettata -> {}".format(e))
            return False
    return dataDict
|
|
|
|
def readResponse(r):
    """Convert the SOAP body of a parsed reply into a plain dictionary.

    Args:
        r: Parsed reply element; searched for ``SOAP-ENV:Body`` using the
            ``templates.XMLNS_RESP`` namespace map.

    Returns:
        The content under ``"Body"`` after ``remNameSpace`` has stripped the
        namespace part of every key, or ``False`` when no body is found.
    """
    soapBody = r.find("SOAP-ENV:Body", templates.XMLNS_RESP)
    if soapBody is None:
        return False
    serialized = xml.tostring(soapBody, encoding='utf-8')
    parsed = xmltodict.parse(serialized, encoding='utf-8', process_namespaces=True)
    return remNameSpace(parsed)["Body"]
|
|
|
|
def remNameSpace(d, sep=':'):
    """Return a copy of *d* whose keys have their namespace part removed.

    Every key is cut at the last occurrence of *sep* and only the trailing
    part is kept. Nested dictionaries are processed recursively with the same
    separator, including dictionaries that sit inside lists.

    Args:
        d: Mapping with string keys.
        sep: Separator between namespace and local name.

    Returns:
        A new ``dict``; *d* itself is not modified.
    """
    def _clean(value):
        # Recurse with the caller's separator, not the default one.
        if isinstance(value, dict):
            return remNameSpace(value, sep)
        if isinstance(value, list):
            return [_clean(item) for item in value]
        return value

    return {key.rsplit(sep, 1)[-1]: _clean(value) for key, value in d.items()}
|
|
|
|
def sendRequest(req):
    """POST a serialized SOAP request and return the parsed reply.

    The request and the reply are both pretty-printed to the debug log.

    Args:
        req: UTF-8 encoded XML request (bytes).

    Returns:
        The root element of the reply, or ``False`` when the HTTP request
        fails or the reply is not well-formed XML (the error is logged).
    """
    from xml.parsers.expat import ExpatError  # raised by minidom on bad XML

    req = str(req, encoding='utf-8')
    LOGGER.debug("XML_REQUEST\n" + mini.parseString(req).toprettyxml(indent='\t'))
    try:
        xmlResponse = requests.post(settings['URL'], templates.XML_HEADER + req, headers=templates.HTTP_HEADER)
    except requests.exceptions.RequestException as e:
        # The placeholder was missing, so the cause was never logged.
        LOGGER.error("Timeout Richiesta HTTP -> {}".format(e))
        return False
    try:
        LOGGER.debug("XML_RESPONSE\n" + mini.parseString(xmlResponse.text).toprettyxml(indent='\t'))
        return xml.fromstring(xmlResponse.text)
    except (ExpatError, xml.ParseError) as e:
        # A non-XML reply is reported through the same False sentinel callers
        # already handle, instead of escaping as an uncaught exception.
        LOGGER.error("Risposta XML non valida -> {}".format(e))
        return False
|
|
|
|
def saveSamples(s, fileName = None):
    """Write the collected samples to a CSV or JSON file.

    The format is chosen by ``settings["fileType"]``. For CSV there is one
    pair of columns (timestamp, value) per configured variable code and one
    row per sample index; rows stop at the shortest series.

    Args:
        s: ``{name: {code: [(timestamp, value), ...]}}`` sample dictionary.
        fileName: Destination path. When ``None`` it is built from
            ``settings["fileSaveDir"]``, the current time and the file type.
    """
    if fileName is None:
        fileName = settings["fileSaveDir"].format(time.strftime("%Y%m%d-%H.%M.%S"), fType=settings["fileType"])
    with open(fileName, mode='w', newline='') as dataOut:
        LOGGER.info("Salvo i campioni dei precedenti {}s -> {}".format(settings["fileSaveTime"], fileName))
        if settings["fileType"] == "csv":
            header = ["{0}-{1}_TS;{0}-{1}".format(dev["name"], reg)
                      for dev in settings["variables"] for reg in dev["descr"]]
            dataOut.write(";".join(header) + '\n')
            # A variable with no samples yet (e.g. a save requested before
            # any capture) contributes an empty series instead of a KeyError.
            series = [s.get(dev["name"], {}).get(code, [])
                      for dev in settings["variables"] for code in dev["codes"]]
            dataWriter = csv.writer(dataOut, delimiter=';', quoting=csv.QUOTE_NONE)
            # Each row holds the n-th (timestamp, value) pair of every
            # series, flattened into one list of fields.
            dataWriter.writerows([[field for pair in row for field in pair]
                                  for row in zip(*series)])
        elif settings["fileType"] == "json":
            # The former ``dataOut.newline = '\r\n'`` only set an unused
            # attribute on the file object and has been dropped.
            dataOut.write(json.dumps(s))
|
|
|
|
def sendSamples(s):
    """Publish *s*, serialized as JSON, on the ``settings['mqttSendSamples']`` topic."""
    topic = settings['mqttSendSamples']
    payload = json.dumps(s)
    mqtt.publish(topic, payload)
|
|
|
|
|
|
def onMessage(cli, userdata, msg):
    """MQTT ``on_message`` callback: act on a command found in the payload.

    The payload is searched for a keyword, in this order:
    ``STOP`` sets the global ``stat`` to ``"STOP"``; ``PAUSE`` does nothing;
    ``START`` sets ``stat`` to ``"INI"``; ``SAVE`` calls
    ``saveSamples(samples)``; ``EXIT`` does nothing. Anything else is logged
    as an unknown message.

    Args:
        cli: Client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only its ``payload`` is read.
    """
    global stat
    payload = msg.payload
    # Decode the bytes instead of taking their repr ("b'...'"), so the log
    # shows the actual text; undecodable bytes are replaced, never fatal.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode("utf-8", errors="replace")
    else:
        text = str(payload)
    if "STOP" in text:
        stat = "STOP"
    elif "PAUSE" in text:
        pass
    elif "START" in text:
        stat = "INI"
    elif "SAVE" in text:
        saveSamples(samples)
    elif "EXIT" in text:
        pass
    else:
        LOGGER.error(f"Messaggio MQTT sconosciuto: {text}")
|
|
|
|
def onConnect(cli, userdata, flags, rc):
    """MQTT ``on_connect`` callback: log the result and announce the client.

    Args:
        cli: Client that connected.
        userdata: User data attached to the client (unused).
        flags: Connection flags from the broker (unused).
        rc: Connection result code, logged via ``connack_string``.
    """
    LOGGER.debug("Connessione a MQTT -> {}".format(connack_string(rc)))
    # Subscribe here as well, so the command topic is subscribed again after
    # every reconnect and not only after the first connection.
    cli.subscribe(settings['mqttReceive'])
    cli.publish(settings["mqttSend"], "CLIENT_ON")
|
|
|
|
def onDisconnect(cli, userdata, rc):
    """MQTT ``on_disconnect`` callback: log the reason and reconnect.

    Args:
        cli: Client that was disconnected.
        userdata: User data attached to the client (unused).
        rc: Disconnection result code; 0 means the disconnect was requested
            by this program.
    """
    # rc is a client error code here, not a CONNACK code, so it is decoded
    # with error_string rather than connack_string.
    LOGGER.warning("Disonnesso da MQTT -> {}".format(client.error_string(rc)))
    if rc == 0:
        # Deliberate disconnect (e.g. at shutdown): do not reconnect.
        return
    try:
        cli.reconnect()
    except OSError as e:
        # Broker unreachable: log it instead of raising inside the callback.
        LOGGER.error("Riconnessione a MQTT fallita -> {}".format(e))
|
|
|
|
###################################
|
|
############# MAIN ################
|
|
###################################
|
|
if __name__ == '__main__':
    # Loop state. ``stat`` and ``samples`` are module globals that onMessage
    # also uses (it assigns ``stat`` and saves ``samples``).
    isRunning = True
    firstCap = False
    firstLoop = True
    stat = "IDLE"
    bufferFull = 0
    loopcount = 0
    variableMap = list()
    samples = dict()

    # Everything below reads ``settings``, so a configuration file that
    # cannot be loaded is fatal rather than something to continue past.
    try:
        print(f'Config: {sys.argv[1]}')
        with open(str(sys.argv[1])) as fp:
            settings = json.load(fp)
    except (IndexError, OSError, ValueError):
        print("Non posso aprire il file di configurazione!")
        sys.exit(1)

    # Setup Logger
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.propagate = False
    FORMATTER = logging.Formatter((settings["logFormat"]), (settings["logTimeFormat"]))
    # File Logging
    fh = logging.FileHandler((settings["logFile"]))
    fh.setLevel(logging.WARNING)
    fh.setFormatter(FORMATTER)
    LOGGER.addHandler(fh)
    # Console Logging
    cl = logging.StreamHandler(sys.stdout)
    cl.setLevel(logging.INFO)
    cl.setFormatter(FORMATTER)
    LOGGER.addHandler(cl)

    LOGGER.warning("NasoScope {} Started!".format(settings["version"]))

    # MQTT client: callbacks run on the network thread started by loop_start.
    mqtt = client.Client()
    mqtt.on_message = onMessage
    mqtt.on_connect = onConnect
    mqtt.on_disconnect = onDisconnect
    mqtt.connect(host=settings['mqttHost'], port=settings['mqttPort'])
    mqtt.subscribe(settings['mqttReceive'])
    mqtt.user_data_set(stat)
    mqtt.loop_start()

    influx = InfluxDBClient(host=settings['influxHost'], port=settings['influxPort'], database=settings['influxDB'])

    xmlReqTempl = xml.fromstring(templates.REQ_TEMPLATE)
    xmlRespTempl = xml.fromstring(templates.RESP_TEMPLATE)

    # State machine: IDLE -> INI -> STARTCAP -> CAP -> STOP -> IDLE.
    while isRunning:
        if stat == "INI":
            r = getInfo()
            if r:
                LOGGER.info("Sto Monitorando-> HWid:{0}, rel:{1}"
                            .format(r["GetHWKey-R"]["HwKey"], r["GetHWKey-R"]["Release"]))
                # Stop any capture and close any channel left open before
                # opening a new one.
                stopSampling()
                closeChannel()
                r = openChannel()
                if not r:
                    # openChannel returns False on failure; subscripting it
                    # would raise, so stop cleanly instead.
                    LOGGER.error("Impossibile aprire il canale")
                    isRunning = False
                else:
                    channelID = r["MonOpenChannel-R"]["ChannelID"]
                    LOGGER.info("Apro il Canale-> {}".format(channelID))
                    variableMap = setupOscope()
                    if variableMap is False:
                        LOGGER.error("Impossibile aggiungere le variabili")
                        isRunning = False
                    else:
                        LOGGER.info("Aggiungo le Variabili-> {}".format(variableMap))
                        stat = "STARTCAP"
            else:
                LOGGER.error('Nessuna risposta dal CN')
                isRunning = False
        elif stat == "STARTCAP":
            # NOTE(review): sampling is started on the default channel '0'
            # while samples are collected on ``channelID`` - confirm that
            # this is intended.
            r = startSampling()
            if r:
                LOGGER.info("Inizio la cattura-> t:{}ms".format(settings["samplePeriod"]))
                startTime = time.time()
                # Start from an empty buffer for every configured code.
                for k in settings["variables"]:
                    samples[k["name"]] = {kk: [] for kk in k["codes"]}
                firstCap = True
                stat = "CAP"
            else:
                LOGGER.error("Impossibile Iniziare la Cattura")
                isRunning = False
        elif stat == "CAP":
            tempSamples = collectSamples(variableMap, channelID)
            if tempSamples is not False:
                if firstCap:
                    # The first batch is discarded; it only fixes the wall
                    # clock reference used for the InfluxDB timestamps.
                    firstCap = False
                    firstTS = time.time()
                    LOGGER.warning(f'Inizio cattura: {datetime.fromtimestamp(firstTS).strftime("%c")}')
                else:
                    for ax in tempSamples:
                        for reg in tempSamples[ax]:
                            batch = tempSamples[ax][reg]
                            if not settings['sendInflux']:
                                # Rebuild every record as (timestamp, value).
                                batch[:] = [(rec[templates.SAM_TS], rec[templates.SAM_VAL]) for rec in batch]
                                if not settings["sendMqtt"]:
                                    if batch and batch[-1][templates.SAM_TS] > settings["recordTime"]:
                                        # Buffer full: drop as many of the
                                        # oldest samples as are about to be
                                        # appended. (The previous slice used
                                        # the dict sizes and removed nothing.)
                                        del samples[ax][reg][:len(batch)]
                                        if bufferFull < 1:
                                            bufferFull = 1
                                        elif bufferFull == 1:
                                            LOGGER.warning("Buffer Campioni Pieno, inizio roll")
                                            bufferFull = 2
                                    samples[ax][reg] += batch
                                else:
                                    # NOTE(review): this publishes the whole
                                    # batch dictionary once per (ax, reg)
                                    # pair - confirm the duplicates are
                                    # intended.
                                    sendSamples(tempSamples)
                            else:
                                influxMeas = list()
                                LOGGER.debug("Invio Influx")
                                for record in batch:
                                    # firstTS is in seconds; the points are
                                    # written with microsecond precision.
                                    ts = record[templates.SAM_TS] + firstTS * 1000000
                                    influxMeas.append({
                                        "measurement": ax,
                                        "tags": {
                                            "parameter": str(settings['variableNames'][str(reg)])
                                        },
                                        "time": int(ts),
                                        "fields": {
                                            "value": record[templates.SAM_VAL]
                                        }
                                    })
                                try:
                                    influx.write_points(points=influxMeas, time_precision='u', retention_policy=settings['influxRetention'])
                                except Exception as e:
                                    LOGGER.error(f'Impossibile scrivere nel database: {e}')
                                    if influxMeas:
                                        # Convert microseconds back to seconds
                                        # before formatting the timestamp.
                                        lastTS = influxMeas[-1]["time"] / 1000000
                                        LOGGER.error(f'TS da controllo: {datetime.fromtimestamp(lastTS).strftime("%d/%m/%y_%H:%M:%S,%f")}')
                                    stat = 'STOP'
                    if not settings['sendMqtt'] and not settings['sendInflux']:
                        # File mode: flush the buffer to disk periodically.
                        if time.time() - startTime > settings["fileSaveTime"]:
                            saveSamples(samples)
                            startTime = time.time()
                            LOGGER.info("{}".format(tempSamples))
                    loopcount += 1
                    if loopcount % 100 == 0:
                        LOGGER.info(f"Loop Numero, sono vivo: {loopcount}")
            else:
                LOGGER.error("Disconnesso dal CN")
                stat = "STOP"
        elif stat == "STOP":
            stopSampling()
            closeChannel()
            if not settings['sendMqtt'] and not settings['sendInflux']:
                saveSamples(samples)
            LOGGER.warning("Chiudo il Canale")
            firstLoop = True
            stat = "IDLE"
        elif stat == "IDLE":
            if firstLoop:
                LOGGER.info("Pronto a Iniziare Cattura")
                firstLoop = False
        time.sleep(settings["loopPeriod"])

    mqtt.disconnect()
    sys.exit(0)
|
|
|