Files
StandOut/lambda_config/lambda_config.py

213 lines
7.2 KiB
Python

import json
import os
from typing import Dict, Optional
from urllib.parse import quote, unquote_plus, urlencode
from uuid import uuid4

import boto3
from pydantic import BaseModel

from redirects_base import Content, Customer, Redirects, Tag
# Runtime configuration. lambda_handler fills these in on its first call
# (while s3_client is still None) and later calls reuse them.
s3_client = None          # boto3 S3 client, created lazily
bucket_config: str = ''   # bucket that stores redirects.json
bucket_data: str = ''     # bucket url.txt objects are read from
function_url: str = ''    # base URL used when building tag URLs
class S3Bucket(BaseModel):
    """``bucket`` section of an S3 event record.

    Field names match the keys of the incoming event so a record can be
    validated straight from the event payload.
    """

    name: str
    ownerIdentity: dict
    arn: str
class S3Object(BaseModel):
    """``object`` section of an S3 event record.

    Only ``key`` is required; the remaining fields default to ``None`` when
    the event does not carry them.
    """

    key: str
    eTag: str | None = None
    size: int | None = None
    sequencer: str | None = None
class S3Event(BaseModel):
    """``s3`` section of an event record: the bucket and the object involved."""

    s3SchemaVersion: str
    bucket: S3Bucket
    object: S3Object
class Record(BaseModel):
    """One entry of the event's ``Records`` list.

    ``eventSource`` and ``eventName`` drive the dispatch in lambda_handler;
    ``s3`` identifies the object the event refers to.
    """

    eventName: str
    eventSource: str
    eventTime: str
    s3: S3Event
def lambda_handler(event: dict, context):
    """Apply the S3 object events in *event* to the redirects document and save it.

    On the first invocation the module-level configuration is read from the
    environment (``BUCKET_CONFIG``, ``BUCKET_DATA``, ``FUNCTION_URL``) and the
    S3 client is created. When *context* is ``None`` the redirects document is
    read from and written to a hard-coded local file instead of the config
    bucket; the ``__main__`` entry point relies on this.

    Returns:
        ``{"statusCode": 404}`` when ``redirects.json`` is missing from the
        config bucket; ``False`` as soon as a record whose source is not
        ``"aws:s3"`` is met (nothing is saved in that case); ``True`` once
        every record was processed and the document was written back.
    """
    global s3_client, bucket_config, bucket_data, function_url

    # First call in this process: load configuration and build the client.
    if s3_client is None:
        print("Init Function")
        bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config')
        bucket_data = os.environ.get('BUCKET_DATA', 'standout-data')
        function_url = os.environ.get('FUNCTION_URL', 'https://api.standout.it/dev')
        print(f'Bucket Config: {bucket_config}')
        print(f' Bucket Data: {bucket_data}')
        s3_client = boto3.client('s3')

    # No Lambda context means a local run backed by a file on disk.
    running_locally = context is None
    local_redirects_path = '/home/emanuele/dev/StandOut/lambda_config/redirects.json'

    # Load the current redirects document.
    redirects: Redirects
    try:
        if running_locally:
            with open(local_redirects_path, 'r') as local_file:
                redirects = Redirects.model_validate_json(local_file.read(), strict=False)
        else:
            stored = s3_client.get_object(
                Bucket=bucket_config,
                Key='redirects.json'
            )
            redirects = Redirects.model_validate_json(stored['Body'].read())
    except s3_client.exceptions.NoSuchKey as missing:
        print(missing)
        # Alternatively: a "sorry, the content is not available" page.
        return {
            "statusCode": 404
        }

    # Apply every record to the in-memory document.
    for raw_record in event["Records"]:
        record = Record(**raw_record)
        if record.eventSource != "aws:s3":
            return False
        print(f"Action: {record.eventName}")
        print(f"Object: {record.s3}")

        action = record.eventName
        if action in ("ObjectCreated:Put", "ObjectCreated:CompleteMultipartUpload"):
            processAdd(record=record, redirects=redirects, client=s3_client)
        elif action == "ObjectRemoved:Delete":
            processDelete(record=record, redirects=redirects)
        elif action == "ObjectCreated:Copy":
            print(f"Object copy: {record.s3.object.key}")
        else:
            print("Unknown action")

    # Persist the updated document to the place it was loaded from.
    if running_locally:
        with open(local_redirects_path, 'w') as local_file:
            local_file.write(redirects.model_dump_json(indent=2))
    else:
        saved = s3_client.put_object(Bucket=bucket_config,
                                     Key='redirects.json',
                                     Body=redirects.model_dump_json(indent=2))
        print(f"New redirects version: {saved['ETag']}")
    return True
def getObjectKeys(record: Record) -> list[str]:
    """Split the record's object key into path components, deepest first.

    S3 event notifications deliver the object key URL-encoded (a space
    arrives as ``+``, other special characters as ``%XX``), so the key is
    decoded before it is split; otherwise customer and tag names containing
    such characters would be stored in their encoded form.

    Empty components (leading, trailing or doubled ``/``) are dropped.

    Returns:
        The components in reverse order, e.g. ``["file", "tag", "customer"]``
        for ``"customer/tag/file"``, so callers can ``pop()`` their way down
        from the root. The list is empty when the key holds only separators.
    """
    decoded_key = unquote_plus(record.s3.object.key)
    return [part for part in reversed(decoded_key.split('/')) if part]
