"""CRUD helpers for user avatars stored in an S3-compatible bucket (S3/Minio).

Every public function returns a ``(payload, http_status)`` tuple.
"""

import io

from flask import Response
from PIL import Image

from helpers import S3_BUCKET, get_secret, s3_client

# Error codes that mean "object does not exist".
# NOTE(review): assumes s3_client raises botocore-style errors that carry a
# ``response["Error"]["Code"]`` mapping -- confirm against helpers.s3_client.
_NOT_FOUND_CODES = frozenset({"NoSuchKey", "404"})


def _avatar_key(user_id: str) -> str:
    """Return the bucket key of a user's avatar: ``<S3_PREFIX>/<user_id>``.

    The key is the path inside the bucket; the file name is the user id.
    """
    return f"{get_secret('S3_PREFIX')}/{user_id}"


def _error_status(exc: Exception) -> int:
    """Map an exception to an HTTP status: 404 for a missing object, else 500.

    Uses duck typing on ``exc.response`` so no extra dependency is required;
    any exception without that shape keeps the generic 500.
    """
    details = getattr(exc, "response", None)
    if isinstance(details, dict):
        error = details.get("Error")
        if isinstance(error, dict) and str(error.get("Code")) in _NOT_FOUND_CODES:
            return 404
    return 500


def _mime_type(image_format) -> str:
    """Return the MIME type for a Pillow format name such as ``"JPEG"``.

    Prefers Pillow's own registry (``Image.MIME``) and falls back to
    ``image/<format>`` for formats that have no registered MIME type.
    """
    if not image_format:
        # Pillow could not name the format; serve the bytes as opaque data.
        return "application/octet-stream"
    fmt = image_format.lower()
    fallback = f"image/{'jpeg' if fmt == 'jpg' else fmt}"
    return Image.MIME.get(image_format.upper(), fallback)


def update_or_create_avatar(user_id: str, file) -> tuple:
    """Upload a user's avatar, overwriting any existing one.

    Args:
        user_id: Identifier of the user; used as the object name.
        file: Uploaded file object exposing ``read()`` and ``content_type``.

    Returns:
        ``(result, 200)`` with the storage client's ``put_object`` result on
        success, ``({"error": ...}, 400)`` when the upload is empty, or
        ``({"error": ...}, 500)`` on any other failure.
    """
    try:
        file_data = file.read()
        if not file_data:
            # A zero-byte object could never be decoded as an image later.
            return {"error": "Uploaded file is empty"}, 400

        result = s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=_avatar_key(user_id),
            Body=io.BytesIO(file_data),
            ContentLength=len(file_data),
            ContentType=file.content_type,
        )
        return result, 200
    except Exception as e:
        return {"error": str(e)}, 500


def get_avatar_url(user_id: str) -> tuple:
    """Fetch a user's avatar and return it as an HTTP image response.

    Despite the name, this returns the image bytes themselves, not a URL.
    The content type is detected from the image data with Pillow.

    Args:
        user_id: Identifier of the user whose avatar is requested.

    Returns:
        ``(Response, 200)`` on success; ``({"error": ...}, 404)`` when the
        storage error identifies a missing object; ``({"error": ...}, 500)``
        on any other failure (including data Pillow cannot identify).
    """
    try:
        s3_object = s3_client.get_object(
            Bucket=S3_BUCKET,
            Key=_avatar_key(user_id),
        )
        image_data = s3_object["Body"].read()

        # Only the header needs to be parsed to learn the format.
        with Image.open(io.BytesIO(image_data)) as img:
            image_format = img.format

        avatar = Response(
            image_data,
            content_type=_mime_type(image_format),
            direct_passthrough=True,
        )
        return avatar, 200
    except Exception as e:
        return {"error": str(e)}, _error_status(e)


def delete_avatar(user_id: str) -> tuple:
    """Delete a user's avatar from the bucket.

    Args:
        user_id: Identifier of the user whose avatar is removed.

    Returns:
        ``(result, 200)`` with the storage client's ``delete_object`` result
        on success, or ``({"error": ...}, 500)`` on failure.
    """
    try:
        result = s3_client.delete_object(
            Bucket=S3_BUCKET,
            Key=_avatar_key(user_id),
        )
        return result, 200
    except Exception as e:
        return {"error": str(e)}, 500