EmailDone1
Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 22s
Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 22s
This commit is contained in:
327
apps/crud.py
327
apps/crud.py
@@ -1,65 +1,286 @@
|
||||
import io
|
||||
|
||||
from flask import Response
|
||||
from helpers import S3_BUCKET, get_secret, s3_client
|
||||
from PIL import Image
|
||||
import json
|
||||
import urllib.parse
|
||||
import uuid
|
||||
from typing import List
|
||||
from flask import current_app
|
||||
import requests
|
||||
from helpers import CORS_HEADERS, get_secret, init_db_connection, kratos
|
||||
import uuid as uuid_lib
|
||||
|
||||
|
||||
# Create&Update function to upload or update user avatar S3/Minio
|
||||
def update_or_create_avatar(user_id: str, file):
|
||||
def get_user_email_from_kratos(user_id: str): # Get email from Kratos
|
||||
try:
|
||||
file_data = file.read()
|
||||
# Bản chất là đường dẫn trong bucket + tên file = user_id
|
||||
object_name = f"{get_secret('S3_PREFIX')}/{user_id}"
|
||||
result = s3_client.put_object(
|
||||
Bucket=S3_BUCKET,
|
||||
Key=object_name,
|
||||
Body=io.BytesIO(file_data),
|
||||
ContentLength=len(file_data),
|
||||
ContentType=file.content_type,
|
||||
)
|
||||
|
||||
return result, 200
|
||||
|
||||
current_app.logger.info(
|
||||
f"[get_user_email_from_kratos] Checking Kratos connection for user_id={user_id}")
|
||||
identity = kratos.get_identity(id=user_id)
|
||||
current_app.logger.info(
|
||||
f"[get_user_email_from_kratos] Kratos response: {identity}")
|
||||
return identity.traits.get("email")
|
||||
except Exception as e:
|
||||
return {"error": str(e)}, 500
|
||||
current_app.logger.error(
|
||||
f"[get_user_email_from_kratos] Error connecting to Kratos: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def get_avatar_url(user_id: str): # Read function to get user avatar from S3/Minio
|
||||
def get_emails_by_user(user_id: str):
|
||||
conn = None
|
||||
try:
|
||||
response = s3_client.get_object(
|
||||
Bucket=S3_BUCKET,
|
||||
Key=f"{get_secret('S3_PREFIX')}/{user_id}"
|
||||
)
|
||||
# image_data = response["body"].read(content_type)
|
||||
image_data = response['Body'].read()
|
||||
|
||||
with Image.open(io.BytesIO(image_data)) as img:
|
||||
fmt = img.format.lower() # ví dụ: 'jpeg', 'png', 'webp'
|
||||
content_type = f"image/{'jpeg' if fmt == 'jpg' else fmt}"
|
||||
# return Response(
|
||||
# io.BytesIO(image_data),
|
||||
# content_type=content_type,
|
||||
# direct_passthrough=True,
|
||||
# )
|
||||
|
||||
return Response(
|
||||
image_data,
|
||||
content_type=content_type,
|
||||
direct_passthrough=True
|
||||
), 200
|
||||
|
||||
conn = init_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT id, user_id, email, provider, created, modified
|
||||
FROM ailbl_user_email
|
||||
WHERE user_id = %s
|
||||
""", (user_id,))
|
||||
rows = cursor.fetchall()
|
||||
emails = []
|
||||
for row in rows:
|
||||
emails.append({
|
||||
"id": row[0],
|
||||
"user_id": row[1],
|
||||
"email": row[2],
|
||||
"provider": row[3],
|
||||
"created": row[4].isoformat(),
|
||||
"modified": row[5].isoformat()
|
||||
})
|
||||
return emails, 200, CORS_HEADERS
|
||||
except Exception as e:
|
||||
return {"error": str(e)}, 500
|
||||
current_app.logger.error(f"[get_emails_by_user] DB Error: {str(e)}")
|
||||
return {"error": "DATABASE_ERROR"}, 500, CORS_HEADERS
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
# Delete Function to delete user avatar from S3/Minio
|
||||
def delete_avatar(user_id: str) -> dict:
|
||||
# def create_or_update_config_account(email: str, key: str, type: str = "create"):
|
||||
# url_get_config = f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}"
|
||||
# if type == "update":
|
||||
# url_get_config = f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}"
|
||||
# headers = {}
|
||||
# payload = {}
|
||||
|
||||
# response = requests.request(
|
||||
# "GET", url_get_config, headers=headers, data=payload)
|
||||
# if response.status_code != 200:
|
||||
# url_get_config = (
|
||||
# f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}"
|
||||
# )
|
||||
# response = requests.request(
|
||||
# "GET", url_get_config, headers=headers, data=payload
|
||||
# )
|
||||
# if response.status_code != 200:
|
||||
# return {"error": "Failed to retrieve configuration"}, response.status_code
|
||||
|
||||
# config_user = response.json()
|
||||
# if email != "":
|
||||
# config_user["profile_setting"]["primary"]["email"] = email
|
||||
# config_user["user_id"] = key
|
||||
# json_str = json.dumps(config_user) # Convert dict to JSON string
|
||||
# encoded_json = urllib.parse.quote(json_str) # URL encode
|
||||
|
||||
# if (
|
||||
# type == "update"
|
||||
# and url_get_config
|
||||
# != f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}"
|
||||
# ):
|
||||
# url_save = f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}"
|
||||
# payload = (
|
||||
# f"content={encoded_json}"
|
||||
# f"&mime_type=application/json"
|
||||
# f"&public=true"
|
||||
# f"&active=true"
|
||||
# f"&refer=GEOHUB"
|
||||
# )
|
||||
# method = "PUT"
|
||||
# else:
|
||||
# url_save = f"{get_secret('API_CONFIG_USER')}"
|
||||
# payload = f"content={encoded_json}&key={get_secret('PREFIX_KEY_CONFIG_USER')}_{key}&mime_type=application/json&public=true&active=true"
|
||||
# method = "POST"
|
||||
# headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||
# response_save = requests.request(
|
||||
# method, url_save, headers=headers, data=payload)
|
||||
|
||||
# return response_save.json(), response_save.status_code
|
||||
|
||||
|
||||
# def get_config_account(key: str):
|
||||
# url_get_config = (
|
||||
# f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}"
|
||||
# )
|
||||
# headers = {}
|
||||
# payload = {}
|
||||
# response = requests.request(
|
||||
# "GET", url_get_config, headers=headers, data=payload)
|
||||
|
||||
# if response.status_code != 200:
|
||||
# email = get_user_email_from_kratos(key)
|
||||
# if not email:
|
||||
# return {"error": "User email not found"}, 404
|
||||
# res, status = create_or_update_config_account(
|
||||
# email=email, key=key, type="create"
|
||||
# )
|
||||
# if status != 200:
|
||||
# return res, status
|
||||
|
||||
# email_new, email_status, headers = add_email_to_user(
|
||||
# user_id=key, email=email)
|
||||
|
||||
# if email_status != 200:
|
||||
# return email_new, email_status
|
||||
# response = requests.request(
|
||||
# "GET", url_get_config, headers=headers, data=payload
|
||||
# )
|
||||
|
||||
# return response.json(), response.status_code
|
||||
|
||||
|
||||
# def create_config_account_if_not_exists(user_id: str):
|
||||
# url_get_config = (
|
||||
# f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{user_id}"
|
||||
# )
|
||||
# headers = {}
|
||||
# payload = {}
|
||||
# response = requests.request(
|
||||
# "GET", url_get_config, headers=headers, data=payload)
|
||||
# if response.status_code != 200:
|
||||
# email = get_user_email_from_kratos(user_id)
|
||||
# if not email:
|
||||
# return {"error": "User email not found"}, 404
|
||||
# res, status = create_or_update_config_account(
|
||||
# email=email, key=user_id, type="create"
|
||||
# )
|
||||
# if status != 200:
|
||||
# return res, status
|
||||
|
||||
# email_new, email_status, headers = add_email_to_user(
|
||||
# user_id=user_id, email=email)
|
||||
|
||||
# if email_status != 200:
|
||||
# return email_new, email_status
|
||||
# return {"message": "Config account exists or created successfully"}, 200
|
||||
|
||||
|
||||
def add_email_to_user(user_id: str, email: str): # Add email
|
||||
conn = None
|
||||
cursor = None
|
||||
try:
|
||||
result = s3_client.delete_object(
|
||||
Bucket=S3_BUCKET,
|
||||
Key=f"{get_secret('S3_PREFIX')}/{user_id}"
|
||||
)
|
||||
return result, 200
|
||||
conn = init_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT user_id FROM ailbl_user_email WHERE email = %s;
|
||||
""", (email,))
|
||||
existing = cursor.fetchone()
|
||||
if existing:
|
||||
return {"errorCode": "EMAIL_ALREADY_EXISTS"}, 409, CORS_HEADERS
|
||||
|
||||
id = str(uuid_lib.uuid4())
|
||||
provider = auto_detect_provider(email)
|
||||
cursor.execute("""
|
||||
INSERT INTO ailbl_user_email (id, user_id, email, provider, created, modified)
|
||||
VALUES (%s, %s, %s, %s, now(), now())
|
||||
RETURNING id;
|
||||
""", (id, user_id, email, provider))
|
||||
email_id = cursor.fetchone()[0]
|
||||
conn.commit()
|
||||
|
||||
return {
|
||||
"id": email_id,
|
||||
"user_id": user_id,
|
||||
"email": email,
|
||||
"provider": provider,
|
||||
"status": "added"
|
||||
}, 200, CORS_HEADERS
|
||||
except Exception as e:
|
||||
return {"error": str(e)}, 500
|
||||
current_app.logger.error(f"[add_email_to_user] DB Error: {e}")
|
||||
return {"error": "DATABASE_ERROR"}, 500, CORS_HEADERS
|
||||
finally:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
# Check provide lay phan duoi de xem ai la nha cung cap
|
||||
def auto_detect_provider(email: str) -> str:
|
||||
domain_part = email.split('@')[-1]
|
||||
return domain_part.split('.')[0]
|
||||
|
||||
|
||||
def get_max_mail(): # Setting config , setting user toi da duoc luu bao nhieu email
|
||||
url = f"{get_secret('KEY_MAX_EMAIL_PER_USER')}"
|
||||
try:
|
||||
max_email = requests.get(url).json()
|
||||
return int(max_email)
|
||||
except Exception as e:
|
||||
current_app.logger.error(
|
||||
f"[get_max_email_per_user] Error retrieving max email per user: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def cout_email(user_id): # kiem tra email cua 1 user, roi dem email cua user do roi tra ra
|
||||
conn = None
|
||||
try:
|
||||
conn = init_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*) FROM ailbl_user_email
|
||||
WHERE user_id = %s
|
||||
""", (user_id,))
|
||||
count = cursor.fetchone()[0]
|
||||
return count
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"[count_user_emails] DB Error: {str(e)}")
|
||||
return 0
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
current_app.logger.info("Closed DB connection")
|
||||
|
||||
|
||||
def get_email_by_id(email_id: str): #Admin update email, update email phai goi ra duoc
|
||||
conn = None
|
||||
cursor = None
|
||||
try:
|
||||
conn = init_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT email
|
||||
FROM ailbl_user_email
|
||||
WHERE id = %s;
|
||||
""", (email_id,))
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
else:
|
||||
return None
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"[get_email_by_id] DB Error: {e}")
|
||||
return {"error": "DATABASE_ERROR"}, 500
|
||||
finally:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
def exists_email(email_id: str) -> bool:
|
||||
conn = None
|
||||
cursor = None
|
||||
try:
|
||||
conn = init_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
SELECT 1
|
||||
FROM ailbl_user_email
|
||||
WHERE id = %s;
|
||||
""", (email_id,))
|
||||
row = cursor.fetchone()
|
||||
return row is not None
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"[exists_email] DB Error: {e}")
|
||||
return False
|
||||
finally:
|
||||
if cursor:
|
||||
cursor.close()
|
||||
if conn:
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user