import os import json import boto3 from typing import Optional from pydantic import BaseModel import pydantic_core from redirects_base import Content, Customer, Redirects, Tag s3_client = None bucket_config = '' bucket_data = '' class S3Bucket(BaseModel): name: str ownerIdentity: dict arn: str class S3Object(BaseModel): key: str eTag: Optional[str | None] = None size: Optional[int | None] = None sequencer: Optional[str | None] = None class S3Event(BaseModel): s3SchemaVersion: str bucket: S3Bucket object: S3Object class Record(BaseModel): eventName: str eventSource: str eventTime: str s3: S3Event def lambda_handler(event: dict, context): global s3_client, bucket_config, bucket_data if s3_client is None: print("Init Function") bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config') bucket_data = os.environ.get('BUCKET_DATA', 'standout-data') print(f'Bucket Config: {bucket_config}') print(f' Bucket Data: {bucket_data}') s3_client = boto3.client('s3') ## Download redirects file redirects: Redirects try: if context is not None: resp = s3_client.get_object( Bucket=bucket_config, Key='redirects.json' ) redirects = Redirects.model_validate_json(resp['Body'].read()) else: with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'r') as f: redirects = Redirects.model_validate_json(f.read()) except s3_client.exceptions.NoSuchKey as e: print(e) # Oppure pagina "siamo spiacenti ma il contenuto non e' disponibile" return { "statusCode": 404 } # Proces records for r in event["Records"]: record = Record(**r) if record.eventSource != "aws:s3": return False print(f"Action: {record.eventName}") print(f"Object: {record.s3}") # splitta la chiave per capire la directory keys = record.s3.object.key.split('/') keys.reverse() match record.eventName: case "ObjectCreated:Put" | "ObjectCreated:CompleteMultipartUpload": print(f"ObjectCreated: {record.s3.object.key}") # crea il primo utente se necessario o selezionalo cust_key = keys.pop() if redirects.customers is None: redirects.customers = {cust_key: Customer(status="active")} if cust_key not in redirects.customers.keys(): redirects.customers[cust_key] = Customer(status="active") # Aggiunto solo un cliente if len(keys) == 0: break c = redirects.customers[cust_key] # crea un tag per l'utente, con contenuto nullo o selezionalo tag_key = keys.pop() if c.tags is None: c.tags = {tag_key: Tag(status="active", content=None)} if tag_key not in c.tags.keys(): c.tags[tag_key] = Tag(status="active", content=None) # Aggiunta anche una chiave if len(keys) == 0: break t = c.tags[tag_key] # Crea un contenuto per il tag a seconda della lunghezza della chiave file_name = keys[0] if file_name == "url.txt": with s3_client.get_object(Bucket=bucket_data, Key=record.s3.object.key)['Body'] as url_file: content = Content(type='url', key=file_name, url=url_file.readline().decode().strip()) else: content = Content(type='s3', key=file_name, url=None) # Aggiungi il contenuto a una faccia o al tag a seconda della lunghezza del path match len(keys): case 2: if t.content is None or isinstance(t.content, Content): t.content = {keys[1]: content} elif isinstance(t.content, dict): t.content[keys[1]] = content case 1: t.content = content case _: print("Too long keys") case "ObjectCreated:Copy": print(f"Object copy: {record.s3.object.key}") case "s3:ObjectRemoved:Delete": print(f"Object remove: {record.s3.object.key}") case _: print("Unknown action") if context is not None: resp = s3_client.put_object(Bucket=bucket_config, Key='redirects.json', Body=redirects.model_dump_json(indent=2)) print(f"New redirects version: {resp['ETag']}") else: with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'w') as f: f.write(redirects.model_dump_json(indent=2)) return True if __name__ == "__main__": with open('/home/emanuele/dev/StandOut/lambda_config/test.json', 'r') as f: lambda_handler(json.load(f), None)