Plan: Enhance Fission Python Projects with Exceptions, Pydantic Models, and Code Quality Improvements
Context
Three Fission Python projects need systematic improvements to enhance error handling, data validation, and code maintainability:
- py-eom-storage: Storage management API (GET/POST /storages, GET/PUT/DELETE /storages/{id})
- py-eom-quota: Quota management API (GET/POST /quotas, POST/DELETE /users/{userId}/quotas/{quotaId})
- py-ailbl-scheduler: Background worker system for scheduled tasks
Currently, all projects raise and catch the generic Exception and return errors as {"error": str(err)} with a 500 status. There is no structured error handling, request validation, or consistent response formatting. Some projects define Pydantic models but do not use them comprehensively.
Goals
- Custom Exceptions: Implement domain-specific exception classes with:
  - error_code: Machine-readable error identifier
  - http_status: Appropriate HTTP status (400, 404, 409, 500, etc.)
  - error_msg: Human-readable message
  - x_user: User identifier from request header (X-Fission-Params-UserId or similar)
- Pydantic Models: Add comprehensive request/response models for all endpoints:
  - Request body validation (POST/PUT)
  - Query parameter validation (GET)
  - Structured response schemas
  - Consistent error response format
- Code Quality: Improve maintainability with:
  - Detailed docstrings for all functions and classes
  - Refactoring of complex, multi-responsibility functions
  - Consistent error handling patterns
  - Fix broken imports and type issues
Project-Specific Plans
1. py-eom-storage
Current State:
- Has Pydantic models: S3Resource, S3Credential (unused)
- Uses dataclasses: Page, Filter (should be Pydantic)
- Endpoints: /eom/admin/storages (filter_or_insert.py), /eom/admin/storages/{StorageId} (update_or_delete.py)
Changes Needed:
A. Create src/exceptions.py:
    from typing import Optional

    class StorageException(Exception):
        """Base exception for storage-related errors."""
        def __init__(self, error_code: str, http_status: int, error_msg: str,
                     x_user: Optional[str] = None):
            self.error_code = error_code
            self.http_status = http_status
            self.error_msg = error_msg
            self.x_user = x_user
            super().__init__(self.error_msg)

    class ValidationError(StorageException):
        """Invalid input data."""

    class NotFoundError(StorageException):
        """Resource not found."""

    class ConflictError(StorageException):
        """Resource conflict (e.g., duplicate name)."""

    class DatabaseError(StorageException):
        """Database operation failed."""

    class S3ConnectionError(StorageException):
        """S3/MinIO connection failed."""
B. Create/Update src/models.py (or extend existing):
    import typing
    from datetime import datetime
    from typing import Literal

    from pydantic import BaseModel, Field

    # Request models
    class StorageCreateRequest(BaseModel):
        name: str = Field(..., min_length=1, max_length=255)
        description: typing.Optional[str] = None
        resource: dict  # Should validate S3 structure

    class StorageUpdateRequest(BaseModel):
        name: typing.Optional[str] = None
        description: typing.Optional[str] = None
        resource: typing.Optional[dict] = None
        active: typing.Optional[bool] = None

    # Query models (convert Page/Filter to Pydantic)
    class StorageFilter(BaseModel):
        ids: typing.Optional[typing.List[str]] = None
        keyword: typing.Optional[str] = None
        collection_id: typing.Optional[str] = None
        enable: typing.Optional[bool] = None
        created_from: typing.Optional[datetime] = None
        created_to: typing.Optional[datetime] = None
        # ... other filters

    class StorageQuery(BaseModel):
        page: int = 0
        size: int = Field(8, ge=1, le=100)
        asc: bool = True
        sortby: typing.Optional[Literal["name", "enable", "created", "modified"]] = None
        # default_factory builds a fresh filter for each query instance
        filter: StorageFilter = Field(default_factory=StorageFilter)

    # Response models
    class StorageResponse(BaseModel):
        id: str
        name: str
        description: typing.Optional[str] = None
        resource: dict
        enable: bool
        created: datetime
        modified: datetime

    class ErrorResponse(BaseModel):
        error_code: str
        http_status: int
        error_msg: str
        x_user: typing.Optional[str] = None
        details: typing.Optional[dict] = None
C. Refactor filter_or_insert.py:
- Replace the generic try-except with handlers that catch custom exceptions
- Validate request body using Pydantic in make_insert_request
- Use Pydantic for query parsing in make_filter_request
- Add helper function handle_exception to format error responses consistently
- Extract SQL queries into separate functions for testability
- Add comprehensive docstrings explaining each endpoint's behavior
D. Refactor update_or_delete.py:
- Similar pattern: custom exceptions, Pydantic validation
- Refactor is_depended_on_storage: this function does too much; split it into smaller helpers
- Add detailed comments for each database operation
- Ensure proper error messages with appropriate HTTP status codes
E. Update helpers.py:
- Add utility get_user_from_header(request) to extract x-user from various headers
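A minimal sketch of such a helper is shown below. It takes the header mapping rather than the request object so it can be exercised without a web framework; inside a handler it would presumably be called as get_user_from_header(request.headers). Only X-Fission-Params-UserId is named in this plan; the X-User fallback and the DEFAULT_USER_HEADERS constant are assumptions for illustration.

```python
from typing import Mapping, Optional, Sequence

# Assumed candidate headers; only the first name appears in this plan.
DEFAULT_USER_HEADERS: Sequence[str] = ("X-Fission-Params-UserId", "X-User")


def get_user_from_header(
    headers: Mapping[str, str],
    candidates: Sequence[str] = DEFAULT_USER_HEADERS,
) -> Optional[str]:
    """Return the first non-empty user id found in the candidate headers.

    Args:
        headers: Header mapping (a plain dict or a framework headers object).
        candidates: Header names to try, in priority order.

    Returns:
        The stripped user id, or None if no candidate header has a value.
    """
    # Lower-case the keys once so the lookup is case-insensitive for dicts too.
    lowered = {key.lower(): value for key, value in headers.items()}
    for name in candidates:
        value = lowered.get(name.lower(), "").strip()
        if value:
            return value
    return None
```

Returning None instead of raising keeps the decision about whether a missing user is an error with the caller, which can raise the project's ValidationError where the header is mandatory.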
2. py-eom-quota
Current State:
- Already has extensive Pydantic models in models.py (QuotaPage, UserQuotaPage, ScheduleCreate, etc.)
- But: userquota_filter.py imports from quota_update_or_delete, which doesn't exist (broken import)
- Need to expand models to cover all request/response scenarios
- Endpoints: /eom/admin/quotas (filter), /eom/admin/users/{UserId}/quotas (filter/insert), /eom/admin/users/{UserId}/quotas/{QuotaId} (update/delete)
Changes Needed:
A. Create src/exceptions.py:
    from typing import Optional

    class QuotaException(Exception):
        """Base exception for quota management."""
        def __init__(self, error_code: str, http_status: int, error_msg: str,
                     x_user: Optional[str] = None):
            self.error_code = error_code
            self.http_status = http_status
            self.error_msg = error_msg
            self.x_user = x_user
            super().__init__(self.error_msg)

    class QuotaNotFoundError(QuotaException):
        """Quota does not exist."""

    class UserQuotaConflictError(QuotaException):
        """User already has this type of quota."""

    class ValidationError(QuotaException):
        """Invalid request data."""

    class DatabaseError(QuotaException):
        """Database operation failed."""
B. Extend src/models.py:
The existing models mix schedule and quota models. Need to:
- Separate or clearly document which are for quotas vs schedules
- Add request models:

      class QuotaCreateRequest(BaseModel):
          name: str
          description: typing.Optional[str] = None
          type: QuotaType
          value: typing.Union[MaxSizeBody, MaxOrderTimesBody]
          expire: ExpireBody

      class QuotaUpdateRequest(BaseModel):
          name: typing.Optional[str] = None
          description: typing.Optional[str] = None
          enable: typing.Optional[bool] = None
          type: typing.Optional[QuotaType] = None
          value: typing.Optional[typing.Union[MaxSizeBody, MaxOrderTimesBody]] = None
          expire: typing.Optional[ExpireBody] = None

      class UserQuotaAssignRequest(BaseModel):
          quota_id: str

- Ensure response models exist (QuotaResponse, UserQuotaResponse)
C. Fix userquota_filter.py:
- Fix broken import: from quota_update_or_delete import __get_by_id → from userquota_insert_or_delete import __get_by_id (or better: move __get_by_id to a shared helpers module)
- Refactor make_filter_request:
  - Use UserQuotaPage Pydantic model properly
  - Validate user_id header is present using Pydantic
  - Replace try-except with custom exceptions
  - Add comprehensive docstring
  - The function currently manually sets paging.filter.user_ids = [user_id]; this should be part of a validation layer
D. Refactor userquota_insert_or_delete.py:
- Fix the same broken import pattern (it imports nothing but uses __get_by_id in filter)
- Add proper request validation using Pydantic models
- Replace generic exceptions with UserQuotaConflictError, QuotaNotFoundError, etc.
- Refactor __validate_user_quota_type: the SQL query is currently hardcoded; add comments explaining the business logic
- The insert SQL appears to use the wrong columns: INSERT INTO eom_user_quota(id, name, description, type, value, expire), but the table likely only has (id, user_id, quota_id). The mismatch is inferred from the code only, so check the database schema before changing the statement.
E. Improve helpers.py:
- Add utility functions for extracting and validating user headers
- Add consistent error handling helpers
3. py-ailbl-scheduler
Current State:
- No HTTP endpoints (only time-triggered workers)
- No Pydantic models needed per user's choice
- Needs custom exceptions and code quality improvements
- Workers: worker_session_picker.py, worker_session_poller.py, worker_scheduler_scan.py, worker_schedule_auto_disable.py
- Common utilities in common.py, helpers.py
Changes Needed:
A. Create src/exceptions.py:
    from typing import Optional

    class SchedulerException(Exception):
        """Base exception for scheduler operations."""
        def __init__(self, error_code: str, error_msg: str, details: Optional[dict] = None):
            self.error_code = error_code
            self.error_msg = error_msg
            self.details = details
            super().__init__(self.error_msg)

    class ScheduleNotFoundError(SchedulerException):
        """Schedule does not exist."""

    # The subclasses below must derive from SchedulerException, the base
    # defined above; no SchedulerError class exists in this module.
    class SessionLockError(SchedulerException):
        """Failed to acquire session lock."""

    class DagsterError(SchedulerException):
        """Dagster pipeline execution failed."""

    class CronParseError(SchedulerException):
        """Invalid cron expression."""

    class ConfigurationError(SchedulerException):
        """Missing or invalid configuration."""
B. Refactor worker_scheduler_scan.py:
This is the most complex function (446 lines). Goals:
- Extract helper functions:
  - _normalize_cron_for_cronner (already exists)
  - _as_date, _as_time (already exist)
  - _within_active_window (already exists)
  - _is_due_by_cron (already exists)
  - _is_due_by_freq (already exists)
  - Extract the schedule creation logic into _create_session_for_schedule(cur, schedule, now, slot_start)
  - Extract the candidate schedule selection into _fetch_due_schedules(cur, now, slot_start, slot_end, limit=50)
- Add detailed docstrings explaining the overall algorithm: "Scan for schedules that are due in the current time slot and create sessions atomically"
- Improve variable names (e.g., s → schedule, cur → cursor)
- Add comments explaining the advisory lock strategy and why it's needed
- Ensure proper exception handling with custom exceptions
- The function currently catches generic Exception at the end - wrap specific operations with appropriate custom exceptions
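One way to wrap specific operations without scattering try/except blocks through the scan loop is a small context manager that re-raises low-level errors as a chosen custom exception. The sketch below is illustrative only: translate_errors and parse_minute_field are hypothetical names, and the exception classes simply mirror the ones proposed for src/exceptions.py.

```python
from contextlib import contextmanager
from typing import Iterator, Optional, Tuple, Type


class SchedulerException(Exception):
    """Mirrors the base class proposed for src/exceptions.py."""

    def __init__(self, error_code: str, error_msg: str, details: Optional[dict] = None):
        self.error_code = error_code
        self.error_msg = error_msg
        self.details = details
        super().__init__(error_msg)


class CronParseError(SchedulerException):
    """Invalid cron expression."""


@contextmanager
def translate_errors(
    target: Type[SchedulerException],
    error_code: str,
    catch: Tuple[Type[BaseException], ...] = (Exception,),
    **details: object,
) -> Iterator[None]:
    """Re-raise exceptions from the wrapped block as `target` (hypothetical helper)."""
    try:
        yield
    except SchedulerException:
        raise  # already domain-specific, pass through unchanged
    except catch as exc:
        # Keep the original exception as __cause__ so tracebacks stay useful.
        raise target(
            error_code, str(exc), details={**details, "cause": type(exc).__name__}
        ) from exc


def parse_minute_field(expr: str) -> int:
    """Toy stand-in for cron parsing: read the first field as an integer."""
    with translate_errors(CronParseError, "CRON_PARSE_ERROR", catch=(ValueError,), expr=expr):
        return int(expr.split()[0])
```

With this shape, the final generic handler in the worker only sees errors that no operation claimed, and each custom exception carries the schedule-specific context in details.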
C. Refactor worker_session_picker.py:
- Similar breakdown: extract _pick_and_claim_sessions(conn, limit=20) helper
- Extract _process_kind5_session(session, ctx) and _process_kind1_session(session, ctx) into separate functions
- Add detailed docstring explaining the picking strategy (FOR UPDATE SKIP LOCKED)
- Replace the broad except Exception with specific exception types
- Add comments explaining the kind handling logic (kind 5 vs kind 1)
- The function _build_run_config_kind5 is specific to that kind; it could be moved to a separate module if needed
D. Refactor worker_session_poller.py:
- Extract _update_completed_session(cur, session_id, status_info, now) helper
- Extract _update_started_session(cur, session_id, started_dt) helper
- Add docstring explaining polling strategy
- Replace generic exception handling with DagsterError when Dagster calls fail
- Add type hints for the row unpacking: for sid, run_id, started, cron_description, created_by in rows:
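One possible way to type the row unpacking is a NamedTuple that names the five columns. This is a sketch: SessionRow and iter_session_rows are hypothetical names, and the field types are assumptions, since the actual column types are not stated in this plan.

```python
from datetime import datetime
from typing import Iterable, Iterator, NamedTuple, Optional, Tuple


class SessionRow(NamedTuple):
    """One row returned by the poller query (field types are assumed)."""

    sid: str
    run_id: str
    started: Optional[datetime]
    cron_description: Optional[str]
    created_by: Optional[str]


def iter_session_rows(rows: Iterable[Tuple]) -> Iterator[SessionRow]:
    """Wrap raw cursor tuples so each field is named and type-checkable."""
    for row in rows:
        yield SessionRow(*row)


# Existing tuple unpacking keeps working, now with inferred types.
example_rows = [("s1", "run-1", None, "every 5 minutes", "user123")]
for sid, run_id, started, cron_description, created_by in iter_session_rows(example_rows):
    pass
```

Because a NamedTuple is still a tuple, the loop body in the worker would not need to change, and a type checker can flag a column count or type mismatch at the wrapping step.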
E. Refactor worker_schedule_auto_disable.py:
- This is simple enough already but still add comprehensive docstring
- Consider adding custom exception for database errors
F. Improve helpers.py (in scheduler):
- The GraphQL class and related functions are specific to Dagster; add docstrings
- safe_notify is good, add docstring
- Consider creating a SchedulerHelper class to group related utilities
G. Improve common.py:
- Already has good docstrings but could be expanded
- Add type hints to function signatures
- Break up launch_pipeline_execution if too complex (handles multiple error cases)
Common Patterns
Exception Hierarchy
Each project will have:
    class BaseProjectException(Exception):
        """Base with error_code, http_status (if applicable), message, metadata."""
        pass

    # Specific exceptions inherit from base
    class NotFoundError(BaseProjectException): ...
    class ValidationError(BaseProjectException): ...
    class ConflictError(BaseProjectException): ...
    class DatabaseError(BaseProjectException): ...
    # Domain-specific: StorageNotFoundError, QuotaConflictError, ScheduleNotFoundError, etc.
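As a variation on the hierarchy above, each subclass could declare its error code and HTTP status as class attributes, so call sites only pass the message. The sketch below assumes that design; it changes the constructor signature relative to the per-project classes earlier in this plan, and the to_dict method and the INTERNAL_ERROR and NOT_FOUND codes are illustrative additions.

```python
from typing import Any, Dict, Optional


class BaseProjectException(Exception):
    """Base error carrying the fields of the standardized error response."""

    # Subclasses override these class-level defaults.
    error_code: str = "INTERNAL_ERROR"
    http_status: int = 500

    def __init__(
        self,
        error_msg: str,
        x_user: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
        error_code: Optional[str] = None,
    ) -> None:
        super().__init__(error_msg)
        self.error_msg = error_msg
        self.x_user = x_user
        self.details = details
        if error_code is not None:
            # Per-instance override of the class default.
            self.error_code = error_code

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to the standardized error payload."""
        return {
            "error_code": self.error_code,
            "http_status": self.http_status,
            "error_msg": self.error_msg,
            "x_user": self.x_user,
            "details": self.details,
        }


class NotFoundError(BaseProjectException):
    error_code = "NOT_FOUND"
    http_status = 404


class StorageNotFoundError(NotFoundError):
    error_code = "STORAGE_NOT_FOUND"  # inherits http_status = 404


err = StorageNotFoundError("Storage with id 'xyz' does not exist", x_user="user123")
```

Keeping the status on the class means a handler can return err.to_dict(), err.http_status for any subclass without a separate except clause per status code.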
Error Response Format
Standardized JSON response:
    {
      "error_code": "STORAGE_NOT_FOUND",
      "http_status": 404,
      "error_msg": "Storage with id 'xyz' does not exist",
      "x_user": "user123",
      "details": { /* optional additional context */ }
    }
Middleware Pattern
In each HTTP endpoint function:
    def main():
        try:
            # Extract user header
            x_user = request.headers.get("X-Fission-Params-UserId")
            # Route to handler
            return handler()
        except ValidationError as e:
            return error_response(e), 400
        except NotFoundError as e:
            return error_response(e), 404
        except ConflictError as e:
            return error_response(e), 409
        except StorageException as e:
            logger.error(f"Storage error: {e.error_code}: {e.error_msg}")
            return error_response(e), 500
        except Exception as e:
            logger.exception("Unexpected error")
            return {"error": "Internal server error"}, 500
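Because every custom exception already carries its own http_status, the chain of except clauses could be collapsed into a decorator that reads the status from the exception. The following is a sketch under that assumption; handle_errors, Response, and get_storage are hypothetical names, and the fallback body reuses the standardized error format rather than the bare {"error": ...} shape.

```python
import functools
import logging
from typing import Any, Callable, Dict, Optional, Tuple

logger = logging.getLogger(__name__)

Response = Tuple[Dict[str, Any], int]


class StorageException(Exception):
    """Same constructor as the base class proposed for py-eom-storage."""

    def __init__(self, error_code: str, http_status: int, error_msg: str,
                 x_user: Optional[str] = None):
        self.error_code = error_code
        self.http_status = http_status
        self.error_msg = error_msg
        self.x_user = x_user
        super().__init__(error_msg)


def handle_errors(handler: Callable[..., Response]) -> Callable[..., Response]:
    """Convert exceptions raised by `handler` into (body, status) pairs."""

    @functools.wraps(handler)
    def wrapper(*args: Any, **kwargs: Any) -> Response:
        try:
            return handler(*args, **kwargs)
        except StorageException as exc:
            if exc.http_status >= 500:
                logger.error("Storage error: %s: %s", exc.error_code, exc.error_msg)
            body = {
                "error_code": exc.error_code,
                "http_status": exc.http_status,
                "error_msg": exc.error_msg,
                "x_user": exc.x_user,
                "details": None,
            }
            return body, exc.http_status
        except Exception:
            # Unknown failure: log the traceback, hide internals from the client.
            logger.exception("Unexpected error")
            body = {
                "error_code": "INTERNAL_ERROR",
                "http_status": 500,
                "error_msg": "Internal server error",
                "x_user": None,
                "details": None,
            }
            return body, 500

    return wrapper


@handle_errors
def get_storage(storage_id: str) -> Response:
    """Toy handler: only the id 'abc' exists."""
    if storage_id != "abc":
        raise StorageException(
            "STORAGE_NOT_FOUND", 404, f"Storage with id '{storage_id}' does not exist"
        )
    return {"id": storage_id}, 200
```

The trade-off is that the status mapping lives in the exception classes instead of in each endpoint, so adding a new exception type does not require touching every main().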
Implementation Order
- Phase 1: Create exception modules for all three projects
- Phase 2: Add/expand Pydantic models (storage, then complete quota)
- Phase 3: Refactor endpoints to use exceptions and models
- Phase 4: Refactor complex functions in scheduler
- Phase 5: Documentation pass - ensure all functions have docstrings
- Phase 6: Test manually by running functions (no automated tests to update)
Verification Steps
- Manual Testing:
  - Deploy each function to local Fission or use test environment
  - Test error cases: invalid input, missing resources, database failures
  - Verify error response format matches specification
  - Check logs for proper error logging
- Code Review:
  - All functions have docstrings with Args, Returns, Raises sections
  - No function exceeds ~50 lines (extracted helpers where needed)
  - All exceptions are specific, not generic Exception
  - Request validation happens before business logic
- Import Verification:
  - Fix broken imports (especially in py-eom-quota's userquota_filter.py)
  - Ensure circular dependencies are avoided
- Type Safety:
  - Run static type checker if available (mypy/pyright)
  - Ensure all functions have return type hints
Critical Files to Modify
py-eom-storage:
- src/exceptions.py (new)
- src/models.py (create/extend)
- src/filter_or_insert.py (refactor)
- src/update_or_delete.py (refactor)
- src/helpers.py (add utilities)
- src/vault.py (minor: improve docs)
py-eom-quota:
- src/exceptions.py (new)
- src/models.py (extend with request models)
- src/userquota_filter.py (fix imports, refactor)
- src/userquota_insert_or_delete.py (refactor, fix SQL if needed)
- src/helpers.py (add utilities)
py-ailbl-scheduler:
- src/exceptions.py (new)
- src/worker_scheduler_scan.py (major refactor)
- src/worker_session_picker.py (refactor)
- src/worker_session_poller.py (refactor)
- src/worker_schedule_auto_disable.py (docs)
- src/common.py (docs, type hints)
- src/helpers.py (docs, maybe extract class)
Notes
- All changes are in /workspaces/claude-marketplace/data/examples/
- Preserve existing API contracts (URLs, HTTP methods)
- Do not change database schema
- Maintain backward compatibility with existing clients
- Focus on internal improvements: error handling, validation, documentation
- Use consistent patterns across all three projects