import logging
from typing import Optional
import crud
from email_validator import EmailNotValidError, validate_email
from flask import jsonify, request
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy.orm import Session
from helpers import kratos, init_db_connection, CORS_HEADERS

logger = logging.getLogger(__name__)


def main():
    """
    ```fission
    {
        "name": "email-admin-delete",
        "http_triggers": {
            "email-admin-delete-http": {
                "url": "/ailbl/admin/users/{UserId}/emails/{UserEmailId}",
                "methods": ["DELETE"]
            }
        }
    }
    ```
    """
    # NOTE(review): the docstring above holds a fission spec and is presumably
    # read by deployment tooling -- keep it verbatim; confirm before editing.
    #
    # Entry point: dispatches DELETE requests to delete_email(), answers any
    # other method with 405, and turns unexpected exceptions into a 500.
    # Every response carries CORS_HEADERS, matching the responses built in
    # delete_email().
    try:
        if request.method == "DELETE":
            return delete_email()
        return {"error": "Method not allow"}, 405, CORS_HEADERS
    except Exception as ex:
        # Top-level boundary: record the traceback before answering.
        # NOTE(review): str(ex) is returned to the client as-is; consider a
        # generic message if exception text may contain internal details.
        logger.exception("[main] Unhandled error")
        return jsonify({"error": str(ex)}), 500, CORS_HEADERS


def delete_email():
    """Delete one row from ``ailbl_user_email`` for the requested user.

    The user id and email id are read from the ``X-Fission-Params-UserId``
    and ``X-Fission-Params-UserEmailId`` request headers.

    Returns:
        A ``(body, status, headers)`` tuple:

        * 400 ``USER_ID_REQUIRED`` / ``USER_EMAIL_ID_REQUIRED`` when a header
          is missing or empty.
        * 404 ``EMAIL_NOT_FOUND`` when ``exists_email`` reports no such row,
          or when no row matches both the email id and the user id.
        * 200 ``{"id": ..., "status": "deleted"}`` after a committed delete.
        * 500 ``DATABASE_ERROR`` when the delete itself raises.
    """
    user_id = request.headers.get("X-Fission-Params-UserId")
    email_id = request.headers.get("X-Fission-Params-UserEmailId")

    if not user_id:
        return jsonify({"errorCode": "USER_ID_REQUIRED"}), 400, CORS_HEADERS
    if not email_id:
        return jsonify({"errorCode": "USER_EMAIL_ID_REQUIRED"}), 400, CORS_HEADERS

    # check if email exists
    # exists_email() also returns False when its lookup raises, so an id the
    # database rejects ends up as a 404 here instead of a 500 further down.
    if exists_email(email_id) is False:
        return jsonify({"errorCode": "EMAIL_NOT_FOUND"}), 404, CORS_HEADERS

    # check if email is primary (the main email)
    # config , status = get_config_account(user_id)
    # if status == 200:
    #     email_primary = config["profile_setting"]["primary"]["email"]
    #     if check_is_primary(email_primary, email_id):
    #         return jsonify({"errorCode": "CANNOT_DELETE_PRIMARY_EMAIL"}), 400, CORS_HEADERS

    # Proceed to delete the email
    conn = None
    cursor = None
    try:
        conn = init_db_connection()
        cursor = conn.cursor()
        # Filtering on user_id as well means a row that belongs to a different
        # user is never deleted; RETURNING tells us whether a row matched.
        cursor.execute(
            """
            DELETE FROM ailbl_user_email
            WHERE id = %s AND user_id = %s
            RETURNING id;
            """,
            (email_id, user_id)
        )
        result = cursor.fetchone()
        if not result:
            return jsonify({"errorCode": "EMAIL_NOT_FOUND"}), 404, CORS_HEADERS

        conn.commit()
        return jsonify({
            "id": email_id,
            "status": "deleted"
        }), 200, CORS_HEADERS
    except Exception:
        # Log first so the original failure is recorded even if the rollback
        # below raises as well.
        logger.exception("[delete_email] Database error")
        if conn:
            conn.rollback()
        return jsonify({"errorCode": "DATABASE_ERROR"}), 500, CORS_HEADERS
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


def exists_email(email_id: str) -> bool:
    """Return True when a row with this id exists in ``ailbl_user_email``.

    Any exception raised while connecting or querying is logged and reported
    as ``False``; callers cannot distinguish "not found" from "lookup failed".
    """
    conn = None
    try:
        conn = init_db_connection()
        # The cursor is handled by the ``with`` block, so no separate close
        # is needed in ``finally``.
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT 1
                FROM ailbl_user_email
                WHERE id = %s;
            """, (email_id,))
            row = cursor.fetchone()
            return row is not None
    except Exception:
        logger.exception("[exists_email] DB Error")
        return False
    finally:
        if conn:
            conn.close()


# update email : set as primary
# def update_email():
#     user_id = request.headers.get("X-Fission-Params-UserId")
#     email_id = request.headers.get("X-Fission-Params-UserEmailId")
#     request_data = request.get_json()
#     is_primary = request_data.get("is_primary")
#     if not user_id:
#         return jsonify({"errorCode": "USER_ID_REQUIRED"}), 400, CORS_HEADERS
#     if not email_id:
#         return jsonify({"errorCode": "USER_EMAIL_ID_REQUIRED"}), 400, CORS_HEADERS
#     if not is_primary:
#         return jsonify({"errorCode": "NO_UPDATES_PROVIDED"}), 400, CORS_HEADERS
#     # ensure config account exists
#     # create_config_account_if_not_exists(user_id)
#     # check if email exists
#     email = get_email_by_id(email_id=email_id)
#     if not email:
#         return jsonify({"errorCode": "EMAIL_NOT_FOUND"}), 404, CORS_HEADERS
#     # set as primary in config account
#     if is_primary:
#         identity = kratos.get_identity(user_id)
#         traits = identity.traits
#         traits["email"] = email
#         res = kratos.update_identity(
#             id=user_id,
#             update_identity_body={
#                 "schema_id": identity.schema_id,
#                 "traits": traits,
#                 "state": identity.state,
#             },
#         )
#     # update config email
#     # r, status = create_or_update_config_account(email=email, key=user_id, type="update")
#     # if status != 200:
#     #     return jsonify({"errorCode": "FAILED_TO_UPDATE_PRIMARY_EMAIL"}), status
#     return jsonify({
#         "id": email_id,
#         "user_id": user_id,
#         "email": email,
#         "status": "set_as_primary"
#     }), 200, CORS_HEADERS

# def get_email_by_id(email_id: str):  # the email must be fetched by id before it can be updated
#     conn = None
#     cursor = None
#     try:
#         conn = init_db_connection()
#         with conn.cursor() as cursor:
#             cursor.execute("""
#                 SELECT email
#                 FROM ailbl_user_email
#                 WHERE id = %s;
#             """, (email_id,))
#             row = cursor.fetchone()
#             if row:
#                 return row[0]
#             else:
#                 return None
#     except Exception as e:
#         # current_app.logger.error(f"[get_email_by_id] DB Error: {e}")
#         return {"error": "DATABASE_ERROR"}, 500
#     finally:
#         if cursor:
#             cursor.close()
#         if conn:
#             conn.close()