import json

import boto3
import boto3.exceptions
from botocore.exceptions import ClientError

# Lazily created S3 client, kept at module level so later invocations of the
# handler in the same process reuse it instead of building a new one.
s3_client = None

# Location of the redirect table read on every invocation.
_BUCKET = 'standout-data'
_REDIRECTS_KEY = 'redirects.json'
# Lifetime, in seconds, of the presigned URLs handed out as redirect targets.
_PRESIGN_TTL_SECONDS = 120


def _log_buckets(client) -> None:
    """Print the name and creation date of every bucket visible to *client*.

    This is diagnostic output only, so a failure to list buckets is logged
    and ignored rather than failing the request.
    """
    # NOTE(review): this costs one extra S3 call per invocation; consider
    # removing it once it is no longer needed for debugging.
    try:
        buckets = client.list_buckets()['Buckets']
    except ClientError as e:
        print(e)
        return
    for bucket in buckets:
        print(f"{bucket['Name']}: {bucket['CreationDate'].isoformat()}")


def _lookup_destination(redirects, params):
    """Return the redirect entry selected by the query parameters.

    Walks ``redirects[cust_id][gadget_id]['content']``. When ``content`` is a
    non-empty dict it is indexed by ``face_id``; otherwise ``content`` itself
    is returned (``None`` when the customer or gadget is unknown).

    Raises:
        KeyError: a required query parameter is missing, or ``face_id`` is
            not a key of a dict ``content``.
    """
    customer = redirects.get(params['cust_id'], {})
    gadget = customer.get(params['gadget_id'], {})
    content = gadget.get('content')
    if content and isinstance(content, dict):
        return content[params['face_id']]
    return content


def lambda_handler(event: dict, context):
    """Redirect the caller to the content configured in ``redirects.json``.

    Reads ``cust_id``, ``gadget_id`` and ``face_id`` from
    ``event['queryStringParameters']``, looks the destination up in the
    redirect table stored in S3 and, when the destination is an S3 entry,
    replaces it with a short-lived presigned URL.

    Returns:
        A dict with ``statusCode`` 301 plus ``Location`` / ``Cache-Control``
        headers on success; ``{"statusCode": 404}`` when the redirect table
        or the requested entry does not exist; ``{"statusCode": 500}`` when
        the table is malformed, a required parameter is missing, the entry
        has an unsupported type, or S3 reports an error.
    """
    global s3_client
    if s3_client is None:
        s3_client = boto3.client('s3')

    _log_buckets(s3_client)

    try:
        resp = s3_client.get_object(Bucket=_BUCKET, Key=_REDIRECTS_KEY)
    except s3_client.exceptions.NoSuchKey as e:
        print(e)
        # Alternatively, serve a "sorry, this content is not available" page.
        return {"statusCode": 404}
    except ClientError as e:
        # Any other S3 failure (permissions, missing bucket, ...) is a server
        # side problem; report it instead of letting the handler crash.
        print(e)
        return {"statusCode": 500}

    # The key may be present with a None value (API Gateway proxy events do
    # this for requests without a query string), so a .get() default alone is
    # not enough.
    params = event.get('queryStringParameters') or {}

    try:
        redirects = json.load(resp["Body"])
        dest = _lookup_destination(redirects, params)

        if dest and isinstance(dest, dict):
            dest_type = dest.get('type', 's3')
            if dest_type != 's3':
                # Only S3 destinations can be resolved to a URL here.
                print(f"unsupported redirect type: {dest_type!r}")
                return {"statusCode": 500}
            try:
                dest = s3_client.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': _BUCKET, 'Key': dest['key']},
                    ExpiresIn=_PRESIGN_TTL_SECONDS,
                )
            except ClientError as e:
                # Without a URL there is nothing to redirect to.
                print(e)
                return {"statusCode": 500}
    except json.decoder.JSONDecodeError as je:
        print(je)
        return {"statusCode": 500}
    except KeyError as ke:
        print(ke)
        return {"statusCode": 500}
    except (AttributeError, TypeError) as err:
        # redirects.json does not have the expected nested-dict layout.
        print(err)
        return {"statusCode": 500}

    if not dest:
        # No redirect configured for this customer/gadget: answer "not found"
        # rather than redirecting to the literal string "None".
        return {"statusCode": 404}

    # NOTE(review): 301 is a permanent redirect while the presigned URL is
    # short-lived; a temporary redirect (302/307) may be a better fit.
    # Confirm with the API consumers before changing.
    return {
        "statusCode": 301,
        "headers": {
            "Cache-Control": "no-cache",
            "Location": str(dest),
        },
    }


if __name__ == "__main__":
    event = {
        "resource": "/",
        "path": "/",
        "httpMethod": "GET",
        "queryStringParameters": {
            "cust_id": "cust_id1",
            "gadget_id": "gadget_id1",
            "face_id": "face_id3",
        },
    }
    print(json.dumps(lambda_handler(event, None), indent=2))