import datetime
import logging
import socket

import psycopg2
from flask import current_app, has_app_context
from psycopg2.extras import LoggingConnection

CORS_HEADERS = {
    "Content-Type": "application/json",
}
SECRET_NAME = "fission-ai-work-env"
CONFIG_NAME = "fission-ai-work-config"
K8S_NAMESPACE = "default"

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def _active_logger():
    r"""_active_logger() -> logging.Logger

    Return the logger to use for diagnostics in this module.

    ``current_app`` is only usable while a Flask application context is
    active; touching it elsewhere raises ``RuntimeError``. The helpers below
    log from inside ``except`` handlers, so an unusable logger would turn
    their documented fallbacks into crashes.

    Returns:
        logging.Logger: ``current_app.logger`` when an application context is
        active, otherwise the module-level ``logger``.
    """
    if has_app_context():
        return current_app.logger
    return logger


def _read_mounted_value(path: str, default=None):
    r"""_read_mounted_value(path, default=None) -> str | None

    Read the full content of a mounted file, falling back to ``default``.

    Shared implementation of :func:`get_secret` and :func:`get_config`.

    Args:
        path (str): Absolute path of the file to read.
        default: Value to return if the file cannot be read.
            Default: ``None``

    Returns:
        str | None: File content exactly as stored (no stripping), or
        ``default`` if reading failed.
    """
    try:
        with open(path, "r") as f:
            return f.read()
    except Exception as err:
        # Lazy %-style arguments: ``path`` must not be used as the format
        # string itself, otherwise logging fails to render the record.
        _active_logger().error("%s: %s", path, err)
        return default


def init_db_connection():
    r"""init_db_connection() -> psycopg2.extensions.connection

    Initialize and return a PostgreSQL database connection.

    Reads connection parameters from Kubernetes secrets and establishes
    a connection to the PostgreSQL database with logging enabled.

    Returns:
        psycopg2.extensions.connection: Active database connection with
        :class:`LoggingConnection` factory for query logging.

    Raises:
        Exception: If database host/port is unreachable.

    Note:
        Connection parameters are read from K8s secrets:

        - ``PG_HOST``: Database host (default: ``127.0.0.1``)
        - ``PG_PORT``: Database port (default: ``5432``)
        - ``PG_DB``: Database name (default: ``postgres``)
        - ``PG_USER``: Database user (default: ``postgres``)
        - ``PG_PASS``: Database password (default: ``secret``)

    .. warning::
        Caller is responsible for closing the connection after use.
    """
    db_host = get_secret("PG_HOST", "127.0.0.1")
    db_port = int(get_secret("PG_PORT", 5432))

    # Fail fast with a readable message before handing over to psycopg2.
    if not check_port_open(ip=db_host, port=db_port):
        raise Exception(f"Establishing A Database Connection. `{db_host}:{db_port}`")

    # options = get_secret("PG_DBSCHEMA")
    # if options:
    #     options = f"-c search_path={options}"  # if specific db schema
    conn = psycopg2.connect(
        database=get_secret("PG_DB", "postgres"),
        user=get_secret("PG_USER", "postgres"),
        password=get_secret("PG_PASS", "secret"),
        # Reuse the values that were just probed so the connection targets
        # exactly the endpoint that passed the reachability check.
        host=db_host,
        port=db_port,
        # options=options,
        # cursor_factory=NamedTupleCursor,
        connection_factory=LoggingConnection,
    )
    # LoggingConnection needs a log target before it is used.
    conn.initialize(logger)
    return conn


def db_row_to_dict(cursor, row):
    r"""db_row_to_dict(cursor, row) -> dict

    Convert a database row tuple to a dictionary.

    Uses cursor description to map column names to values. Automatically
    converts :class:`datetime.datetime` values to ISO format strings.

    Args:
        cursor: Database cursor with ``description`` attribute containing
            column metadata; each entry must expose a ``name`` attribute.
        row (tuple): Row tuple from ``cursor.fetchone()`` or similar.

    Returns:
        dict: Dictionary with column names as keys and row values.

    Example::

        >>> cursor.execute("SELECT id, name, created FROM users")
        >>> row = cursor.fetchone()
        >>> db_row_to_dict(cursor, row)
        {'id': '123', 'name': 'John', 'created': '2024-01-01T10:00:00'}
    """
    record = {}
    for i, column in enumerate(cursor.description):
        data = row[i]
        # Only datetime.datetime is converted; other values pass through.
        if isinstance(data, datetime.datetime):
            data = data.isoformat()
        record[column.name] = data
    return record


