import os import boto3 import json import boto3.exceptions from pydantic import BaseModel from typing import Optional s3_client = None bucket_config = '' bucket_data = '' git_version = '' class RequestParams(BaseModel): id: str tag_id: str face_id: Optional[str] = None class Content(BaseModel): type: str key: str url: Optional[str] = None def lambda_handler(event: dict, context): global s3_client, bucket_config, bucket_data, git_version if s3_client is None: print("Init Function") bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config') bucket_data = os.environ.get('BUCKET_DATA', 'standout-data') git_version = os.environ.get('GIT_VERSION', '00000000') print(f' Version: {git_version}') print(f' Bucket Data: {bucket_data}') print(f'Bucket Config: {bucket_config}') s3_client = boto3.client('s3') ## Download redirects file try: resp = s3_client.get_object( Bucket=bucket_config, Key='redirects.json' ) except s3_client.exceptions.NoSuchKey as e: print(e) # Oppure pagina "siamo spiacenti ma il contenuto non e' disponibile" return { "statusCode": 404 } ## Parse request and get content try: redirects = json.load(resp['Body']) params = RequestParams(**event.get('queryStringParameters', {})) customers = redirects.get('customers', {}) customer = customers.get(params.id, {}) tags = customer.get('tags', {}) tag = tags.get(params.tag_id, {}) content = tag.get('content', None) # In case of multi face tag select the correct face if isinstance(content, dict) and params.face_id: content = content.get(params.face_id, None) if content is None: return { "statusCode": 404 } content = Content(**content) match content.type: case "s3": file_name = content.key.replace('+',' ') # remove slugify of filename key = f'{customer['name']}/{params.tag_id}{'/'+params.face_id if params.face_id else ''}/{file_name}' final_redirect = s3_client.generate_presigned_url('get_object', Params={'Bucket': bucket_data, 'Key': key}, ExpiresIn=120) case "url": final_redirect = content.url except json.decoder.JSONDecodeError as je: print(je) return { "statusCode": 500 } except KeyError as ke: print(ke) return { "statusCode": 404 } except Exception as e: print(e) return { "statusCode": 500 } return { "statusCode": 301, "headers": { "Cache-Control": "no-cache", "Location": str(final_redirect) } } if __name__ == "__main__": event = { "resource": "/", "path": "/", "httpMethod": "GET", "queryStringParameters": { "id": "customer1", "tag_id": "tag2", "face_id": "face2" }, } print(json.dumps(lambda_handler( event, None ), indent=2))