import os
import boto3
import json
import boto3.exceptions
from botocore.exceptions import ClientError

# Lazily initialised on the first invocation and then reused for every later
# invocation handled by the same (warm) Lambda execution environment.
s3_client = None
bucket_config = ''
bucket_data = ''


def _resolve_destination(redirects: dict, params: dict):
    """Look up the redirect target for the requested customer/tag/face.

    Args:
        redirects: Parsed content of ``redirects.json``; indexed first by
            ``params['id']`` and then by ``params['tag_id']``.
        params: Query-string parameters of the request.

    Returns:
        The destination to redirect to, or ``None`` when no entry matches.
        Dict entries whose ``type`` is ``"s3"`` (the default when ``type`` is
        absent) are turned into a presigned GET URL on ``bucket_data``; any
        other value is returned unchanged.

    Raises:
        KeyError: A required parameter (``id``, ``tag_id``, ``face_id``) or
            the ``key`` field of an S3 entry is missing.
        ClientError: The presigned URL could not be generated.
    """
    customer = redirects.get(params['id'], {})
    tag = customer.get(params['tag_id'], {})
    content = tag.get('content', None)

    # A dict without a "type" field is not a destination itself: it maps face
    # ids to destinations, so one more lookup is needed.
    if content and isinstance(content, dict) and "type" not in content:
        dest = content[params['face_id']]
    else:
        dest = content

    if dest and isinstance(dest, dict) and dest.get('type', 's3') == 's3':
        # Alternate quote styles keep this f-string valid before Python 3.12.
        key = f"{params['id']}/{params['tag_id']}/{dest['key']}"
        dest = s3_client.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_data, 'Key': key},
            ExpiresIn=120,
        )

    return dest


def lambda_handler(event: dict, context):
    """Redirect the caller to the content configured in ``redirects.json``.

    The configuration object is read from the config bucket on every
    invocation; the entry is selected with the ``id``, ``tag_id`` and
    ``face_id`` query-string parameters.

    Args:
        event: Invocation event; only ``queryStringParameters`` is read.
        context: Lambda context object (unused).

    Returns:
        A response dict with ``statusCode``:
        301 plus a ``Location`` header when a destination was resolved,
        404 when ``redirects.json`` is missing or no destination matches,
        500 when the configuration is not valid JSON, a required key is
        missing, or the presigned URL cannot be generated.
    """
    global s3_client, bucket_config, bucket_data

    if s3_client is None:
        print("Init Function")
        bucket_config = os.environ.get('BUCKET_CONFIG', 'standout-config')
        bucket_data = os.environ.get('BUCKET_DATA', 'standout-data')
        print(f'Bucket Config: {bucket_config}')
        print(f' Bucket Data: {bucket_data}')
        s3_client = boto3.client('s3')
        for x in s3_client.list_buckets()['Buckets']:
            print(f"{x['Name']}: {x['CreationDate'].isoformat()}")

    try:
        resp = s3_client.get_object(
            Bucket=bucket_config,
            Key='redirects.json'
        )
    except s3_client.exceptions.NoSuchKey as e:
        print(e)
        # Alternatively, serve a "sorry, this content is not available" page.
        return {
            "statusCode": 404
        }

    # The key may be present with a null value (no query string on the
    # request), in which case .get() with a default would still return None.
    params = event.get('queryStringParameters') or {}

    try:
        redirects = json.load(resp["Body"])
        dest = _resolve_destination(redirects, params)
    except json.decoder.JSONDecodeError as je:
        print(je)
        return {
            "statusCode": 500
        }
    except KeyError as ke:
        print(ke)
        return {
            "statusCode": 500
        }
    except ClientError as e:
        # Previously a `finally` clause read an unbound variable on this path
        # and crashed the handler; report the failure explicitly instead.
        print(e)
        return {
            "statusCode": 500
        }

    if not dest:
        # Nothing configured for this request: do not redirect to "None".
        return {
            "statusCode": 404
        }

    return {
        "statusCode": 301,
        "headers": {
            "Cache-Control": "no-cache",
            "Location": str(dest)
        }
    }


if __name__ == "__main__":
    # Manual smoke test; requires AWS credentials and access to the buckets.
    event = {
        "resource": "/",
        "path": "/",
        "httpMethod": "GET",
        "queryStringParameters": {
            "id": "customer1",
            "tag_id": "tag3",
            "face_id": "face1"
        },
    }

    print(json.dumps(lambda_handler(
        event,
        None
    ), indent=2))