def db_rows_to_array(cursor, rows):
    r"""db_rows_to_array(cursor, rows) -> list[dict]

    Convert multiple database rows to a list of dictionaries.

    Args:
        cursor: Database cursor with ``description`` attribute.
        rows (list[tuple]): List of row tuples from ``cursor.fetchall()``.

    Returns:
        list[dict]: List of dictionaries, one per row.

    See Also:
        :func:`db_row_to_dict` for single row conversion.
    """
    return [db_row_to_dict(cursor, row) for row in rows]


def get_current_namespace() -> str:
    r"""get_current_namespace() -> str

    Get the current Kubernetes namespace.

    Reads namespace from the K8s service account file. Falls back to
    ``default`` namespace if file is not accessible.

    Returns:
        str: Current K8s namespace name, with surrounding whitespace removed.
    """
    try:
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
            # The value is interpolated into mount paths by get_secret() and
            # get_config(); a stray trailing newline would break those paths.
            namespace = f.read().strip()
    except Exception as err:
        _active_logger().error(err)
        namespace = K8S_NAMESPACE
    return str(namespace)


def get_secret(key: str, default=None):
    r"""get_secret(key, default=None) -> str | None

    Read a secret value from Kubernetes mounted volume.

    Args:
        key (str): Secret key name (e.g., ``"PG_HOST"``).
        default: Value to return if secret is not found. Default: ``None``

    Returns:
        str | None: Secret value or default if not found.

    Note:
        Secrets are mounted at ``/secrets/{namespace}/fission-ai-work-env/{key}``
    """
    namespace = get_current_namespace()
    path = f"/secrets/{namespace}/{SECRET_NAME}/{key}"
    return _read_mounted_value(path, default)


def get_config(key: str, default=None):
    r"""get_config(key, default=None) -> str | None

    Read a config value from Kubernetes ConfigMap mounted volume.

    Args:
        key (str): Config key name.
        default: Value to return if config is not found. Default: ``None``

    Returns:
        str | None: Config value or default if not found.

    Note:
        ConfigMaps are mounted at ``/configs/{namespace}/fission-ai-work-config/{key}``
    """
    namespace = get_current_namespace()
    path = f"/configs/{namespace}/{CONFIG_NAME}/{key}"
    return _read_mounted_value(path, default)


def str_to_bool(input: str | None) -> bool | None:
    r"""str_to_bool(input) -> bool | None

    Convert a string to boolean value.

    Args:
        input (str | None): String to convert. Case-insensitive; surrounding
            whitespace is ignored.

    Returns:
        bool | None: ``True`` for ``"true"``, ``False`` for ``"false"``,
        ``None`` for any other value (including ``None`` and ``""``).

    Example::

        >>> str_to_bool("true")
        True
        >>> str_to_bool("FALSE")
        False
        >>> str_to_bool("yes")
        None
    """
    input = input or ""
    # Dictionary to map string values to boolean
    BOOL_MAP = {"true": True, "false": False}
    return BOOL_MAP.get(input.strip().lower(), None)


def check_port_open(ip: str, port: int, timeout: int = 30) -> bool:
    r"""check_port_open(ip, port, timeout=30) -> bool

    Check if a TCP port is open and accepting connections.

    Args:
        ip (str): IP address or hostname to check.
        port (int): Port number to check.
        timeout (int): Connection timeout in seconds. Default: ``30``

    Returns:
        bool: ``True`` if port is open, ``False`` otherwise (including when
        the check itself fails, e.g. the hostname cannot be resolved).

    Example::

        >>> check_port_open("127.0.0.1", 5432)
        True
        >>> check_port_open("localhost", 9999, timeout=5)
        False
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            # connect_ex reports connect failures as an error code rather
            # than raising; 0 means the connection succeeded.
            result = s.connect_ex((ip, port))
            return result == 0
    except Exception as err:
        # connect_ex can still raise (e.g. "host not found"); treat any such
        # failure as "port not open" instead of propagating.
        _active_logger().error(f"Check port open error: {err}")
        return False