import json import urllib.parse import uuid from typing import List from flask import current_app import requests from helpers import CORS_HEADERS, get_secret, init_db_connection, kratos import uuid as uuid_lib def get_user_email_from_kratos(user_id: str): # Get email from Kratos try: current_app.logger.info( f"[get_user_email_from_kratos] Checking Kratos connection for user_id={user_id}") identity = kratos.get_identity(id=user_id) current_app.logger.info( f"[get_user_email_from_kratos] Kratos response: {identity}") return identity.traits.get("email") except Exception as e: current_app.logger.error( f"[get_user_email_from_kratos] Error connecting to Kratos: {e}") return None def get_emails_by_user(user_id: str): conn = None try: conn = init_db_connection() with conn.cursor() as cursor: cursor.execute(""" SELECT id, user_id, email, provider, created, modified FROM ailbl_user_email WHERE user_id = %s """, (user_id,)) rows = cursor.fetchall() emails = [] for row in rows: emails.append({ "id": row[0], "user_id": row[1], "email": row[2], "provider": row[3], "created": row[4].isoformat(), "modified": row[5].isoformat() }) return emails, 200, CORS_HEADERS except Exception as e: current_app.logger.error(f"[get_emails_by_user] DB Error: {str(e)}") return {"error": "DATABASE_ERROR"}, 500, CORS_HEADERS finally: if conn: conn.close() # def create_or_update_config_account(email: str, key: str, type: str = "create"): # url_get_config = f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}" # if type == "update": # url_get_config = f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}" # headers = {} # payload = {} # response = requests.request( # "GET", url_get_config, headers=headers, data=payload) # if response.status_code != 200: # url_get_config = ( # f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}" # ) # response = requests.request( # "GET", url_get_config, headers=headers, data=payload # ) # if response.status_code != 200: # return {"error": "Failed to retrieve configuration"}, response.status_code # config_user = response.json() # if email != "": # config_user["profile_setting"]["primary"]["email"] = email # config_user["user_id"] = key # json_str = json.dumps(config_user) # Convert dict to JSON string # encoded_json = urllib.parse.quote(json_str) # URL encode # if ( # type == "update" # and url_get_config # != f"{get_secret('API_CONFIG_USER')}/{get_secret('KEY_CONFIG_USER')}" # ): # url_save = f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}" # payload = ( # f"content={encoded_json}" # f"&mime_type=application/json" # f"&public=true" # f"&active=true" # f"&refer=GEOHUB" # ) # method = "PUT" # else: # url_save = f"{get_secret('API_CONFIG_USER')}" # payload = f"content={encoded_json}&key={get_secret('PREFIX_KEY_CONFIG_USER')}_{key}&mime_type=application/json&public=true&active=true" # method = "POST" # headers = {"Content-Type": "application/x-www-form-urlencoded"} # response_save = requests.request( # method, url_save, headers=headers, data=payload) # return response_save.json(), response_save.status_code # def get_config_account(key: str): # url_get_config = ( # f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{key}" # ) # headers = {} # payload = {} # response = requests.request( # "GET", url_get_config, headers=headers, data=payload) # if response.status_code != 200: # email = get_user_email_from_kratos(key) # if not email: # return {"error": "User email not found"}, 404 # res, status = create_or_update_config_account( # email=email, key=key, type="create" # ) # if status != 200: # return res, status # email_new, email_status, headers = add_email_to_user( # user_id=key, email=email) # if email_status != 200: # return email_new, email_status # response = requests.request( # "GET", url_get_config, headers=headers, data=payload # ) # return response.json(), response.status_code # def create_config_account_if_not_exists(user_id: str): # url_get_config = ( # f"{get_secret('API_CONFIG_USER')}/{get_secret('PREFIX_KEY_CONFIG_USER')}_{user_id}" # ) # headers = {} # payload = {} # response = requests.request( # "GET", url_get_config, headers=headers, data=payload) # if response.status_code != 200: # email = get_user_email_from_kratos(user_id) # if not email: # return {"error": "User email not found"}, 404 # res, status = create_or_update_config_account( # email=email, key=user_id, type="create" # ) # if status != 200: # return res, status # email_new, email_status, headers = add_email_to_user( # user_id=user_id, email=email) # if email_status != 200: # return email_new, email_status # return {"message": "Config account exists or created successfully"}, 200 def add_email_to_user(user_id: str, email: str): # Add email conn = None cursor = None try: conn = init_db_connection() with conn.cursor() as cursor: cursor.execute(""" SELECT user_id FROM ailbl_user_email WHERE email = %s; """, (email,)) # Truy van ra ket qua email existing = cursor.fetchone() # Tao ra bien existing save email duoc add if existing: return {"errorCode": "EMAIL_ALREADY_EXISTS"}, 409, CORS_HEADERS id = str(uuid_lib.uuid4()) provider = auto_detect_provider(email) # Ham xu li lay phan sau cua @ cursor.execute(""" INSERT INTO ailbl_user_email (id, user_id, email, provider, created, modified) VALUES (%s, %s, %s, %s, now(), now()) RETURNING id; """, (id, user_id, email, provider)) email_id = cursor.fetchone()[0] conn.commit() return { "id": email_id, "user_id": user_id, "email": email, "provider": provider, "status": "added" }, 200, CORS_HEADERS except Exception as e: current_app.logger.error(f"[add_email_to_user] DB Error: {e}") return {"error": "DATABASE_ERROR"}, 500, CORS_HEADERS finally: if cursor: cursor.close() if conn: conn.close() # Check provide lay phan duoi de xem ai la nha cung cap def auto_detect_provider(email: str) -> str: domain_part = email.split('@')[-1] return domain_part.split('.')[0] def get_max_mail(): # Setting config , setting user toi da duoc luu bao nhieu email url = f"{get_secret('KEY_MAX_EMAIL_PER_USER')}" try: max_email = requests.get(url).json() return int(max_email) except Exception as e: current_app.logger.error( f"[get_max_email_per_user] Error retrieving max email per user: {str(e)}") return None def cout_email(user_id): # kiem tra email cua 1 user, roi dem email cua user do roi tra ra conn = None try: conn = init_db_connection() with conn.cursor() as cursor: cursor.execute(""" SELECT COUNT(*) FROM ailbl_user_email WHERE user_id = %s """, (user_id,)) count = cursor.fetchone()[0] return count except Exception as e: current_app.logger.error(f"[count_user_emails] DB Error: {str(e)}") return 0 finally: if conn: conn.close() current_app.logger.info("Closed DB connection") def get_email_by_id(email_id: str): #Admin update email, update email phai goi ra duoc conn = None cursor = None try: conn = init_db_connection() with conn.cursor() as cursor: cursor.execute(""" SELECT email FROM ailbl_user_email WHERE id = %s; """, (email_id,)) row = cursor.fetchone() if row: return row[0] else: return None except Exception as e: current_app.logger.error(f"[get_email_by_id] DB Error: {e}") return {"error": "DATABASE_ERROR"}, 500 finally: if cursor: cursor.close() if conn: conn.close() def exists_email(email_id: str) -> bool: conn = None cursor = None try: conn = init_db_connection() with conn.cursor() as cursor: cursor.execute(""" SELECT 1 FROM ailbl_user_email WHERE id = %s; """, (email_id,)) row = cursor.fetchone() return row is not None except Exception as e: current_app.logger.error(f"[exists_email] DB Error: {e}") return False finally: if cursor: cursor.close() if conn: conn.close()