import io from flask import Response from helpers import S3_BUCKET, get_secret, minio_client from PIL import Image def update_or_create_avatar(user_id: str, file): #Create&Update function to upload or update user avatar S3/Minio try: file_data = file.read() object_name = f"{get_secret('S3_PREFIX')}/{user_id}" # Bản chất là đường dẫn trong bucket + tên file = user_id result = minio_client.put_object( S3_BUCKET, object_name, io.BytesIO(file_data), length=len(file_data), content_type=file.content_type, ) return result.object_name, 200 except Exception as e: return {"error": str(e)}, 500 def get_avatar_url(user_id: str): #Read function to get user avatar from S3/Minio try: response = minio_client.get_object( bucket_name=S3_BUCKET, object_name=f"{get_secret('S3_PREFIX')}/{user_id}" ) image_data = response.read() with Image.open(io.BytesIO(image_data)) as img: fmt = img.format.lower() # ví dụ: 'jpeg', 'png', 'webp' content_type = f"image/{'jpeg' if fmt == 'jpg' else fmt}" return Response( io.BytesIO(image_data), content_type=content_type, direct_passthrough=True, ) except Exception as e: return {"error": str(e)}, 500 def delete_avatar(user_id: str) -> dict: #Delete Function to delete user avatar from S3/Minio try: result = minio_client.remove_object( S3_BUCKET, f"{get_secret('S3_PREFIX')}/{user_id}" ) return result, 200 except Exception as e: return {"error": str(e)}, 500