Files

434 lines
11 KiB
Python
Raw Permalink Normal View History

2026-03-18 20:21:56 +07:00
"""
Example: Basic CRUD operations for a resource.
This demonstrates:
- Pydantic request validation
- Database operations with helpers
- Standard error handling
- Proper Fission docstring configuration
"""
from flask import request
from helpers import (
init_db_connection,
db_row_to_dict,
db_rows_to_array,
get_user_from_headers,
format_error_response,
)
from exceptions import ValidationError, NotFoundError, ConflictError, DatabaseError
from models import ItemResponse, ItemCreateRequest, ItemUpdateRequest
# Pool manager executor, one request at a time
def create(event, context):
"""
```fission
{
"name": "create-item",
"http_triggers": {
"create": {
"url": "/api/items",
"methods": ["POST"]
}
}
}
```
Create a new item.
**Request Body:**
```json
{
"name": "string (required, 1-255 chars)",
"description": "string (optional)",
"status": "active|inactive|pending",
"metadata": {}
}
```
**Response:**
- 200: Item created successfully
- 400: Validation error
- 409: Conflict (e.g., duplicate name)
- 500: Database error
"""
# Get user for audit trail
x_user = get_user_from_headers()
# Validate request payload
try:
data = ItemCreateRequest(**request.get_json())
except Exception as e:
raise ValidationError(f"Invalid request: {str(e)}", x_user=x_user)
conn = None
try:
conn = init_db_connection()
cursor = conn.cursor()
# Check for conflicts (example)
cursor.execute(
"SELECT id FROM items WHERE name = %s",
(data.name,)
)
if cursor.fetchone():
raise ConflictError(
f"Item with name '{data.name}' already exists",
x_user=x_user,
details={"name": data.name}
)
# Insert new item
cursor.execute(
"""
INSERT INTO items (name, description, status, metadata)
VALUES (%s, %s, %s, %s)
RETURNING id, name, description, status, metadata, created, modified
""",
(data.name, data.description, data.status.value, data.metadata)
)
row = cursor.fetchone()
conn.commit()
# Build response
item = db_row_to_dict(cursor, row)
return item
except (ValidationError, NotFoundError, ConflictError, DatabaseError):
# Re-raise our own exceptions
raise
except Exception as e:
if conn:
conn.rollback()
raise DatabaseError(f"Database error: {str(e)}", x_user=x_user)
finally:
if conn:
conn.close()
def list_items(event, context):
"""
```fission
{
"name": "list-items",
"http_triggers": {
"list": {
"url": "/api/items",
"methods": ["GET"]
}
}
}
```
List items with optional filtering and pagination.
**Query Parameters:**
- `page` (int): Page number, zero-based (default: 0)
- `size` (int): Items per page (default: 10, max: 100)
- `asc` (bool): Sort ascending (default: true)
- `filter[ids]` (string[]): Filter by specific IDs
- `filter[keyword]` (string): Search in name/description
- `filter[status]` (string[]): Filter by status values
- `filter[created_from]` (datetime): Filter created after
- `filter[created_to]` (datetime): Filter created before
**Response:**
```json
{
"data": [...],
"page": 0,
"size": 10,
"total": 42
}
```
"""
from helpers import str_to_bool
# Parse pagination
page = int(request.args.get("page", 0))
size = int(request.args.get("size", 10))
asc = str_to_bool(request.args.get("asc", "true"))
# Parse filters
ids = request.args.getlist("filter[ids]")
keyword = request.args.get("filter[keyword]")
statuses = request.args.getlist("filter[status]")
# Build query
conditions = []
params = []
if ids:
conditions.append(f"id IN ({', '.join(['%s'] * len(ids))})")
params.extend(ids)
if keyword:
conditions.append("(name ILIKE %s OR description ILIKE %s)")
params.extend([f"%{keyword}%", f"%{keyword}%"])
if statuses:
conditions.append(f"status IN ({', '.join(['%s'] * len(statuses))})")
params.extend(statuses)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
conn = None
try:
conn = init_db_connection()
cursor = conn.cursor()
# Get total count
count_sql = f"SELECT COUNT(*) FROM items {where_clause}"
cursor.execute(count_sql, params)
total = cursor.fetchone()[0]
# Get paginated data
offset = page * size
data_sql = f"""
SELECT id, name, description, status, metadata, created, modified
FROM items
{where_clause}
ORDER BY created {'ASC' if asc else 'DESC'}
LIMIT %s OFFSET %s
"""
cursor.execute(data_sql, params + [size, offset])
rows = cursor.fetchall()
items = [db_row_to_dict(cursor, row) for row in rows]
return {
"data": items,
"page": page,
"size": size,
"total": total
}
except Exception as e:
raise DatabaseError(f"Failed to list items: {str(e)}")
finally:
if conn:
conn.close()
def get_item(event, context):
"""
```fission
{
"name": "get-item",
"http_triggers": {
"get": {
"url": "/api/items/:id",
"methods": ["GET"]
}
}
}
```
Get a specific item by ID.
**URL Parameters:**
- `id` (string): Item UUID
**Response:**
- 200: Item found
- 404: Item not found
- 500: Database error
"""
# Extract item ID from path (Fission passes path params differently depending on trigger)
# For HTTP triggers, the ID would come from the URL path
item_id = request.view_args.get('id') if hasattr(request, 'view_args') else None
if not item_id:
# Fallback: parse from query or request path
item_id = request.path.rstrip('/').split('/')[-1]
if not item_id:
raise ValidationError("Item ID is required")
conn = None
try:
conn = init_db_connection()
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, name, description, status, metadata, created, modified
FROM items WHERE id = %s
""",
(item_id,)
)
row = cursor.fetchone()
if not row:
raise NotFoundError(f"Item {item_id} not found")
return db_row_to_dict(cursor, row)
except NotFoundError:
raise
except Exception as e:
raise DatabaseError(f"Failed to fetch item: {str(e)}")
finally:
if conn:
conn.close()
def update_item(event, context):
"""
```fission
{
"name": "update-item",
"http_triggers": {
"update": {
"url": "/api/items/:id",
"methods": ["PUT", "PATCH"]
}
}
}
```
Update an existing item.
**URL Parameters:**
- `id` (string): Item UUID
**Request Body:**
```json
{
"name": "string (optional)",
"description": "string (optional)",
"status": "active|inactive|pending (optional)"
}
```
**Response:**
- 200: Item updated successfully
- 404: Item not found
- 409: Conflict (duplicate name)
- 400: Validation error
- 500: Database error
"""
x_user = get_user_from_headers()
# Extract item ID
item_id = request.view_args.get('id') if hasattr(request, 'view_args') else None
if not item_id:
item_id = request.path.rstrip('/').split('/')[-1]
if not item_id:
raise ValidationError("Item ID is required")
# Validate request
try:
data = ItemUpdateRequest(**request.get_json())
except Exception as e:
raise ValidationError(f"Invalid request: {str(e)}", x_user=x_user)
# Build update statement dynamically
updates = []
params = []
if data.name is not None:
updates.append("name = %s")
params.append(data.name)
if data.description is not None:
updates.append("description = %s")
params.append(data.description)
if data.status is not None:
updates.append("status = %s")
params.append(data.status.value)
if data.metadata is not None:
updates.append("metadata = %s")
params.append(data.metadata)
if not updates:
raise ValidationError("No update fields provided", x_user=x_user)
updates.append("modified = NOW()")
params.append(item_id)
conn = None
try:
conn = init_db_connection()
cursor = conn.cursor()
# Check for name conflict if name is being updated
if data.name:
cursor.execute(
"SELECT id FROM items WHERE name = %s AND id != %s",
(data.name, item_id)
)
if cursor.fetchone():
raise ConflictError(
f"Another item with name '{data.name}' already exists",
x_user=x_user,
details={"name": data.name}
)
# Execute update
sql = f"UPDATE items SET {', '.join(updates)} WHERE id = %s RETURNING *"
cursor.execute(sql, params)
row = cursor.fetchone()
conn.commit()
if not row:
raise NotFoundError(f"Item {item_id} not found", x_user=x_user)
return db_row_to_dict(cursor, row)
except (ValidationError, NotFoundError, ConflictError, DatabaseError):
raise
except Exception as e:
if conn:
conn.rollback()
raise DatabaseError(f"Failed to update item: {str(e)}", x_user=x_user)
finally:
if conn:
conn.close()
def delete_item(event, context):
"""
```fission
{
"name": "delete-item",
"http_triggers": {
"delete": {
"url": "/api/items/:id",
"methods": ["DELETE"]
}
}
}
```
Delete an item.
**URL Parameters:**
- `id` (string): Item UUID
**Response:**
- 204: Item deleted successfully
- 404: Item not found
- 500: Database error
"""
x_user = get_user_from_headers()
# Extract item ID
item_id = request.view_args.get('id') if hasattr(request, 'view_args') else None
if not item_id:
item_id = request.path.rstrip('/').split('/')[-1]
if not item_id:
raise ValidationError("Item ID is required", x_user=x_user)
conn = None
try:
conn = init_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM items WHERE id = %s", (item_id,))
conn.commit()
if cursor.rowcount == 0:
raise NotFoundError(f"Item {item_id} not found", x_user=x_user)
return None # 204 No Content
except NotFoundError:
raise
except Exception as e:
if conn:
conn.rollback()
raise DatabaseError(f"Failed to delete item: {str(e)}", x_user=x_user)
finally:
if conn:
conn.close()