from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, ClassVar, Dict, List
from uuid import UUID

from projrequest import ProjectorConnection


@dataclass(init=False)
class BaseCommand:
    """Base class for one request issued through a projector connection.

    Only ``projector`` is set by the constructor.  ``timestamp``, ``type``
    and ``content`` are filled in by :meth:`update` and do not exist as
    attributes until a call to it has received a response.

    ``init=False`` is used on this class and on every subclass so that they
    all share the hand-written one-argument constructor below instead of a
    generated one that would demand a value for every declared field.
    """

    projector: ProjectorConnection
    timestamp: datetime
    type: str
    content: Any

    def __init__(self, proj: ProjectorConnection) -> None:
        """Remember the connection that later requests are sent through."""
        self.projector = proj

    def update(self, path: List[str], params: Dict[str, Any] | None = None) -> None:
        """Send a request via ``projector.get`` and store the response.

        Args:
            path: Forwarded unchanged as the ``path`` keyword of
                ``projector.get``.
            params: Forwarded unchanged as the ``params`` keyword of
                ``projector.get``.

        When the connection returns ``None`` nothing is modified, so values
        from an earlier successful call (if any) stay in place.
        """
        resp = self.projector.get(path=path, params=params)
        if resp is not None:
            # fromtimestamp() without a tz yields a naive local-time datetime.
            self.timestamp = datetime.fromtimestamp(float(resp['timestamp']))
            self.type = resp['type']
            # The payload is nested in the body under the response's own type.
            self.content = resp['body'][resp['type']]


@dataclass
class DCPInfo:
    """One entry of the DCP info list.

    Instances are created with ``DCPInfo(**entry)``, so the field names must
    match the keys of an entry.  Values are stored as received; the
    annotations are not enforced or converted at runtime.
    """

    ID: UUID
    Title: str
    Path: str
    Size: int
    ImportTime: datetime
    IsImported: bool
    VerifyStatus: bool
    ValidateStatus: bool
    IsPlayable: bool
    IsTransferred: bool


@dataclass(init=False)
class DCPInfoList(BaseCommand):
    """Command that fetches the DCP info list.

    ``dcpInfoList`` is populated by :meth:`get`.
    """

    dcpInfoList: List[DCPInfo]
    # Request constants live on the class: a bare list/dict default on a
    # dataclass field raises ValueError when the class is created.
    path: ClassVar[List[str]] = ['content', 'dcp', 'info', 'list']
    params: ClassVar[Dict[str, str]] = {'formatDate': 'false'}

    def get(self) -> None:
        """Refresh from the projector and rebuild ``dcpInfoList``."""
        # Copies keep the shared class-level constants from being mutated.
        self.update(path=list(self.path), params=dict(self.params))
        self.dcpInfoList = [DCPInfo(**e) for e in self.content]


@dataclass
class PowerStatus:
    """One entry of the power status list, created with ``PowerStatus(**entry)``."""

    Device: str
    State: str


@dataclass(init=False)
class PowerStatusList(BaseCommand):
    """Command that fetches the power status list.

    ``powerStatusList`` is populated by :meth:`get`.
    """

    powerStatusList: List[PowerStatus]
    # Class-level constant; see the note on mutable dataclass defaults above
    # in DCPInfoList is not relied on here: a list default on a dataclass
    # field raises ValueError at class creation, hence ClassVar.
    path: ClassVar[List[str]] = ['status', 'sms', 'powerstatus']

    def get(self) -> None:
        """Refresh from the projector and rebuild ``powerStatusList``."""
        self.update(path=list(self.path))
        self.powerStatusList = [PowerStatus(**e) for e in self.content]


@dataclass
class ShowStatusDetailClass:
    """Detail record of a show status.

    Created from the ``'StatusDetail'`` mapping of the show status content
    via keyword expansion, so field names must match its keys.
    """

    Type: str
    Id: UUID
    RemainingTime: int
    ElapsedTime: int
    TotalDuration: int
    CurrentEventId: UUID
    CurrentEventType: str
    IsStoppedByMalfunction: bool
    RewindTimeList: str
    MalfunctionTime: int


@dataclass(init=False)
class ShowStatus(BaseCommand):
    """Command that fetches the playback show status.

    The four status attributes are populated by :meth:`get`.
    """

    PlayState: str
    ShowStatusDetail: ShowStatusDetailClass
    PlayBackMode: str
    AtmosPlayingStatus: str
    # ClassVar because a list default on a dataclass field raises ValueError.
    path: ClassVar[List[str]] = ['playback', 'showstatus']

    def get(self) -> None:
        """Refresh from the projector and copy the status values onto self."""
        self.update(list(self.path))
        self.PlayState = self.content['PlayState']
        # The response key is 'StatusDetail'; the attribute is ShowStatusDetail.
        self.ShowStatusDetail = ShowStatusDetailClass(**self.content['StatusDetail'])
        self.PlayBackMode = self.content['PlayBackMode']
        self.AtmosPlayingStatus = self.content['AtmosPlayingStatus']


@dataclass
class ImportProgressClass:
    """Import progress record referenced by ``JobProgress.ImportProgress``."""

    TotalBytesToTransfer: int
    BytesTransferred: int
    PercentCompleted: int
    InProgress: int
    ImportPath: str
    CompletionStatus: str
    CompletionTime: str
    DCPTitle: str


@dataclass
class ValidationProgressClass:
    """Validation progress record referenced by ``JobProgress.ValidationProgressList``."""

    TotalBytesToValidate: int
    BytesValidated: int
    PercentCompleted: int
    InProgress: bool
    Id: UUID
    CompletionStatus: str
    CompletionTime: datetime


@dataclass
class JobProgress:
    """Progress record of one job, referenced by ``DCPImportJobList.JobProgressList``."""

    Id: int
    ValidateAfterImport: bool
    AggregatePercentValidated: int
    State: str
    ImportProgress: ImportProgressClass
    ValidationProgressList: List[ValidationProgressClass]
    IngestedByFolder: bool
    ContentsTransferType: str


# NOTE(review): only the two fields below are visible for this class; the
# decorator uses init=False so it is constructed like the other commands.
@dataclass(init=False)
class DCPImportJobList(BaseCommand):
    IsPaused: bool
    JobProgressList: List[JobProgress]