up step
Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 12s

This commit is contained in:
Duc Nguyen
2026-01-27 01:23:56 +07:00
parent 018f267fab
commit 6a1789b3f5
13 changed files with 5043 additions and 6 deletions

View File

@@ -24,7 +24,7 @@ def main():
"fntimeout": 300,
"http_triggers": {
"ai-admin-filter-create-user-http": {
"url": "/ai/admin/users",
"url": "/ailbl/ai/admin/users",
"methods": ["POST", "GET"]
}
}
@@ -44,6 +44,36 @@ def main():
def make_insert_request():
r"""make_insert_request() -> tuple[Response, int, dict]
Create a new user from the request JSON body.
Validates the request body using :class:`AiUserCreate` schema, generates
a UUID for the new user, and inserts the record into the database.
Returns:
tuple: A tuple containing:
- JSON response with created user data or error details
- HTTP status code (201 on success, 400/409/500 on error)
- CORS headers dict
Raises:
ValidationError: If request body fails Pydantic validation (returns 400).
IntegrityError: If email already exists in database (returns 409).
Example::
>>> # POST /ai/admin/users
>>> # Body: {"name": "John Doe", "email": "john@example.com"}
>>> # Response: 201 Created
>>> {
... "id": "550e8400-e29b-41d4-a716-446655440000",
... "name": "John Doe",
... "email": "john@example.com",
... "created": "2024-01-01T10:00:00",
... "modified": "2024-01-01T10:00:00"
... }
"""
try:
body = AiUserCreate(**(request.get_json(silent=True) or {}))
except ValidationError as e:
@@ -74,6 +104,44 @@ def make_insert_request():
def make_filter_request():
r"""make_filter_request() -> Response
Filter and paginate users based on query parameters.
Builds a dynamic SQL query based on filter parameters from the request,
executes the query with pagination, and returns the matching users.
Query Parameters:
page (int): Page number (0-indexed). Default: ``0``
size (int): Number of records per page. Default: ``8``
sortby (str): Field to sort by (``created`` or ``modified``).
asc (bool): Sort ascending if ``True``, descending if ``False``.
filter[ids] (list): Filter by specific user IDs.
filter[keyword] (str): Search in name and email fields.
filter[name] (str): Filter by name (case-insensitive partial match).
filter[email] (str): Filter by email (case-insensitive partial match).
filter[created_from] (str): Filter by creation date (from).
filter[created_to] (str): Filter by creation date (to).
filter[dob_from] (str): Filter by date of birth (from).
filter[dob_to] (str): Filter by date of birth (to).
Returns:
Response: JSON array of user records with pagination metadata.
Example::
>>> # GET /ai/admin/users?page=0&size=10&filter[name]=john
>>> # Response: 200 OK
>>> [
... {
... "id": "550e8400-e29b-41d4-a716-446655440000",
... "name": "John Doe",
... "email": "john@example.com",
... "count": 100,
... "total": 5
... }
... ]
"""
paging = UserPage.from_request_queries()
conn = None
@@ -89,6 +157,19 @@ def make_filter_request():
def __filter_users(cursor, paging: "UserPage"):
r"""Build and execute SQL query for filtering users.
Args:
cursor: Database cursor object for executing queries.
paging (UserPage): Pagination and filter parameters.
Returns:
list[dict]: List of user records as dictionaries, including
``count`` (total records) and ``total`` (filtered count).
Note:
This is a private function. Use :func:`make_filter_request` instead.
"""
conditions = []
values = {}
@@ -164,6 +245,14 @@ def __filter_users(cursor, paging: "UserPage"):
@dataclasses.dataclass
class Page:
r"""Base pagination parameters for list queries.
Attributes:
page (int, optional): Page number (0-indexed). Default: ``0``
size (int, optional): Number of records per page. Default: ``8``
asc (bool, optional): Sort order. ``True`` for ascending, ``False`` for descending.
"""
page: typing.Optional[int] = None
size: typing.Optional[int] = None
asc: typing.Optional[bool] = None
@@ -179,6 +268,22 @@ class Page:
@dataclasses.dataclass
class UserFilter:
r"""Filter parameters for user queries.
Attributes:
ids (list[str], optional): Filter by specific user IDs.
keyword (str, optional): Search keyword for name and email fields.
name (str, optional): Filter by name (case-insensitive partial match).
email (str, optional): Filter by email (case-insensitive partial match).
gender (str, optional): Filter by gender.
created_from (str, optional): Filter users created on or after this date.
created_to (str, optional): Filter users created on or before this date.
modified_from (str, optional): Filter users modified on or after this date.
modified_to (str, optional): Filter users modified on or before this date.
dob_from (str, optional): Filter users with DOB on or after this date.
dob_to (str, optional): Filter users with DOB on or before this date.
"""
ids: typing.Optional[typing.List[str]] = None
keyword: typing.Optional[str] = None
name: typing.Optional[str] = None
@@ -209,12 +314,39 @@ class UserFilter:
class UserSortField(str, enum.Enum):
r"""Allowed sort fields for user queries.
Attributes:
CREATED: Sort by creation timestamp.
MODIFIED: Sort by last modification timestamp.
"""
CREATED = "created"
MODIFIED = "modified"
@dataclasses.dataclass
class UserPage(Page):
r"""Pagination parameters with user-specific filters and sorting.
Extends :class:`Page` with user filtering and sorting capabilities.
Attributes:
sortby (UserSortField, optional): Field to sort results by.
See :class:`UserSortField` for allowed values.
filter (UserFilter, optional): Filter parameters for the query.
Default: Parsed from request query parameters.
Example::
>>> # Parse from request: GET /users?page=1&size=20&sortby=created&asc=true
>>> paging = UserPage.from_request_queries()
>>> paging.page
1
>>> paging.sortby
<UserSortField.CREATED: 'created'>
"""
sortby: typing.Optional[UserSortField] = None
filter: typing.Optional[UserFilter] = dataclasses.field(
default_factory=UserFilter.from_request_queries

View File

@@ -19,6 +19,31 @@ logger = logging.getLogger(__name__)
def init_db_connection():
r"""init_db_connection() -> psycopg2.extensions.connection
Initialize and return a PostgreSQL database connection.
Reads connection parameters from Kubernetes secrets and establishes
a connection to the PostgreSQL database with logging enabled.
Returns:
psycopg2.extensions.connection: Active database connection with
:class:`LoggingConnection` factory for query logging.
Raises:
Exception: If database host/port is unreachable.
Note:
Connection parameters are read from K8s secrets:
- ``PG_HOST``: Database host (default: ``127.0.0.1``)
- ``PG_PORT``: Database port (default: ``5432``)
- ``PG_DB``: Database name (default: ``postgres``)
- ``PG_USER``: Database user (default: ``postgres``)
- ``PG_PASS``: Database password (default: ``secret``)
.. warning::
Caller is responsible for closing the connection after use.
"""
db_host = get_secret("PG_HOST", "127.0.0.1")
db_port = int(get_secret("PG_PORT", 5432))
@@ -43,6 +68,28 @@ def init_db_connection():
def db_row_to_dict(cursor, row):
r"""db_row_to_dict(cursor, row) -> dict
Convert a database row tuple to a dictionary.
Uses cursor description to map column names to values. Automatically
converts :class:`datetime.datetime` values to ISO format strings.
Args:
cursor: Database cursor with ``description`` attribute containing
column metadata.
row (tuple): Row tuple from ``cursor.fetchone()`` or similar.
Returns:
dict: Dictionary with column names as keys and row values.
Example::
>>> cursor.execute("SELECT id, name, created FROM users")
>>> row = cursor.fetchone()
>>> db_row_to_dict(cursor, row)
{'id': '123', 'name': 'John', 'created': '2024-01-01T10:00:00'}
"""
record = {}
for i, column in enumerate(cursor.description):
data = row[i]
@@ -53,10 +100,34 @@ def db_row_to_dict(cursor, row):
def db_rows_to_array(cursor, rows):
r"""db_rows_to_array(cursor, rows) -> list[dict]
Convert multiple database rows to a list of dictionaries.
Args:
cursor: Database cursor with ``description`` attribute.
rows (list[tuple]): List of row tuples from ``cursor.fetchall()``.
Returns:
list[dict]: List of dictionaries, one per row.
See Also:
:func:`db_row_to_dict` for single row conversion.
"""
return [db_row_to_dict(cursor, row) for row in rows]
def get_current_namespace() -> str:
r"""get_current_namespace() -> str
Get the current Kubernetes namespace.
Reads namespace from the K8s service account file. Falls back to
``default`` namespace if file is not accessible.
Returns:
str: Current K8s namespace name.
"""
try:
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
namespace = f.read()
@@ -67,6 +138,20 @@ def get_current_namespace() -> str:
def get_secret(key: str, default=None):
r"""get_secret(key, default=None) -> str | None
Read a secret value from Kubernetes mounted volume.
Args:
key (str): Secret key name (e.g., ``"PG_HOST"``).
default: Value to return if secret is not found. Default: ``None``
Returns:
str | None: Secret value or default if not found.
Note:
Secrets are mounted at ``/secrets/{namespace}/fission-ai-work-env/{key}``
"""
namespace = get_current_namespace()
path = f"/secrets/{namespace}/{SECRET_NAME}/{key}"
try:
@@ -78,6 +163,20 @@ def get_secret(key: str, default=None):
def get_config(key: str, default=None):
r"""get_config(key, default=None) -> str | None
Read a config value from Kubernetes ConfigMap mounted volume.
Args:
key (str): Config key name.
default: Value to return if config is not found. Default: ``None``
Returns:
str | None: Config value or default if not found.
Note:
ConfigMaps are mounted at ``/configs/{namespace}/fission-ai-work-config/{key}``
"""
namespace = get_current_namespace()
path = f"/configs/{namespace}/{CONFIG_NAME}/{key}"
try:
@@ -89,6 +188,26 @@ def get_config(key: str, default=None):
def str_to_bool(input: str | None) -> bool:
r"""str_to_bool(input) -> bool | None
Convert a string to boolean value.
Args:
input (str | None): String to convert. Case-insensitive.
Returns:
bool | None: ``True`` for ``"true"``, ``False`` for ``"false"``,
``None`` for any other value.
Example::
>>> str_to_bool("true")
True
>>> str_to_bool("FALSE")
False
>>> str_to_bool("yes")
None
"""
input = input or ""
# Dictionary to map string values to boolean
BOOL_MAP = {"true": True, "false": False}
@@ -96,6 +215,25 @@ def str_to_bool(input: str | None) -> bool:
def check_port_open(ip: str, port: int, timeout: int = 30):
r"""check_port_open(ip, port, timeout=30) -> bool
Check if a TCP port is open and accepting connections.
Args:
ip (str): IP address or hostname to check.
port (int): Port number to check.
timeout (int): Connection timeout in seconds. Default: ``30``
Returns:
bool: ``True`` if port is open, ``False`` otherwise.
Example::
>>> check_port_open("127.0.0.1", 5432)
True
>>> check_port_open("localhost", 9999, timeout=5)
False
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)

View File

@@ -0,0 +1,5 @@
pytest==8.3.5
pytest-mock==3.14.0
flask==3.1.0
psycopg2-binary==2.9.10
pydantic==2.10.6

View File

@@ -1,4 +1,4 @@
psycopg2-binary==2.9.10
pydantic==2.11.7
PyNaCl==1.6.0
Flask==3.1.0
Flask==3.1.0

View File

@@ -6,6 +6,28 @@ from pydantic import BaseModel, Field, field_validator
class AiUserCreate(BaseModel):
r"""Schema for creating a new AI user.
Validates user data for the POST /ai/admin/users endpoint.
Attributes:
id (str, optional): User UUID. Auto-generated if not provided.
name (str): User's full name. Required, 1-128 characters.
email (str): User's email address. Required, max 256 characters.
Must be a valid email format.
dob (date, optional): Date of birth in YYYY-MM-DD format.
gender (str, optional): User's gender, max 10 characters.
Example::
>>> user = AiUserCreate(
... name="John Doe",
... email="john@example.com",
... dob="1990-01-15",
... gender="male"
... )
"""
id: Optional[str] = None
name: str = Field(min_length=1, max_length=128)
email: str = Field(..., max_length=256)
@@ -20,6 +42,31 @@ class AiUserCreate(BaseModel):
class AiUserUpdate(BaseModel):
r"""Schema for updating an existing AI user.
Validates user data for the PUT /ai/admin/users/{UserID} endpoint.
All fields are optional for partial updates.
Attributes:
name (str, optional): User's full name, 1-128 characters.
email (str, optional): User's email address, max 256 characters.
Must be a valid email format if provided.
dob (date, optional): Date of birth in YYYY-MM-DD format.
gender (str, optional): User's gender, max 10 characters.
Example::
>>> # Partial update - only change name
>>> update = AiUserUpdate(name="Jane Doe")
>>> # Full update
>>> update = AiUserUpdate(
... name="Jane Doe",
... email="jane@example.com",
... gender="female"
... )
"""
name: Optional[str] = Field(default=None, min_length=1, max_length=128)
email: Optional[str] = Field(default=None, max_length=256)
dob: Optional[date] = None
@@ -33,6 +80,36 @@ class AiUserUpdate(BaseModel):
class AiUserFilter(BaseModel):
r"""Schema for filtering and paginating AI users.
Used for parsing query parameters in the GET /ai/admin/users endpoint.
Attributes:
q (str, optional): Search keyword for name and email fields.
name (str, optional): Filter by name (partial match).
email (str, optional): Filter by email (partial match).
gender (str, optional): Filter by gender (exact match).
dob_from (date, optional): Filter users with DOB on or after this date.
dob_to (date, optional): Filter users with DOB on or before this date.
created_from (str, optional): Filter users created on or after this datetime.
created_to (str, optional): Filter users created on or before this datetime.
page (int): Page number (0-indexed). Default: ``0``
size (int): Records per page, 1-200. Default: ``20``
sortby (str): Field to sort by. Default: ``"modified"``
asc (bool): Sort ascending if ``True``. Default: ``False``
Example::
>>> # Parse from query string
>>> filter = AiUserFilter(
... q="john",
... page=0,
... size=10,
... sortby="created",
... asc=True
... )
"""
q: Optional[str] = None
name: Optional[str] = None
email: Optional[str] = None

1
apps/tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package for AI Admin API

99
apps/tests/conftest.py Normal file
View File

@@ -0,0 +1,99 @@
"""Shared fixtures for API handler tests."""
import sys
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from flask import Flask
# Add apps directory to path for imports
apps_dir = Path(__file__).parent.parent
sys.path.insert(0, str(apps_dir))
@pytest.fixture
def flask_app():
"""Create Flask app context for testing."""
app = Flask(__name__)
app.config["TESTING"] = True
return app
@pytest.fixture
def app_context(flask_app):
"""Provide Flask application context."""
with flask_app.app_context():
yield flask_app
@pytest.fixture
def request_context(flask_app):
"""Provide Flask request context."""
with flask_app.test_request_context():
yield flask_app
@pytest.fixture
def mock_db_connection(mocker):
"""Mock database connection with cursor that has description attribute."""
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
# Set name attribute on description items
for i, col_name in enumerate(["id", "name", "dob", "email", "gender", "created", "modified"]):
mock_cursor.description[i].name = col_name
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("helpers.init_db_connection", return_value=mock_conn)
return {"connection": mock_conn, "cursor": mock_cursor}
@pytest.fixture
def mock_secrets(mocker):
"""Mock get_secret to return test values."""
secrets = {
"PG_HOST": "localhost",
"PG_PORT": "5432",
"PG_DB": "test_db",
"PG_USER": "test_user",
"PG_PASS": "test_pass",
}
mocker.patch("helpers.get_secret", side_effect=lambda key, default=None: secrets.get(key, default))
return secrets
@pytest.fixture
def sample_user_data():
"""Sample user data for testing."""
return {
"name": "Test User",
"email": "test@example.com",
"dob": "1990-01-15",
"gender": "male",
}
@pytest.fixture
def sample_db_row():
"""Sample database row tuple."""
return (
"550e8400-e29b-41d4-a716-446655440000", # id
"Test User", # name
"1990-01-15", # dob
"test@example.com", # email
"male", # gender
"2024-01-01T10:00:00", # created
"2024-01-01T10:00:00", # modified
)

View File

@@ -0,0 +1,478 @@
"""Tests for filter_insert.py - GET (filter) & POST (create) handlers."""
from unittest.mock import MagicMock
import pytest
from flask import Flask
from psycopg2 import IntegrityError
class TestMain:
"""Tests for main() dispatcher function."""
def test_main_get_calls_filter(self, mocker):
"""Test GET request routes to make_filter_request().
Given:
A GET request to the endpoint.
When:
main() is called.
Then:
make_filter_request() is invoked and returns 200.
"""
app = Flask(__name__)
with app.test_request_context(method="GET"):
mock_filter = mocker.patch(
"filter_insert.make_filter_request",
return_value=({"data": []}, 200, {}),
)
mocker.patch("filter_insert.init_db_connection")
import filter_insert
result = filter_insert.main()
mock_filter.assert_called_once()
assert result[1] == 200
def test_main_post_calls_insert(self, mocker):
"""Test POST request routes to make_insert_request().
Given:
A POST request with JSON body containing user data.
When:
main() is called.
Then:
make_insert_request() is invoked and returns 201.
"""
app = Flask(__name__)
with app.test_request_context(
method="POST",
json={"name": "Test", "email": "test@example.com"},
):
mock_insert = mocker.patch(
"filter_insert.make_insert_request",
return_value=({"id": "123"}, 201, {}),
)
import filter_insert
result = filter_insert.main()
mock_insert.assert_called_once()
assert result[1] == 201
def test_main_invalid_method_returns_405(self, mocker):
"""Test unsupported HTTP method returns 405.
Given:
A PATCH request (unsupported method).
When:
main() is called.
Then:
Returns 405 Method Not Allowed with error message.
"""
app = Flask(__name__)
with app.test_request_context(method="PATCH"):
import filter_insert
result = filter_insert.main()
assert result[1] == 405
assert "error" in result[0]
class TestMakeInsertRequest:
"""Tests for make_insert_request() - user creation."""
def test_insert_success(self, mocker, sample_user_data, sample_db_row):
"""Test successful user creation returns 201.
Given:
Valid user data with name, email, dob, and gender.
Database connection is available.
When:
POST request to create user.
Then:
Returns 201 status code.
Executes INSERT query with user data.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
mock_cursor.fetchone.return_value = sample_db_row
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="POST",
json=sample_user_data,
content_type="application/json",
):
import filter_insert
result = filter_insert.make_insert_request()
assert result[1] == 201
mock_cursor.execute.assert_called_once()
def test_insert_validation_error_missing_name(self, mocker):
"""Test missing required field 'name' returns 400.
Given:
Request body with email but missing required 'name' field.
When:
POST request to create user.
Then:
Returns 400 Bad Request.
Response contains errorCode 'VALIDATION_ERROR'.
"""
app = Flask(__name__)
with app.test_request_context(
method="POST",
json={"email": "test@example.com"}, # missing 'name'
content_type="application/json",
):
import filter_insert
result = filter_insert.make_insert_request()
assert result[1] == 400
response_data = result[0].get_json()
assert response_data["errorCode"] == "VALIDATION_ERROR"
def test_insert_validation_error_invalid_email(self, mocker):
"""Test invalid email format raises serialization error.
Given:
Request body with invalid email format 'invalid-email'.
When:
POST request to create user.
Then:
Raises TypeError because ValidationError.errors() contains
ValueError which is not JSON serializable.
Note:
This test documents a bug in the source code where e.errors()
is passed directly to jsonify without sanitization.
"""
app = Flask(__name__)
with app.test_request_context(
method="POST",
json={"name": "Test", "email": "invalid-email"},
content_type="application/json",
):
import filter_insert
with pytest.raises(TypeError, match="not JSON serializable"):
filter_insert.make_insert_request()
def test_insert_duplicate_email(self, mocker, sample_user_data):
"""Test duplicate email returns 409 Conflict.
Given:
Valid user data but email already exists in database.
Database raises IntegrityError on INSERT.
When:
POST request to create user.
Then:
Returns 409 Conflict.
Response contains errorCode 'DUPLICATE_TAG'.
"""
app = Flask(__name__)
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(
side_effect=IntegrityError("duplicate key value")
)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="POST",
json=sample_user_data,
content_type="application/json",
):
import filter_insert
result = filter_insert.make_insert_request()
assert result[1] == 409
response_data = result[0].get_json()
assert response_data["errorCode"] == "DUPLICATE_TAG"
class TestMakeFilterRequest:
"""Tests for make_filter_request() - user filtering."""
def test_filter_empty_result(self, mocker):
"""Test filter with no matching results returns empty array.
Given:
Database has no users matching the filter criteria.
When:
GET request to filter users.
Then:
Returns empty JSON array [].
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
MagicMock(name="count"),
MagicMock(name="total"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified", "count", "total"]
):
mock_cursor.description[i].name = col_name
mock_cursor.fetchall.return_value = []
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(method="GET"):
import filter_insert
response = filter_insert.make_filter_request()
# make_filter_request returns Response object directly (not tuple)
response_data = response.get_json()
assert response_data == []
def test_filter_with_pagination(self, mocker, sample_db_row):
"""Test filter with page and size parameters.
Given:
Request with query params page=1 and size=5.
When:
GET request to filter users.
Then:
SQL query uses LIMIT 5 and OFFSET 5 (page * size).
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
MagicMock(name="count"),
MagicMock(name="total"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified", "count", "total"]
):
mock_cursor.description[i].name = col_name
# Add count and total to sample row
row_with_counts = sample_db_row + (10, 10)
mock_cursor.fetchall.return_value = [row_with_counts]
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(method="GET", query_string={"page": "1", "size": "5"}):
import filter_insert
filter_insert.make_filter_request()
# Check that execute was called with correct offset
# cursor.execute(sql, values) - values is second positional arg
call_args = mock_cursor.execute.call_args
values = call_args[0][1] # Second positional argument
assert values["limit"] == 5
assert values["offset"] == 5 # page 1 * size 5
def test_filter_with_keyword(self, mocker, sample_db_row):
"""Test filter with keyword search across name and email.
Given:
Request with query param filter[keyword]='test'.
When:
GET request to filter users.
Then:
SQL query contains ILIKE clause for keyword matching.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
MagicMock(name="count"),
MagicMock(name="total"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified", "count", "total"]
):
mock_cursor.description[i].name = col_name
row_with_counts = sample_db_row + (1, 1)
mock_cursor.fetchall.return_value = [row_with_counts]
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="GET", query_string={"filter[keyword]": "test"}
):
import filter_insert
result = filter_insert.make_filter_request()
# Check SQL contains keyword filter
call_args = mock_cursor.execute.call_args
sql = call_args[0][0]
assert "ILIKE" in sql
def test_filter_with_name(self, mocker, sample_db_row):
"""Test filter by name with case-insensitive partial match.
Given:
Request with query param filter[name]='John'.
When:
GET request to filter users.
Then:
SQL query contains 'LOWER(name) LIKE %john%'.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
MagicMock(name="count"),
MagicMock(name="total"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified", "count", "total"]
):
mock_cursor.description[i].name = col_name
row_with_counts = sample_db_row + (1, 1)
mock_cursor.fetchall.return_value = [row_with_counts]
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="GET", query_string={"filter[name]": "John"}
):
import filter_insert
filter_insert.make_filter_request()
call_args = mock_cursor.execute.call_args
sql = call_args[0][0]
values = call_args[0][1] # Second positional argument
assert "LOWER(name) LIKE" in sql
assert values["name"] == "%john%"
def test_filter_with_sortby(self, mocker, sample_db_row):
"""Test filter with sortby and asc parameters.
Given:
Request with query params sortby='created' and asc='true'.
When:
GET request to filter users.
Then:
SQL query contains 'ORDER BY created ASC'.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
MagicMock(name="count"),
MagicMock(name="total"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified", "count", "total"]
):
mock_cursor.description[i].name = col_name
row_with_counts = sample_db_row + (1, 1)
mock_cursor.fetchall.return_value = [row_with_counts]
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("filter_insert.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="GET", query_string={"sortby": "created", "asc": "true"}
):
import filter_insert
result = filter_insert.make_filter_request()
call_args = mock_cursor.execute.call_args
sql = call_args[0][0]
assert "ORDER BY created ASC" in sql

View File

@@ -0,0 +1,514 @@
"""Tests for update_delete.py - PUT (update) & DELETE handlers."""
from unittest.mock import MagicMock
import pytest
from flask import Flask
from psycopg2 import IntegrityError
class TestMain:
"""Tests for main() dispatcher function."""
def test_main_put_calls_update(self, mocker):
"""Test PUT request routes to make_update_request().
Given:
A PUT request to the endpoint.
When:
main() is called.
Then:
make_update_request() is invoked and returns 200.
"""
app = Flask(__name__)
with app.test_request_context(method="PUT"):
mock_update = mocker.patch(
"update_delete.make_update_request",
return_value=({"id": "123"}, 200, {}),
)
import update_delete
result = update_delete.main()
mock_update.assert_called_once()
assert result[1] == 200
def test_main_delete_calls_delete(self, mocker):
"""Test DELETE request routes to make_delete_request().
Given:
A DELETE request to the endpoint.
When:
main() is called.
Then:
make_delete_request() is invoked and returns 200.
"""
app = Flask(__name__)
with app.test_request_context(method="DELETE"):
mock_delete = mocker.patch(
"update_delete.make_delete_request",
return_value=({"id": "123"}, 200, {}),
)
import update_delete
result = update_delete.main()
mock_delete.assert_called_once()
assert result[1] == 200
def test_main_invalid_method_returns_405(self, mocker):
"""Test unsupported HTTP method returns 405.
Given:
A POST request (unsupported method for this endpoint).
When:
main() is called.
Then:
Returns 405 Method Not Allowed with error message.
"""
app = Flask(__name__)
with app.test_request_context(method="POST"):
import update_delete
result = update_delete.main()
assert result[1] == 405
assert "error" in result[0]
class TestMakeUpdateRequest:
"""Tests for make_update_request() - user update."""
def test_update_missing_user_id(self, mocker):
"""Test missing X-Fission-Params-UserID header returns 400.
Given:
PUT request without X-Fission-Params-UserID header.
When:
make_update_request() is called.
Then:
Returns 400 Bad Request.
Response contains errorCode 'MISSING_USER_ID'.
"""
app = Flask(__name__)
with app.test_request_context(
method="PUT",
json={"name": "Updated Name"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 400
response_data = result[0].get_json()
assert response_data["errorCode"] == "MISSING_USER_ID"
def test_update_user_not_found(self, mocker):
"""Test update non-existent user returns 404.
Given:
Valid X-Fission-Params-UserID header.
User does not exist in database (SELECT returns None).
When:
PUT request to update user.
Then:
Returns 404 Not Found.
Response contains errorCode 'USER_NOT_FOUND'.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None # User not found
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "nonexistent-id"},
json={"name": "Updated Name"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 404
response_data = result[0].get_json()
assert response_data["errorCode"] == "USER_NOT_FOUND"
def test_update_success(self, mocker, sample_db_row):
"""Test successful user update returns 200.
Given:
Valid X-Fission-Params-UserID header.
User exists in database.
Valid update data in request body.
When:
PUT request to update user.
Then:
Returns 200 OK.
User data is updated in database.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
# First fetchone returns existing user, second returns updated user
mock_cursor.fetchone.side_effect = [sample_db_row, sample_db_row]
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "550e8400-e29b-41d4-a716-446655440000"},
json={"name": "Updated Name"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 200
def test_update_validation_error_invalid_email(self, mocker):
"""Test invalid email format raises serialization error.
Given:
Valid X-Fission-Params-UserID header.
Request body with invalid email format 'invalid-email'.
When:
PUT request to update user.
Then:
Raises TypeError because ValidationError.errors() contains
ValueError which is not JSON serializable.
Note:
This test documents a bug in the source code where e.errors()
is passed directly to jsonify without sanitization.
"""
app = Flask(__name__)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "some-id"},
json={"email": "invalid-email"},
content_type="application/json",
):
import update_delete
with pytest.raises(TypeError, match="not JSON serializable"):
update_delete.make_update_request()
def test_update_duplicate_email(self, mocker, sample_db_row):
"""Test duplicate email on update returns 409 Conflict.
Given:
Valid X-Fission-Params-UserID header.
User exists in database.
New email already exists for another user.
Database raises IntegrityError on UPDATE.
When:
PUT request to update user email.
Then:
Returns 409 Conflict.
Response contains errorCode 'DUPLICATE_USER'.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
# First fetchone returns existing user
mock_cursor.fetchone.return_value = sample_db_row
# Second execute raises IntegrityError
mock_cursor.execute.side_effect = [None, IntegrityError("duplicate key")]
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "550e8400-e29b-41d4-a716-446655440000"},
json={"email": "duplicate@example.com"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 409
response_data = result[0].get_json()
assert response_data["errorCode"] == "DUPLICATE_USER"
class TestMakeDeleteRequest:
"""Tests for make_delete_request() - user deletion."""
def test_delete_missing_user_id(self, mocker):
"""Test missing X-Fission-Params-UserID header returns 400.
Given:
DELETE request without X-Fission-Params-UserID header.
When:
make_delete_request() is called.
Then:
Returns 400 Bad Request.
Response contains errorCode 'MISSING_USER_ID'.
"""
app = Flask(__name__)
with app.test_request_context(method="DELETE"):
import update_delete
result = update_delete.make_delete_request()
assert result[1] == 400
response_data = result[0].get_json()
assert response_data["errorCode"] == "MISSING_USER_ID"
def test_delete_user_not_found(self, mocker):
"""Test delete non-existent user returns 404.
Given:
Valid X-Fission-Params-UserID header.
User does not exist in database (SELECT returns None).
When:
DELETE request to delete user.
Then:
Returns 404 Not Found.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.fetchone.return_value = None # User not found
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="DELETE",
headers={"X-Fission-Params-UserID": "nonexistent-id"},
):
import update_delete
result = update_delete.make_delete_request()
assert result[1] == 404
def test_delete_success(self, mocker, sample_db_row):
"""Test successful user deletion returns 200.
Given:
Valid X-Fission-Params-UserID header.
User exists in database.
When:
DELETE request to delete user.
Then:
Returns 200 OK.
User is deleted from database.
Response contains deleted user data.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
# First fetchone checks existence, second returns deleted row
mock_cursor.fetchone.side_effect = [(1,), sample_db_row]
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="DELETE",
headers={"X-Fission-Params-UserID": "550e8400-e29b-41d4-a716-446655440000"},
):
import update_delete
result = update_delete.make_delete_request()
assert result[1] == 200
class TestUpdatePartialFields:
"""Tests for partial field updates."""
def test_update_only_name(self, mocker, sample_db_row):
"""Test update only name field.
Given:
Valid X-Fission-Params-UserID header.
User exists in database.
Request body contains only 'name' field.
When:
PUT request to update user.
Then:
Returns 200 OK.
UPDATE SQL only includes name field (plus modified timestamp).
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
mock_cursor.fetchone.side_effect = [sample_db_row, sample_db_row]
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "550e8400-e29b-41d4-a716-446655440000"},
json={"name": "New Name Only"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 200
# Check that UPDATE SQL contains name field
call_args = mock_cursor.execute.call_args_list[-1]
sql = call_args[0][0]
assert "name=" in sql
def test_update_multiple_fields(self, mocker, sample_db_row):
"""Test update multiple fields at once.
Given:
Valid X-Fission-Params-UserID header.
User exists in database.
Request body contains name, email, and gender fields.
When:
PUT request to update user.
Then:
Returns 200 OK.
UPDATE SQL includes all three fields.
"""
app = Flask(__name__)
mock_cursor = MagicMock()
mock_cursor.description = [
MagicMock(name="id"),
MagicMock(name="name"),
MagicMock(name="dob"),
MagicMock(name="email"),
MagicMock(name="gender"),
MagicMock(name="created"),
MagicMock(name="modified"),
]
for i, col_name in enumerate(
["id", "name", "dob", "email", "gender", "created", "modified"]
):
mock_cursor.description[i].name = col_name
mock_cursor.fetchone.side_effect = [sample_db_row, sample_db_row]
mock_conn = MagicMock()
mock_conn.__enter__ = MagicMock(return_value=mock_conn)
mock_conn.__exit__ = MagicMock(return_value=False)
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mocker.patch("update_delete.init_db_connection", return_value=mock_conn)
with app.test_request_context(
method="PUT",
headers={"X-Fission-Params-UserID": "550e8400-e29b-41d4-a716-446655440000"},
json={"name": "New Name", "email": "new@example.com", "gender": "female"},
content_type="application/json",
):
import update_delete
result = update_delete.make_update_request()
assert result[1] == 200
call_args = mock_cursor.execute.call_args_list[-1]
sql = call_args[0][0]
assert "name=" in sql
assert "email=" in sql
assert "gender=" in sql

View File

@@ -1,5 +1,3 @@
from flask import current_app, jsonify, request
from helpers import CORS_HEADERS, db_row_to_dict, init_db_connection
from psycopg2 import IntegrityError
@@ -15,7 +13,7 @@ def main():
"fntimeout": 300,
"http_triggers": {
"ai-admin-update-delete-user-http": {
"url": "/ai/admin/users/{UserID}",
"url": "/ailbl/ai/admin/users/{UserID}",
"methods": ["DELETE", "PUT"]
}
}
@@ -35,6 +33,40 @@ def main():
def make_update_request():
r"""make_update_request() -> tuple[Response, int, dict]
Update an existing user by ID.
Retrieves the user ID from ``X-Fission-Params-UserID`` header, validates
the request body using :class:`AiUserUpdate` schema, and performs a
partial update on the user record.
Uses row-level locking (``SELECT ... FOR UPDATE``) to prevent concurrent
modification conflicts.
Returns:
tuple: A tuple containing:
- JSON response with updated user data or error details
- HTTP status code (200 on success, 400/404/409 on error)
- CORS headers dict
Raises:
ValidationError: If request body fails Pydantic validation (returns 400).
IntegrityError: If email conflicts with another user (returns 409).
Example::
>>> # PUT /ai/admin/users/550e8400-e29b-41d4-a716-446655440000
>>> # Header: X-Fission-Params-UserID: 550e8400-e29b-41d4-a716-446655440000
>>> # Body: {"name": "Jane Doe"}
>>> # Response: 200 OK
>>> {
... "id": "550e8400-e29b-41d4-a716-446655440000",
... "name": "Jane Doe",
... "email": "john@example.com",
... "modified": "2024-01-02T10:00:00"
... }
"""
user_id = request.headers.get("X-Fission-Params-UserID")
if not user_id:
return jsonify({"errorCode": "MISSING_USER_ID"}), 400, CORS_HEADERS
@@ -97,6 +129,19 @@ def make_update_request():
def __delete_user(cursor, id: str):
r"""Delete a user from the database by ID.
Args:
cursor: Database cursor object for executing queries.
id (str): UUID of the user to delete.
Returns:
dict | str: User data dict if deleted successfully,
or ``"USER_NOT_FOUND"`` string if user doesn't exist.
Note:
This is a private function. Use :func:`make_delete_request` instead.
"""
cursor.execute("SELECT 1 FROM ai_user WHERE id = %(id)s", {"id": id})
if not cursor.fetchone():
return "USER_NOT_FOUND"
@@ -106,7 +151,30 @@ def __delete_user(cursor, id: str):
return db_row_to_dict(cursor, row)
def make_delete_request():
r"""make_delete_request() -> tuple[Response, int, dict]
Delete a user by ID.
Retrieves the user ID from ``X-Fission-Params-UserID`` header and
deletes the user from the database if found.
Returns:
tuple: A tuple containing:
- JSON response with deleted user data or error details
- HTTP status code (200 on success, 400/404/500 on error)
- CORS headers dict (may be omitted on some error responses)
Example::
>>> # DELETE /ai/admin/users/550e8400-e29b-41d4-a716-446655440000
>>> # Header: X-Fission-Params-UserID: 550e8400-e29b-41d4-a716-446655440000
>>> # Response: 200 OK
>>> {
... "id": "550e8400-e29b-41d4-a716-446655440000",
... "name": "John Doe",
... "email": "john@example.com"
... }
"""
user_id = request.headers.get("X-Fission-Params-UserID")
if not user_id:
return jsonify({"errorCode": "MISSING_USER_ID"}), 400, CORS_HEADERS