Files
py-ailbl-user-email/apps/ailbl-admin_email-insert-get-filter.py

201 lines
7.0 KiB
Python
Raw Normal View History

2025-12-06 05:58:48 +07:00
import dataclasses
from typing import Optional
import typing
import crud
from flask import jsonify, request
from helpers import CORS_HEADERS, db_rows_to_array, kratos, str_to_bool
from pydantic import BaseModel, Field, ValidationError
from helpers import kratos, init_db_connection
from schemas import UserEmailRequest
@dataclasses.dataclass # Filter user bao nhieu email
2025-12-08 04:17:46 +00:00
class EmailFilter: # Tao ra class EmailFilter => Dung cls de khoi tao EmailFilter thanh doi tuong roi
2025-12-06 05:58:48 +07:00
ids: typing.Optional[typing.List[str]] = None
email: typing.Optional[str] = None
provider: typing.Optional[str] = None
created_from: typing.Optional[str] = None
created_to: typing.Optional[str] = None
modified_from: typing.Optional[str] = None
modified_to: typing.Optional[str] = None
keywords: typing.Optional[str] = None
primary: typing.Optional[bool] = None
2025-12-08 04:17:46 +00:00
# Tao method tu class do(Chinh la phuong thuc cua EmailFilter), cls chinh ra ham khoi tao
2025-12-06 05:58:48 +07:00
@classmethod
def from_request_queries(cls) -> "EmailFilter":
return cls(
2025-12-08 04:17:46 +00:00
ids=request.args.getlist("filter[ids]"), # Dung object request doc du lieu client gui len roi nhet vao cac file nay
2025-12-06 05:58:48 +07:00
email=request.args.get("filter[email]"),
provider=request.args.get("filter[provider]"),
created_from=request.args.get("filter[created_from]"),
created_to=request.args.get("filter[created_to]"),
modified_from=request.args.get("filter[modified_from]"),
modified_to=request.args.get("filter[modified_to]"),
keywords=request.args.get("filter[key]"),
primary=str_to_bool(request.args.get("filter[primary]"))
)
@dataclasses.dataclass
class Page:
page: int = 0
size: int = 10
asc: bool = False
@classmethod
2025-12-08 04:17:46 +00:00
def from_request_queries(cls) -> "Page": # => Doi Tuong Page
2025-12-06 05:58:48 +07:00
return Page(
page=int(request.args.get("page", 0)),
size=int(request.args.get("size", 10)),
asc=request.args.get("asc", type=str_to_bool) or False
)
@dataclasses.dataclass
class EmailPage(Page):
sortby: typing.Optional[str] = None
filter: EmailFilter = dataclasses.field(
default_factory=EmailFilter.from_request_queries)
@classmethod
2025-12-08 04:17:46 +00:00
def from_request_queries(cls) -> "EmailPage": # => Doi tuong EmailPage
2025-12-06 05:58:48 +07:00
base = Page.from_request_queries()
return cls(**dataclasses.asdict(base), sortby=request.args.get("sortby"))
def main():
"""
```fission
{
"name": "email-admin-insert-get-filter",
"http_triggers": {
"email-admin-insert-get-filter-http": {
"url": "/ailbl/admin/users/{UserId}/emails ",
"methods": ["POST", "GET"]
}
}
}
```
"""
try:
if request.method == "POST":
return insert_email()
elif request.method == "GET":
return filter_emails()
else:
return {"error": "Method not allow"}, 405
except Exception as ex:
return jsonify({"error": str(ex)}), 500
def insert_email():
user_id = request.headers.get("X-Fission-Params-UserId")
if not user_id:
return jsonify({"errorCode": "USER_ID_REQUIRED"}), 400, CORS_HEADERS
try:
data = request.get_json()
if not data:
return jsonify({"errorCode": "NO_DATA_PROVIDED"}), 400, CORS_HEADERS
parsed = UserEmailRequest(**data)
except ValidationError as e:
return jsonify({"errorCode": "VALIDATION_ERROR", "details": e.errors()}), 422, CORS_HEADERS
except Exception as e:
return jsonify({"errorCode": "BAD_REQUEST"}), 400, CORS_HEADERS
try:
add_email, status_code, headers = crud.add_email_to_user(
user_id, parsed.email)
if parsed.is_primary:
# update email kratos
identity = kratos.get_identity(user_id)
traits = identity.traits
traits["email"] = parsed.email
res = kratos.update_identity(
id=user_id,
update_identity_body={
"schema_id": identity.schema_id,
"traits": traits,
"state": identity.state,
},
)
return jsonify(add_email), status_code, headers
except Exception as e:
return jsonify({"error": str(e)}), 500
def filter_emails():
paging = EmailPage.from_request_queries()
user_id = request.headers.get(
"X-Fission-Params-UserId") # X-Fission lay tren path
if not user_id:
return jsonify({"errorCode": "USER_ID_REQUIRED"}), 400, CORS_HEADERS
conn = None
try:
conn = init_db_connection()
with conn.cursor() as cursor:
2025-12-08 04:17:46 +00:00
records = __filter_email(cursor, paging, user_id) # Goi
2025-12-06 05:58:48 +07:00
return jsonify(
records,
), 200, CORS_HEADERS
except Exception as e:
# current_app.logger.error(f"[filter_emails] DB Error: {e}")
return jsonify({"errorCode": "DATABASE_ERROR"}), 500, CORS_HEADERS
finally:
if conn:
conn.close()
def __filter_email(cursor, paging: EmailPage, user_id: str):
conditions = ["user_id = %(user_id)s"]
values = {"user_id": user_id}
if paging.filter.ids:
conditions.append("id = ANY(%(ids)s)")
values["ids"] = paging.filter.ids
if paging.filter.email:
conditions.append("LOWER(email) LIKE %(email)s")
values["email"] = f"%{paging.filter.email.lower()}%"
if paging.filter.provider:
conditions.append("LOWER(provider) LIKE %(provider)s")
values["provider"] = f"%{paging.filter.provider.lower()}%"
if paging.filter.created_from:
conditions.append("created >= %(created_from)s")
values["created_from"] = paging.filter.created_from
if paging.filter.created_to:
conditions.append("created <= %(created_to)s")
values["created_to"] = paging.filter.created_to
if paging.filter.modified_from:
conditions.append("modified >= %(modified_from)s")
values["modified_from"] = paging.filter.modified_from
if paging.filter.modified_to:
conditions.append("modified <= %(modified_to)s")
values["modified_to"] = paging.filter.modified_to
if paging.filter.keywords:
conditions.append(
"(LOWER(email) LIKE %(keywords)s OR LOWER(provider) LIKE %(keywords)s)")
values["keywords"] = f"%{paging.filter.keywords.lower()}%"
where_clause = " AND ".join(conditions)
if where_clause:
where_clause = "WHERE " + where_clause
order_clause = ""
if paging.sortby:
direction = "ASC" if paging.asc else "DESC"
order_clause = f"ORDER BY {paging.sortby} {direction}"
sql = f"""
SELECT *, COUNT(*) OVER() AS total
FROM ailbl_user_email
{where_clause}
{order_clause}
LIMIT %(limit)s OFFSET %(offset)s
"""
values["limit"] = paging.size
values["offset"] = paging.page * paging.size
cursor.execute(sql, values)
return db_rows_to_array(cursor, cursor.fetchall())