Files
edelweiss/projmon/projrequest.py

63 lines
2.0 KiB
Python

import requests
import urllib3
# Suppress only urllib3's InsecureRequestWarning: requests in this module are sent with verify=False.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
import xmltodict
from requests.auth import HTTPBasicAuth
from os import getcwd
import json
from commands import SMSMessage
from typing import Dict, List, Any, Type, TypeVar
# Type variable restricted to SMSMessage subclasses, so that
# ProjectorConnection.request() is typed as returning an instance of the class it is given.
T = TypeVar("T", bound=SMSMessage)
# When true, ProjectorConnection.request() parses a local sample XML file
# instead of querying the projector over HTTPS.
TEST = True
class ProjectorConnection:
    """Client for one projector's HTTPS interface, which answers with SMSMessage XML.

    Requests are authenticated with HTTP basic auth.

    NOTE(security): TLS certificate verification is disabled (``verify=False``)
    for every request made by :meth:`get`.
    """

    ip: str
    username: str
    password: str
    auth: HTTPBasicAuth

    def __init__(self, ip: str, username: str, password: str) -> None:
        """Store the connection details and build the basic-auth credentials.

        Args:
            ip: Host part of the request URL (``https://<ip>/...``).
            username: User name for HTTP basic auth.
            password: Password for HTTP basic auth.
        """
        self.ip = ip
        self.username = username
        self.password = password
        self.auth = HTTPBasicAuth(username=self.username, password=self.password)

    @staticmethod
    def _parse_xml(text: str) -> Dict[str, Any]:
        """Replace the escapes ``%2F`` and ``%3A`` by ``/`` and ``:``, then parse *text* as XML."""
        return xmltodict.parse(text.replace("%2F", "/").replace("%3A", ":"))

    def get(self, path: List[str], params: Dict[str, str] | None = None) -> Dict[str, Any] | None:
        """Send a GET request to the projector and parse the XML answer.

        Args:
            path: URL path segments; they are joined with ``/`` after the host.
            params: Optional query-string parameters.

        Returns:
            The parsed XML document as a dict when the status code is 200,
            otherwise ``None`` (for instance a redirect, which is not followed).

        Raises:
            requests.HTTPError: For 4xx/5xx answers (via ``raise_for_status``).
                Other ``requests`` exceptions (timeout, connection failure)
                propagate unchanged.
        """
        # Join outside the f-string: reusing the enclosing quote character inside
        # a replacement field is a SyntaxError before Python 3.12.
        joined_path = "/".join(path)
        response: requests.Response = requests.get(
            url=f"https://{self.ip}/{joined_path}",
            params=params,
            headers={
                "Accept": "application/xml"
            },
            auth=self.auth,
            allow_redirects=False,
            timeout=4,  # seconds
            verify=False,  # NOTE(security): certificate verification is off
        )
        response.raise_for_status()
        if response.status_code == 200:  # HTTP ok response
            return self._parse_xml(response.text)
        return None

    def request(self, cls: Type[T]) -> T | None:
        """Fetch the message described by *cls* and return it as a validated instance.

        When the module-level ``TEST`` flag is true, the document is read from
        ``projmon/xmlresponses/showstatus.xml`` under the current working
        directory instead of being requested from the projector.

        The content of ``MessageBody/<Type>`` (``<Type>`` being
        ``MessageHeader/Type``) is merged into the top level of the message,
        ``MessageBody`` and the ``@xmlns`` attribute are removed, and the result
        is printed as JSON and handed to ``cls.model_validate``.

        Args:
            cls: Message class; this method uses its ``path`` and ``params``
                attributes and its ``model_validate`` callable.

        Returns:
            Whatever ``cls.model_validate`` returns, or ``None`` when no
            document was obtained.
        """
        document: Dict[str, Any] | None
        if TEST:
            with open(f'{getcwd()}/projmon/xmlresponses/showstatus.xml', 'r') as fd:
                document = self._parse_xml(fd.read())
        else:
            document = self.get(cls.path, cls.params)
        if not isinstance(document, dict):
            return None

        # 'SMSMessage' is the common container element for all messages.
        message = document['SMSMessage']
        message_type: str = message['MessageHeader']['Type']
        body = message['MessageBody'][message_type]
        # xmltodict yields None for an empty element; there is nothing to merge then.
        if body is not None:
            message.update(**body)
        del message['MessageBody']
        # The namespace attribute is only present when the root element declares one.
        message.pop('@xmlns', None)
        print(json.dumps(message, indent = 2))
        return cls.model_validate(message)