import os import json import boto3 from uuid import uuid4 from pydantic import BaseModel from typing import Dict, Optional from redirects_base import Content, Customer, Redirects, Tag bucket_config = '' bucket_data = '' function_url = '' git_version = '' api_verson = '' s3_client = None redirects: Redirects | None = None class S3Bucket(BaseModel): name: str ownerIdentity: dict arn: str class S3Object(BaseModel): key: str eTag: Optional[str | None] = None size: Optional[int | None] = None sequencer: Optional[str | None] = None class S3Event(BaseModel): s3SchemaVersion: str bucket: S3Bucket object: S3Object class Record(BaseModel): eventName: str eventSource: str eventTime: str s3: S3Event def lambda_handler(event: dict, context): global s3_client, bucket_config, bucket_data, function_url, redirects, git_version, api_version if s3_client is None: print("Init Function") bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config') bucket_data = os.environ.get('BUCKET_DATA', 'standout-data') function_url = os.environ.get('FUNCTION_URL', 'https://api.standout.it/dev') git_version = os.environ.get('GIT_VERSION', '00000000') api_version = os.environ.get('API_VERSION', 'v1') print(f' Version: {git_version}') print(f' Bucket Data: {bucket_data}') print(f'Bucket Config: {bucket_config}') s3_client = boto3.client('s3') ### Process records for r in event["Records"]: record = Record(**r) if record.eventSource != "aws:s3": return False ## Stampa info di debug print(f"Action: {record.eventName}") print(f"Object: {record.s3.object}") ## Ritorna se la chiave non e' un file if record.s3.object.key.endswith('/') and record.s3.object.size == 0: print(f"Skip, folder only: {record.s3.object.key}") continue ## Scarica il redirects solo il primo giro if redirects is None: try: redirects = downloadRedirects(client = s3_client, context = context) redirects.version = git_version except s3_client.exceptions.NoSuchKey as e: print(e) return False match record.eventName: case "ObjectCreated:Put" | "ObjectCreated:CompleteMultipartUpload": processAdd(record=record, redirects=redirects, client=s3_client) case "ObjectRemoved:Delete": processDelete(record=record, redirects=redirects) case "ObjectCreated:Copy": print(f"Object copy: {record.s3.object.key}") case _: print("Unknown action") if redirects is not None: if context is not None: resp = s3_client.put_object(Bucket=bucket_config, Key='redirects.json', Body=redirects.model_dump_json(indent=2)) print(f"New redirects version: {resp['ETag']}") else: with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'w') as f: f.write(redirects.model_dump_json(indent=2)) else: print("No Action") return True def downloadRedirects(client, context) -> Redirects: if context is not None: resp = client.get_object( Bucket=bucket_config, Key='redirects.json' ) return Redirects.model_validate_json(resp['Body'].read()) else: with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'r') as f: return Redirects.model_validate_json(f.read(), strict=False) def getObjectKeys(record: Record) -> list[str]: keys = [v for v in record.s3.object.key.split('/') if v != ''] keys.reverse() return keys def processAdd(record: Record, redirects: Redirects, client) -> None: # splitta la chiave per capire la directory keys = getObjectKeys(record=record) # crea il primo utente se necessario o selezionalo cust_name = keys.pop() # cerca la chiave utente dal nome e nel caso non sia presente creane uno nuovo cust_id = searchCustomerKey(redirects=redirects, cust_name=cust_name, default=uuid4().hex) assert(cust_id is not None) # uuid cannot return none if redirects.customers is None: redirects.customers = {cust_id: Customer(name=cust_name, status="active", tags=None)} if cust_id not in redirects.customers.keys(): redirects.customers[cust_id] = Customer(name=cust_name, status="active", tags=None) # Aggiunto solo un cliente c = redirects.customers[cust_id] if len(keys) == 0 or not c: return # crea un tag per l'utente, con contenuto nullo o selezionalo tag_key = keys.pop() if c.tags is None: c.tags = {tag_key: Tag(status="active", content=None)} if tag_key not in c.tags.keys(): c.tags[tag_key] = Tag(status="active", content=None) # Aggiunta anche una chiave t = c.tags[tag_key] if len(keys) == 0 or not t: return # Crea un contenuto per il tag a seconda della lunghezza della chiave file_name = keys[0] if file_name == "url.txt": with client.get_object(Bucket=bucket_data, Key=record.s3.object.key)['Body'] as url_file: content = Content(type='url', key=file_name, url=url_file.readline().decode().strip(), tag_url=None) else: content = Content(type='s3', key=file_name, url=None, tag_url=None) # Aggiungi il contenuto a una faccia o al tag a seconda della lunghezza del path match len(keys): case 2: content.tag_url = generateTagUrl(cust_id, tag_key, face_id=keys[1]) if t.content is None or isinstance(t.content, Content): t.content = {keys[1]: content} elif isinstance(t.content, dict): t.content[keys[1]] = content case 1: content.tag_url = generateTagUrl(cust_id, tag_key, None) t.content = content case _: print("Too long keys") print(f"ObjectCreated: {record.s3.object.key}") def generateTagUrl(cust_id: str, tag_id: str, face_id: str | None) -> str: return f"{function_url}/{api_version}?id={cust_id}&tag_id={tag_id}{f"&face_id={face_id}" if face_id else ""}" def searchCustomerKey(redirects: Redirects, cust_name: str, default: str | None) -> str | None: if redirects.customers is None: return default for k,v in redirects.customers.items(): if v is not None and v.name == cust_name: return k return default def processDelete(record: Record, redirects: Redirects) -> None: keys = getObjectKeys(record=record) if not redirects.customers: return cust_name = keys.pop() cust_key = searchCustomerKey(redirects=redirects, cust_name=cust_name, default=None) if cust_key is None: return customer = redirects.customers.get(cust_key, None) if len(keys) == 0 or not customer: redirects.customers.pop(cust_key, None) return if not customer.tags: return tag_key = keys.pop() tag = customer.tags.get(tag_key, None) if len(keys) == 0 or not tag: customer.tags.pop(tag_key, None) return match len(keys): case 2: if not tag.content or not isinstance(tag.content, Dict): return tag.content.pop(keys[1], None) case 1: if isinstance(tag.content, Dict) and keys[0] in tag.content.keys(): tag.content[keys[0]] = None else: tag.content = None case _: print("Unexpected") return print(f"Object Remove: {record.s3.object.key}") if __name__ == "__main__": with open('/home/emanuele/dev/StandOut/lambda_config/test.json', 'r') as f: lambda_handler(json.load(f), None)