def processAdd(record: Record, redirects: Redirects, client) -> None:
    """Register the customer, tag and content implied by a created object.

    The object key is read as ``customer/tag/file`` or
    ``customer/tag/face/file``. Missing customers and tags are created with
    status ``"active"``. A key with only a customer, or only a customer and a
    tag, creates those entries and stops.

    Args:
        record: The S3 event record describing the created object.
        redirects: The redirects document, modified in place.
        client: S3 client used to read ``url.txt`` objects from the data
            bucket.
    """
    # Path components, deepest first, so pop() walks down from the root.
    keys = getObjectKeys(record=record)
    if not keys:
        # The key held only separators: there is nothing to register.
        return

    # Select the customer by name, creating it under a fresh id if unknown.
    cust_name = keys.pop()
    cust_id = searchCustomerKey(redirects=redirects, cust_name=cust_name, default=None)
    if cust_id is None:
        cust_id = uuid4().hex
    if redirects.customers is None:
        redirects.customers = {}
    if cust_id not in redirects.customers:
        redirects.customers[cust_id] = Customer(name=cust_name, status="active", tags=None)

    # Only a customer was added.
    customer = redirects.customers[cust_id]
    if not keys or not customer:
        return

    # Select the tag, creating it without content if unknown.
    tag_key = keys.pop()
    if customer.tags is None:
        customer.tags = {}
    if tag_key not in customer.tags:
        customer.tags[tag_key] = Tag(status="active", content=None)

    # Only a tag was added.
    tag = customer.tags[tag_key]
    if not keys or not tag:
        return

    if len(keys) > 2:
        # Deeper than customer/tag/face/file cannot be stored, so give up
        # before doing any S3 read.
        print("Too long keys")
    else:
        file_name = keys[0]
        if file_name == "url.txt":
            # Event keys are URL-encoded; get_object needs the real key.
            object_key = unquote_plus(record.s3.object.key)
            with client.get_object(Bucket=bucket_data, Key=object_key)['Body'] as url_file:
                # The target URL is the first line of the file.
                target_url = url_file.readline().decode().strip()
            content = Content(type='url', key=file_name, url=target_url, tag_url=None)
        else:
            content = Content(type='s3', key=file_name, url=None, tag_url=None)

        if len(keys) == 2:
            # customer/tag/face/file: the content belongs to one face.
            face_key = keys[1]
            content.tag_url = generateTagUrl(cust_id, tag_key, face_id=face_key)
            if tag.content is None or isinstance(tag.content, Content):
                # Switching from single content to per-face content.
                tag.content = {face_key: content}
            elif isinstance(tag.content, dict):
                tag.content[face_key] = content
        else:
            # customer/tag/file: the content belongs to the tag itself.
            content.tag_url = generateTagUrl(cust_id, tag_key, None)
            tag.content = content

    print(f"ObjectCreated: {record.s3.object.key}")
def generateTagUrl(cust_id: str, tag_id: str, face_id: str | None) -> str:
return f"{function_url}/api?id={cust_id}&tag_id={tag_id}{f"&face_id={face_id}" if face_id else ""}"
def searchCustomerKey(redirects: Redirects, cust_name: str, default: str | None) -> str | None:
    """Return the id of the first customer named *cust_name*, else *default*.

    *default* is also returned when the document has no customers mapping.
    Entries whose value is ``None`` are skipped.
    """
    customers = redirects.customers
    if customers is None:
        return default
    matching_ids = (
        cust_key
        for cust_key, customer in customers.items()
        if customer is not None and customer.name == cust_name
    )
    return next(matching_ids, default)
def processDelete(record: Record, redirects: Redirects) -> None:
keys = getObjectKeys(record=record)
if not redirects.customers:
return
cust_name = keys.pop()
cust_key = searchCustomerKey(redirects=redirects, cust_name=cust_name, default=None)
if cust_key is None:
return
customer = redirects.customers.get(cust_key, None)
if len(keys) == 0 or not customer:
redirects.customers.pop(cust_key, None)
return
if not customer.tags:
return
tag_key = keys.pop()
tag = customer.tags.get(tag_key, None)
if len(keys) == 0 or not tag:
customer.tags.pop(tag_key, None)
return
match len(keys):
case 2:
if not tag.content or not isinstance(tag.content, Dict):
return
tag.content.pop(keys[1], None)
case 1:
if isinstance(tag.content, Dict) and keys[0] in tag.content.keys():
tag.content[keys[0]] = None
else:
tag.content = None
case _:
print("Unexpected")
return
print(f"Object remove: {record.s3.object.key}")
if __name__ == "__main__":
    # Local run: feed a sample event from disk to the handler. The missing
    # context (None) makes it use the local redirects file.
    with open('/home/emanuele/dev/StandOut/lambda_config/test.json', 'r') as event_file:
        sample_event = json.load(event_file)
        lambda_handler(sample_event, None)