Files
py-ailbl-user-avatar/apps/crud.py

66 lines
1.9 KiB
Python
Raw Normal View History

2025-11-18 12:40:34 +07:00
import io
from flask import Response
2025-12-02 04:38:34 +00:00
from helpers import S3_BUCKET, get_secret, s3_client
2025-11-18 12:40:34 +07:00
from PIL import Image
2025-12-02 04:38:34 +00:00
# Create/Update: upload a user's avatar to S3/MinIO, or update the existing one
def update_or_create_avatar(user_id: str, file):
    """Upload (or update) the avatar of ``user_id`` in the S3/MinIO bucket.

    Args:
        user_id: Identifier of the user; used as the object's file name.
        file: Uploaded file object. It must provide ``read()`` and a
            ``content_type`` attribute (presumably a Werkzeug
            ``FileStorage`` -- confirm against the caller).

    Returns:
        ``(result, 200)`` where ``result`` is whatever
        ``s3_client.put_object`` returned, or ``({"error": <message>}, 500)``
        if anything in the upload path raised.
    """
    try:
        payload = file.read()
        # The object key is essentially the path inside the bucket
        # (the configured prefix) plus the file name, which is the user_id.
        prefix = get_secret('S3_PREFIX')
        upload_kwargs = {
            "Bucket": S3_BUCKET,
            "Key": f"{prefix}/{user_id}",
            "Body": io.BytesIO(payload),
            "ContentLength": len(payload),
            "ContentType": file.content_type,
        }
        return s3_client.put_object(**upload_kwargs), 200
    except Exception as exc:
        return {"error": str(exc)}, 500
2025-12-02 04:38:34 +00:00
def get_avatar_url(user_id: str):
    """Read the avatar of ``user_id`` from S3/MinIO and return it as a response.

    Despite the name, this does not return a URL: the object is downloaded
    and its bytes are sent back in a Flask ``Response``.

    Args:
        user_id: Identifier of the user; the object key is
            ``<S3_PREFIX>/<user_id>``.

    Returns:
        ``(Response, 200)`` carrying the image bytes, with a content type
        derived from the image format detected by Pillow;
        ``({"error": <message>}, 404)`` when the storage backend reports
        that the object does not exist (error code ``NoSuchKey``);
        ``({"error": <message>}, 500)`` for any other failure.
    """
    try:
        response = s3_client.get_object(
            Bucket=S3_BUCKET,
            Key=f"{get_secret('S3_PREFIX')}/{user_id}",
        )
        image_data = response['Body'].read()

        # Detect the real image format from the bytes themselves rather
        # than trusting the metadata stored with the object.
        with Image.open(io.BytesIO(image_data)) as img:
            fmt = img.format.lower()  # e.g. 'jpeg', 'png', 'webp'
            content_type = f"image/{'jpeg' if fmt == 'jpg' else fmt}"

        return Response(
            image_data,
            content_type=content_type,
            direct_passthrough=True
        ), 200
    except Exception as e:
        # A missing avatar is a "not found" condition, not a server error.
        # botocore-style client errors expose the service error code under
        # e.response["Error"]["Code"]; inspect it defensively so that any
        # other exception type still falls through to the 500 path.
        error_info = getattr(e, "response", None)
        error_code = None
        if isinstance(error_info, dict):
            error_details = error_info.get("Error")
            if isinstance(error_details, dict):
                error_code = error_details.get("Code")
        status = 404 if error_code == "NoSuchKey" else 500
        return {"error": str(e)}, status
2025-12-02 04:38:34 +00:00
# Delete Function to delete user avatar from S3/Minio
def delete_avatar(user_id: str) -> tuple:
    """Delete the avatar of ``user_id`` from the S3/MinIO bucket.

    Args:
        user_id: Identifier of the user; the object key is
            ``<S3_PREFIX>/<user_id>``.

    Returns:
        A ``(body, status)`` tuple: ``(result, 200)`` where ``result`` is
        whatever ``s3_client.delete_object`` returned, or
        ``({"error": <message>}, 500)`` if the call raised.
    """
    try:
        result = s3_client.delete_object(
            Bucket=S3_BUCKET,
            Key=f"{get_secret('S3_PREFIX')}/{user_id}",
        )
        return result, 200
    except Exception as e:
        return {"error": str(e)}, 500