import os import json import boto3 from typing import Optional from pydantic import BaseModel import pydantic_core from redirects_base import Content, Customer, Redirects, Tag s3_client = None bucket_config = '' bucket_data = '' class S3Bucket(BaseModel): name: str ownerIdentity: dict arn: str class S3Object(BaseModel): key: str eTag: Optional[str | None] = None size: Optional[int | None] = None sequencer: Optional[str | None] = None class S3Event(BaseModel): s3SchemaVersion: str bucket: S3Bucket object: S3Object class Record(BaseModel): eventName: str eventSource: str eventTime: str s3: S3Event def lambda_handler(event: dict, context): global s3_client, bucket_config, bucket_data if s3_client is None: print("Init Function") bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config') bucket_data = os.environ.get('BUCKET_DATA', 'standout-data') print(f'Bucket Config: {bucket_config}') print(f' Bucket Data: {bucket_data}') s3_client = boto3.client('s3') ## Download redirects file redirects: Redirects try: #resp = s3_client.get_object( # Bucket=bucket_config, # Key='redirects.json' #) #redirects = Redirects(**json.load(resp['Body'])) with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'r') as f: #redirects = Redirects(**json.load(f)) redirects = Redirects.model_validate_json(f.read()) except s3_client.exceptions.NoSuchKey as e: print(e) # Oppure pagina "siamo spiacenti ma il contenuto non e' disponibile" return { "statusCode": 404 } # Proces records for r in event["Records"]: record = Record(**r) if record.eventSource != "aws:s3": return False print(f"Action: {record.eventName}") print(f"Object: {record.s3}") # splitta la chiave per capire la directory keys = record.s3.object.key.split('/') file_name = keys[-1] match record.eventName: case "ObjectCreated:Put" | "ObjectCreated:Post": print(f"ObjectCreated: {record.s3.object.key}") # crea il primo utente se necessario o selezionalo if redirects.customers is None: redirects.customers = {keys[0]: Customer(status='active')} c = redirects.customers[keys[0]] # crea un tag per l'utente, con contenuto nullo o selezionalo if c.tags is None: c.tags = {keys[1]: Tag(status="active", content=None)} t = c.tags[keys[1]] # crea un contenuto per il tag a seconda della lunghezza della chiave if file_name == "url.txt": with s3_client.get_object(Bucket=bucket_data, Key=record.s3.object.key)['Body'] as url_file: content = Content(type='url', key=file_name, url=url_file.readline().decode().strip()) else: content = Content(type='s3', key=file_name, url=None) match len(keys): case 4: if t.content is None: t.content = {keys[2]: content} elif isinstance(t.content, dict): t.content[keys[2]] = content case 3: t.content = content case _: print("Too long keys") with open('/home/emanuele/dev/StandOut/lambda_config/redirects.json', 'w') as f: f.write(redirects.model_dump_json(indent=2)) return True case "ObjectCreated:Copy": print(f"Object copy: {record.s3.object.key}") return True case "s3:ObjectRemoved:*": print(f"Object remove: {record.s3.object.key}") return True case _: print("Unknown action") if __name__ == "__main__": with open('/home/emanuele/dev/StandOut/lambda_config/test.json', 'r') as f: lambda_handler(json.load(f), None)