Files
py-ailbl-user-avatar/apps/ailbl-useravatar-user-upload.py

285 lines
8.8 KiB
Python
Raw Normal View History

2025-11-18 12:40:34 +07:00
import dataclasses
import enum
import json
import typing
import uuid
import os
from storage.minio_client import get_minio_client, check_existing_avatar_on_minio, upload_to_minio
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
import crud
from flask import current_app, jsonify, request
from helpers import (
CORS_HEADERS,
db_row_to_dict,
db_rows_to_array,
init_db_connection,
str_to_bool,
)
from pydantic import ValidationError
from schemas import TagRequest
def main():
"""
```fission
{
"name": "avatar-admin-get-insert-delete-put",
"http_triggers": {
"avatar-admin-get-insert-delete-put-http": {
"url": "/gh/users/avatars",
"methods": ["PUT", "POST", "DELETE", "GET"]
}
}
}
```
"""
try:
if request.method == "PUT":
return make_update_avatar_request()
elif request.method == "DELETE":
return make_delete_avatar_request()
elif request.method == "POST":
return make_insert_request()
elif request.method == "GET":
return make_get_avatar_request()
else:
return {"error": "Method not allow"}, 405
except Exception as ex:
return jsonify({"error": str(ex)}), 500
def make_insert_request():
try:
user_id = request.headers.get("X-User") # Lay user_id tu header X-User
file = request.files.get("avatar") #Lay file tu form-data voi key la 'avatar'
if not user_id or not file:
return jsonify({"error": "user_id or file is required"}), 400
if file.mimetype not in ALLOWED_IMAGE_TYPES: #Check mimetype(kieu du lieu cua file anh)
return jsonify(
{"error": "Invalid file type. Only JPG, PNG, GIF, WEBP are allowed."}
), 400
response, status = crud.update_or_create_avatar(user_id, file)
return jsonify(response), status
except Exception as e:
return jsonify({"error": str(e)}), 500
def make_update_avatar_request():
try:
user_id = request.headers.get("X-User") # Lay user_id tu header X-User
except Exception as e:
return jsonify({"error": str(e)}), 500
return {"message": "Update avatar - Not implemented yet"}, 501
def make_delete_avatar_request():
return {"message": "Delete avatar - Not implemented yet"}, 501
def make_get_avatar_request():
return {"message": "Get avatar - Not implemented yet"}, 501
def __insert_tag(cursor, data: TagRequest):
sql = """
INSERT INTO ailbl_tag (id, tag, kind, ref, primary_color, secondary_color, created, modified)
VALUES (%(id)s, %(tag)s, %(kind)s, %(ref)s, %(primary_color)s, %(secondary_color)s, now(), now())
RETURNING *
"""
cursor.execute(
sql,
{
"id": str(uuid.uuid4()),
"tag": data.tag,
"kind": data.kind.value,
"ref": data.ref,
"primary_color": data.primary_color,
"secondary_color": data.secondary_color,
},
)
row = cursor.fetchone()
if row:
return db_row_to_dict(cursor, row)
else:
raise Exception("Insert tag failed")
def make_filter_request():
paging = TagPage.from_request_queries()
conn = None
try:
conn = init_db_connection()
with conn.cursor() as cursor:
records = __filter_tag(cursor, paging)
return jsonify(records)
finally:
if conn is not None:
conn.close()
current_app.logger.info("Close DB connection")
def __filter_tag(cursor, paging: "TagPage"):
conditions = []
values = {}
if paging.filter.ids:
conditions.append("id = ANY(%(ids)s)")
values["ids"] = paging.filter.ids
if paging.filter.keyword:
conditions.append("LOWER(tag) LIKE %(keyword)s")
values["keyword"] = f"%{paging.filter.keyword.lower()}%"
if paging.filter.kind:
conditions.append("kind = %(kind)s::smallint")
values["kind"] = int(paging.filter.kind)
if paging.filter.ref:
conditions.append("LOWER(ref) LIKE %(ref)s")
values["ref"] = f"%{paging.filter.ref.lower()}%"
if paging.filter.primary_color:
conditions.append("primary_color = %(primary_color)s")
values["primary_color"] = paging.filter.primary_color
if paging.filter.secondary_color:
conditions.append("secondary_color = %(secondary_color)s")
values["secondary_color"] = paging.filter.secondary_color
if paging.filter.created_from:
conditions.append("created >= %(created_from)s")
values["created_from"] = paging.filter.created_from
if paging.filter.created_to:
conditions.append("created <= %(created_to)s")
values["created_to"] = paging.filter.created_to
if paging.filter.modified_from:
conditions.append("modified >= %(modified_from)s")
values["modified_from"] = paging.filter.modified_from
if paging.filter.modified_to:
conditions.append("modified <= %(modified_to)s")
values["modified_to"] = paging.filter.modified_to
where_clause = " AND ".join(conditions)
if where_clause:
where_clause = "WHERE " + where_clause
order_clause = ""
if paging.sortby:
direction = "ASC" if paging.asc else "DESC"
order_clause = f" ORDER BY {paging.sortby.value} {direction} "
sql = f"""
SELECT
t.*,
(
SELECT COUNT(*)
FROM ailbl_tag_ref r
WHERE r.tag_id = t.id
) AS ref_count,
count(*) OVER() AS total
FROM ailbl_tag t
{where_clause}
{order_clause}
LIMIT %(limit)s OFFSET %(offset)s
"""
values["limit"] = paging.size
values["offset"] = paging.page * paging.size
cursor.execute(sql, values)
rows = cursor.fetchall()
return db_rows_to_array(cursor, rows)
@dataclasses.dataclass
class Page:
page: typing.Optional[int] = None
size: typing.Optional[int] = None
asc: typing.Optional[bool] = None
@classmethod
def from_request_queries(cls) -> "Page":
paging = Page()
paging.page = int(request.args.get("page", 0))
paging.size = int(request.args.get("size", 8))
paging.asc = request.args.get("asc", type=str_to_bool)
return paging
class TagSortField(str, enum.Enum):
CREATED = "created"
KIND = "kind"
MODIFIED = "modified"
class KindType(str, enum.Enum):
"""Notification Types"""
PROJECT_GROUP = "1"
PROJECT_DATA = "2"
PROJECT_MEMBER = "3"
PROJECT_DISCUSSTION_TOPIC = "4"
PROJECT = "5"
TICKET = "6"
@dataclasses.dataclass
class TagFilter:
ids: typing.Optional[typing.List[str]] = None
tag: typing.Optional[str] = None
keyword: typing.Optional[str] = None
kind: typing.Optional[typing.List[KindType]] = None
ref: typing.Optional[str] = None
primary_color: typing.Optional[str] = None
secondary_color: typing.Optional[str] = None
created_from: typing.Optional[str] = None
created_to: typing.Optional[str] = None
modified_from: typing.Optional[str] = None
modified_to: typing.Optional[str] = None
@classmethod
def from_request_queries(cls) -> "TagFilter":
filter = TagFilter()
filter.ids = request.args.getlist("filter[ids]")
filter.keyword = request.args.get("filter[keyword]")
kind_str = request.args.get("filter[kind]")
if kind_str:
try:
kind_enum = KindType[kind_str] # enum theo .name
filter.kind = kind_enum.value # .value là "1", "2", ...
except KeyError:
raise ValueError(
f"KindType should be one of: {[e.name for e in KindType]}"
)
filter.ref = request.args.get("filter[ref]")
filter.primary_color = request.args.get("filter[primary_color]")
filter.secondary_color = request.args.get("filter[secondary_color]")
filter.created_to = request.args.get("filter[created_to]")
filter.created_from = request.args.get("filter[created_from]")
filter.modified_from = request.args.get("filter[modified_from]")
filter.modified_to = request.args.get("filter[modified_to]")
return filter
@dataclasses.dataclass
class TagPage(Page):
sortby: typing.Optional[TagSortField] = None
filter: typing.Optional[TagFilter] = dataclasses.field(
default_factory=TagFilter.from_request_queries
)
@classmethod
def from_request_queries(cls) -> "TagPage":
paging = super(TagPage, cls).from_request_queries()
paging = TagPage(**dataclasses.asdict(paging))
paging.sortby = (
TagSortField[request.args.get("sortby")]
if request.args.get("sortby")
else None
)
return paging