Files
bananaSPLIT/SPEx/spex/libbananasplit/libsplit.py
Emanuele 16ab928383 corretti problemi di pylint
c'e' da renderlo meno rompicoglioni
2020-10-12 16:00:20 +02:00

404 lines
20 KiB
Python

'''
Created on 2 nov 2019
@author: Emanuele Trabattoni
'''
from PyQt5.QtCore import QObject, pyqtSignal
from slugify import slugify
import time, parse, re, copy, os, json
import traceback
class bananaSPLITTER(QObject):
sendStatus = pyqtSignal(str)
def __init__(self, fileParams=None, logger=None):
try:
QObject.__init__(self)
self.fileParams = fileParams
self.log = logger
self.rawFile = None
self.status = "first"
self.contentList = list()
self.bodyCounter=0
self.duplicateNumber=0
#self.log.debug(f"Configurazione: \n {json.dumps(fileParams, indent=2)}")
if fileParams is not None:
self.log.debug(self.tr("Sto operando sul file: {}..").format(self.fileParams['name']))
self.paths = self.fileParams['paths']
self.docStruct = self.fileParams['docStruct']
self.settings = self.fileParams['settings']
self.fileName = self.fileParams['name']
self.outPath = self.paths['OUTworkPath']+slugify(self.fileName)+'/'
self.beginTime = time.time()
pass
else:
self.log.critical(self.tr("Non e' stato fornito il nome di alcun file da splittare!"))
except Exception as e:
print(f"libSplit error: {e}")
pass
def run(self):
self.log.info(self.tr("Nuovo SPLITTER su file: {}").format(self.fileName))
self.sendStatus.emit(self.tr(f"Inizio {self.fileName}"))
try:
self.openFile()
self.remEmptyLines()
if self.docStruct['fileVersNew']:
self.newSplitFile()
else:
self.splitFile()
if self.settings['removeDuplicates']:
self.log.info(self.tr("Controllo se ci sono dei duplicati.."))
self.removeDuplicates()
else:
for idx, ff in enumerate(self.contentList):
ff['duplicate']=False
self.contentList[idx]=ff
self.log.warn(self.tr("Salto il controllo dei duplicati.."))
# se il parse e la rimozione dei duplicati e' andata bene
# preparo e inizio il salvataggio
if os.path.exists(self.outPath):
if self.settings['removeOldFiles']:
#os.chdir(self.outPath)
for f in os.listdir(self.outPath):
os.remove(self.outPath+f)
else:
raise FileExistsError(self.tr("Non posso sovrascrivere i vecchi file, eliminali manualmente!"))
else:
os.mkdir(self.outPath)
#os.chdir(self.outPath)
if self.settings['saveSeparateFiles']:
self.saveSeparate()
if self.settings['saveBodyFile']:
self.saveBody()
self.log.info(self.tr("L'elaborazione del file ha richiesto {:4.2f} sec").format(time.time()-self.beginTime))
except UnicodeDecodeError as ee:
self.log.critical(self.tr("Il file [{}] contiene caratteri non compatibili con la codifica scelta! [{}]")
.format(self.fileParams['name'],ee))
except FileExistsError as fe:
self.log.critical(fe)
except Exception as ee:
traceback.print_exc()
self.log.warn(ee)
pass
def openFile(self):
try:
#os.chdir(self.paths["INworkPath"])
self.log.info(self.tr("Carico il contenuto.."))
fp = open(self.paths["INworkPath"]+'/'+self.fileParams['name'], mode='r', encoding=self.settings['encoding'])
self.rawFile = fp.readlines()
fp.close()
except IOError as e:
self.log.critical(self.tr("Impossibile aprire il file: {}! [{}]").format(self.fileName,e))
raise BaseException("OpenFile")
os.rmdir(self.outPath)
except Exception as ee:
print(ee)
pass
def remEmptyLines(self):
self.log.info(self.tr("Elimino righe vuote e caratteri inutili.."))
tempContent = []
try:
for ll in self.rawFile:
for c in self.settings['delChars']:
ll = ll.replace(c,'')
if ll not in ['\n', '\r', '\r\n']:
tempContent.append(ll)
self.rawFile = copy.deepcopy(tempContent)
self.rawFile.append('\n') #linea vuota finale per essere sicuri di parsare bene
return True
except:
self.log.error(self.tr("Errore inaspettato durante l'eliminazione delle righe vuote!"))
raise BaseException("DelLines")
del tempContent
def newSplitFile(self): #porting del codice dal programma originale
self.log.info(self.tr("Individuo il contenuto.."))
self.bodyCounter=0
self.duplicateNumber=0
docNumber = 0
docSkipped = 0
docDate = {}
title = ''
prevLine = ''
newsPaperName = ''
titleBegin = True
newsName = False
tempBody = list()
for l in self.rawFile: #per ogni linea del file
l=l.replace('\xa0', ' ')
lineWords = l.lstrip().split(' ') #dividi la riga in parole
if self.status == 'first':
try:
# ricerco la data
if (lineWords[self.settings['monthPosition']]).capitalize() in self.docStruct['language']['dateWords']:
try:
docDate=parse.parse(self.docStruct['dateFormat'],l).named
docDate['month']=docDate['month'].lstrip().rstrip().capitalize()
docDate['month']=self.docStruct['language']['dateWords'].index(docDate['month'])+1
# dopo la data inizia il titolo, ma prima si cerca il nome del giornale
except:
self.log.warn(self.tr("Ho trovato una riga ambigua.. potrebbe essere una data ma non so: [{}]"). format(l.strip('\r\n')))
pass
elif lineWords[0].upper() in self.docStruct['language']['headWords']:
#cambio stato e inizializzo un nuovo documento da riempire
self.status = 'head'
newDoc=dict()
newDoc['title']=title
newDoc['date']=docDate
newDoc['newsPaperName'] = newsPaperName
titleBegin = False
newsName = False
else:
if titleBegin:
title = l.strip().capitalize()
titleBegin = False
newsName = True
elif newsName:
if self.settings['getNewsPaperName']:
try:
if l.strip():
newsPaperName = l.strip()
else:
newsPaperName = self.settings['nameNotFoundStr']
except:
self.log.warn(self.tr("E' successo qualcosa mentre stavo cercando il nome della pubblicazione,\
controlla i file di uscita! [{}]").format(l.strip()))
else:
newsPaperName = self.settings['nameNotFoundStr']
newsName = False
except IndexError:
self.log.error(self.tr("Errore inaspettato, contatta il tuo sviluppatore di fiducia!"))
pass
elif self.status == 'head':
tempContent = list()
#doppio check per trovare l'inizio del corpo documento
if re.match(self.docStruct['beginOfDocument'],l):
self.status='body'
#if lineWords[0].upper() not in self.docStruct['language']['headWords']: #se la prima parola non e' tra quelle di inizio
#tempBody.append(l) # vuol dire che ho trovato l'articolo e aggiungo la prima riga al contenuto del documento
#self.status = 'body'
elif self.status == 'body':
if re.match(self.docStruct['endOfDocument'],l) is not None: #controlla se ci sono articoli che non hanno le parole chiave finali
self.log.warn(self.tr("Ho individuato una separatore valido prima che si chiusesse l'articolo precedente, controlla i tuoi file in uscita!\n\
L'errore dovrebbe essere intorno all'articolo {} ma non sono sicuro! \n\t\t\t[{}]").format(docNumber, l.strip()))
self.status = 'tail'
anomaly = True
elif not lineWords[0].upper() in self.docStruct['language']['tailWords']: #se la prima parola non e' tra quelle di fine
if self.settings['delLF']:
tempBody.append(l.strip('\n')) #allora sto leggendo l'articolo
else:
tempBody.append(l)
else:
self.status = 'tail'
anomaly = False
pass
elif self.status == 'tail':
if anomaly or (re.match(self.docStruct['endOfDocument'],l) is not None):
self.status = 'first'
anomaly = False
titleBegin = True
if self.settings['delWordBreak']:
tempContent=[ll.replace('-\n', '') for ll in tempContent]
newDoc['content']=copy.deepcopy(''.join(tempBody))
self.contentList.append(copy.deepcopy(newDoc))
tempBody=list()
self.bodyCounter +=1
pass
else:
self.log.critical(self.tr("Stato Interno Sconosciuto"))
prevLine=l #salva sempre e comunque il contenuto della linea precedente
pass
#ricerca terminata, espongo i risultati
self.log.info(self.tr("Nel file ho trovato {0} articoli..").format(self.bodyCounter))
if docSkipped > 0:
self.log.warn(self.tr("Attentione, LexisNexis ne ha saltati {} !!!").format(docSkipped))
pass
pass
def splitFile(self): #porting del codice dal programma originale
self.log.info(self.tr("Individuo il contenuto.."))
self.bodyCounter=0
self.duplicateNumber=0
docNumber = 0
docSkipped = 0
docDate = {}
title = ''
prevLine = ''
newsPaperName = ''
titleBegin = False
tempBody = list()
docSep=re.compile(self.docStruct['docSep'])
for l in self.rawFile: #per ogni linea del file
lineWords = l.lstrip().split(' ') #dividi la riga in parole
if self.status == 'first':
#prendo il numero di documento per vedere se ci sono buchi
try:
try:
nn = parse.parse("{current:d} Of {total} Documents",l.strip().capitalize()).named
if nn["current"]-docNumber==1:
pass
else:
if self.settings["showSkipped"]:
self.log.warn(self.tr("Il conto dei documenti non torna! LexisNexis ne ha saltato qualcuno!\n Precedente:{0}-Attuale:{1}").format(docNumber,nn["current"]))
docSkipped+=1
docNumber = nn["current"]
except:
pass #non segnalare eccezione se il parse fallisce
# ricerco la data
if (lineWords[self.settings['monthPosition']]).capitalize() in self.docStruct['language']['dateWords']:
try:
docDate=parse.parse(self.docStruct['dateFormat'],l).named
docDate['month']=docDate['month'].lstrip().rstrip().capitalize()
docDate['month']=self.docStruct['language']['dateWords'].index(docDate['month'])+1
title = ''
titleBegin=True
# dopo la data inizia il titolo, ma prima si cerca il nome del giornale
if self.settings['getNewsPaperName']:
try:
if prevLine.split(' ')[0].strip().isalpha():
newsPaperName = prevLine.strip()
else:
newsPaperName = self.settings['nameNotFoundStr']
except:
self.log.warn(self.tr("E' successo qualcosa mentre stavo cercando il nome della pubblicazione,\
controlla i file di uscita! [{}]").format(prevLine.strip()))
else:
newsPaperName = self.settings['nameNotFoundStr']
except:
self.log.warn(self.tr("Ho trovato una riga ambigua.. potrebbe essere una data ma non so: [{}]").format(l.strip('\r\n')))
pass
elif lineWords[0] in self.docStruct['language']['headWords']:
#cambio stato e inizializzo un nuovo documento da riempire
self.status = 'head'
newDoc=dict()
newDoc['title']=title
newDoc['date']=docDate
newDoc['newsPaperName'] = newsPaperName
titleBegin=False
else:
if titleBegin:
title += l.strip().capitalize()
except IndexError:
self.log.error(self.tr("Errore inaspettato, contatta il tuo sviluppatore di fiducia!"))
pass
elif self.status == 'head':
tempContent = list()
if lineWords[0] not in self.docStruct['language']['headWords']: #se la prima parola non e' tra quelle di inizio
tempBody.append(l) # vuol dire che ho trovato l'articolo e aggiungo la prima riga al contenuto del documento
self.status = 'body'
pass
elif self.status == 'body':
if not lineWords[0] in self.docStruct['language']['tailWords']: #se la prima parola non e' tra quelle di fine
if self.settings['delLF']:
tempBody.append(l.strip('\n')) #allora sto leggendo l'articolo
else:
tempBody.append(l)
else:
self.status = 'tail'
anomaly = False
if docSep.match(l) is not None: #controlla se ci sono articoli che non hanno le parole chiave finali
self.log.warn(self.tr("Ho individuato una separatore valido prima che si chiusesse l'articolo precedente, controlla i tuoi file in uscita!\n\
L'errore dovrebbe essere intorno all'articolo {} ma non sono sicuro! \n\t\t[{}]").format(docNumber, l.strip()))
self.status = 'tail'
anomaly = True
pass
elif self.status == 'tail':
if docSep.match(l) is not None or anomaly:
self.status = 'first'
anomaly = False
if self.settings['delWordBreak']:
tempContent=[ll.replace('-\n', '') for ll in tempContent]
newDoc['content']=copy.deepcopy(''.join(tempBody))
self.contentList.append(copy.deepcopy(newDoc))
tempBody=list()
self.bodyCounter +=1
pass
else:
self.log.critical(self.tr("Stato Interno Sconosciuto"))
prevLine=l #salva sempre e comunque il contenuto della linea precedente
pass
#ricerca terminata, espongo i risultati
self.log.info(self.tr("Nel file ho trovato {0} articoli..").format(self.bodyCounter))
if docSkipped > 0:
self.log.warn(self.tr("Attentione, LexisNexis ne ha saltati {} !!!").format(docSkipped))
pass
pass
def removeDuplicates(self):
titleList=[]
duplicateList=[]
for idx, ff in enumerate(self.contentList):
if ff['title'] not in titleList:
titleList.append(ff['title'])
ff['duplicate']=False
self.contentList[idx]=ff
pass
else:
if ff['title'] not in duplicateList:
duplicateList.append(ff['title'])
if self.settings['showRemovedDuplicates']:
self.log.info(self.tr("Duplicato: {}").format(ff['title'].strip()))
ff['duplicate'] = True
self.contentList[idx]=ff
self.duplicateNumber+=1
self.log.info(self.tr("Ho rimosso {} duplicati di {} articoli..").format(self.duplicateNumber, len(duplicateList)))
pass
def saveSeparate(self):
outFileCounter = 0
self.paths['OUTworkPath']=self.paths['OUTworkPath']+slugify(self.fileName)
self.log.info(self.tr("Salvo gli articoli in file separati..."))
self.log.debug(self.tr("Persorso: {0}").format(self.outPath))
for ff in self.contentList:
try:
if ff['duplicate'] == False:
fName=self.docStruct['outNameFormat'].format(title=slugify(ff['title'][:self.docStruct['maxTitleLen']]),\
filename=slugify(self.fileName),\
docnum=outFileCounter,\
papername=ff['newsPaperName'].strip(),\
**ff['date'])
out=open(self.outPath+'{0}'.format(fName),'wb')
if self.settings['includeTitle']:
ff['content'] = ff['title']+os.linesep+ff['content']
out.write(ff['content'].encode(self.settings['encoding']))
out.close()
outFileCounter+=1
except IOError as e:
self.log.error(self.tr("Qualcosa e\' andato storto, non riesco a scrivere il file: {}").format(e))
continue
except KeyError as ke:
self.log.error(self.tr(f"Chiave {ke} non trovata per:{outFileCounter} {ff['title']} "))
except Exception as ee:
traceback.print_exc()
self.log.error(self.tr(f"Errore generale nel salvataggio: {ee}"))
if outFileCounter < self.bodyCounter:
self.log.error(self.tr("Ho salvato meno file rispetto a quelli trovati!"))
pass
def saveBody(self):
self.log.info(self.tr('Salvo gli articoli in un singolo file vicino agli originali...'))
self.log.debug(self.tr('Persorso: {0}').format(self.outPath))
#os.chdir(self.outPath)
try:
fName=slugify(self.fileName)
fName='BODYFILE_{0}.txt'.format(fName[:self.docStruct['maxTitleLen']])
fileContent = os.linesep.join([cc['content'] for cc in self.contentList])
out=open(self.outPath+'{0}'.format(fName),'wb')
out.write(fileContent.encode(self.settings['encoding']))
out.close()
except IOError as e:
self.log.error(self.tr("Qualcosa e\' andato storto, non riesco a scrivere il file: {}").format(e))
except Exception as ee:
self.log.error(self.tr(f"Errore generale nel salvataggio: {ee}"))
pass