ref: up
This commit is contained in:
311
fission-python/template/examples/example_scheduler.py
Normal file
311
fission-python/template/examples/example_scheduler.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""
|
||||
Example: Background job / scheduled task pattern.
|
||||
|
||||
This demonstrates:
|
||||
- Long-running job execution
|
||||
- Job status tracking
|
||||
- Error handling and retries
|
||||
- Periodic task scheduling
|
||||
- Worker session management
|
||||
|
||||
Use cases: report generation, batch processing, cleanup jobs, etc.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import time
|
||||
import uuid
|
||||
from helpers import init_db_connection, db_row_to_dict, db_rows_to_array
|
||||
from exceptions import DatabaseError
|
||||
|
||||
|
||||
def scheduled_job(event, context):
    """
    ```fission
    {
        "name": "scheduled-job",
        "http_triggers": {
            "run": {
                "url": "/jobs/run",
                "methods": ["POST"]
            }
        },
        "kafka_triggers": {
            "job-queue": {
                "topic": "job-queue",
                "consumer_group": "scheduler-workers"
            }
        }
    }
    ```
    Execute a scheduled or queued background job.

    This function can be triggered:
    - Manually via HTTP POST /jobs/run
    - Automatically by message queue (Kafka)
    - By cron schedule (via Fission timer trigger)

    **Request Body (HTTP trigger):**
    ```json
    {
        "job_type": "report_generation",
        "parameters": {
            "report_type": "daily",
            "date": "2025-03-18"
        }
    }
    ```

    **Response:**
    - 200: Job completed successfully
    - 202: Job accepted for async processing
    - 400: Invalid request
    - 500: Job failed
    """
    # Accept either "job_type" (HTTP body) or "type" (queue message).
    job_type = event.get("job_type") or event.get("type", "default")
    parameters = event.get("parameters", {})

    # Unique ID so the job's lifecycle can be tracked in the jobs table.
    job_id = str(uuid.uuid4())
    started_at = datetime.datetime.utcnow()

    conn = None
    try:
        conn = init_db_connection()
        cursor = conn.cursor()

        # Record job start so in-flight jobs are visible to observers.
        # NOTE(review): `parameters` (a dict) is passed straight as a bind
        # value — presumably the driver/column adapts it (e.g. JSONB);
        # confirm against the schema.
        cursor.execute(
            """
            INSERT INTO jobs (id, type, parameters, status, started_at)
            VALUES (%s, %s, %s, 'running', %s)
            """,
            (job_id, job_type, parameters, started_at)
        )
        conn.commit()

        # Dispatch to the handler for this job type.
        if job_type == "report_generation":
            result = generate_report(cursor, job_id, parameters)
        elif job_type == "data_cleanup":
            result = cleanup_old_data(cursor, job_id, parameters)
        elif job_type == "sync_external":
            result = sync_external_system(cursor, job_id, parameters)
        else:
            result = run_default_job(cursor, job_id, parameters)

        # Mark job as completed and record its duration server-side.
        completed_at = datetime.datetime.utcnow()
        cursor.execute(
            """
            UPDATE jobs
            SET status = 'completed',
                result = %s,
                completed_at = %s,
                duration = EXTRACT(EPOCH FROM (%s - started_at))
            WHERE id = %s
            """,
            (result, completed_at, completed_at, job_id)
        )
        conn.commit()

        return {
            "job_id": job_id,
            "status": "completed",
            "result": result,
            "duration_seconds": (completed_at - started_at).total_seconds()
        }

    except Exception as e:
        # Best-effort: mark the job as failed before propagating the error.
        if conn:
            try:
                # Roll back the aborted transaction first; otherwise the
                # failure UPDATE below would itself fail on drivers that
                # refuse new statements inside an aborted transaction.
                conn.rollback()
                cursor = conn.cursor()
                cursor.execute(
                    """
                    UPDATE jobs
                    SET status = 'failed',
                        error = %s,
                        completed_at = NOW()
                    WHERE id = %s
                    """,
                    (str(e), job_id)
                )
                conn.commit()
            except Exception:
                # Deliberately best-effort: propagating the original error
                # below is more useful than a secondary bookkeeping failure.
                pass

        # Chain the original exception so the root cause is preserved.
        raise DatabaseError(f"Job {job_type} failed: {str(e)}") from e
    finally:
        if conn:
            conn.close()
|
||||
|
||||
|
||||
def generate_report(cursor, job_id: str, parameters: dict):
    """
    Generate a report based on parameters.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Report configuration (report_type, date, filters, etc.)

    Returns:
        Dictionary with report metadata and summary
    """
    report_type = parameters.get("report_type", "daily")
    if "date" in parameters:
        report_date = parameters["date"]
    else:
        report_date = datetime.datetime.utcnow().strftime("%Y-%m-%d")

    # Simulated heavy lifting; a real report would run aggregation queries.
    time.sleep(1)

    # Pull order statistics for the requested day.
    cursor.execute(
        """
        SELECT
            COUNT(*) as total_orders,
            SUM(total) as revenue,
            COUNT(DISTINCT user_id) as unique_customers
        FROM orders
        WHERE DATE(created_at) = %s
        """,
        (report_date,)
    )
    summary = db_row_to_dict(cursor, cursor.fetchone())

    return {
        "report_type": report_type,
        "date": report_date,
        "statistics": summary,
        "generated_at": datetime.datetime.utcnow().isoformat()
    }
|
||||
|
||||
|
||||
def cleanup_old_data(cursor, job_id: str, parameters: dict):
    """
    Clean up old records based on retention policy.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Cleanup configuration (table, days_to_retain, etc.)

    Returns:
        Dictionary with cleanup summary (table, cutoff date, rows removed)

    Raises:
        ValueError: If `table` is not in the cleanup allowlist.
    """
    table = parameters.get("table", "jobs")  # Table to clean
    days_to_retain = int(parameters.get("days_to_retain", 90))
    cutoff_date = datetime.datetime.utcnow() - datetime.timedelta(days=days_to_retain)

    # Safety: table names cannot be bound parameters, so only allow a
    # fixed set of identifiers into the interpolated SQL below.
    if table not in ["jobs", "webhook_events", "logs", "sessions"]:
        raise ValueError(f"Cannot clean table: {table}")

    # Delete old records. Using cursor.rowcount (DB-API 2.0) for the count
    # fixes the race in the previous COUNT-then-DELETE approach, where rows
    # inserted or removed between the two statements skewed the number.
    cursor.execute(
        f"DELETE FROM {table} WHERE created_at < %s",
        (cutoff_date,)
    )
    count = cursor.rowcount

    return {
        "table": table,
        "cutoff_date": cutoff_date.isoformat(),
        "records_deleted": count
    }
|
||||
|
||||
|
||||
def sync_external_system(cursor, job_id: str, parameters: dict):
    """
    Synchronize data with external system.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Sync configuration (system, endpoint, filters, etc.)

    Returns:
        Dictionary with sync summary

    Raises:
        ValueError: If `system` or `endpoint` is missing from parameters.
        DatabaseError: If the external API request fails.
    """
    system = parameters.get("system")
    endpoint = parameters.get("endpoint")

    # Fail fast with a clear message instead of letting requests.get(None)
    # or a NULL sync_state lookup produce a confusing downstream error.
    if not system or not endpoint:
        raise ValueError("sync_external requires 'system' and 'endpoint' parameters")

    # Imported lazily so jobs that never sync don't pay the import cost.
    import requests

    # Fetch the last sync timestamp so the pull can be incremental.
    cursor.execute(
        "SELECT last_sync_at FROM sync_state WHERE system = %s",
        (system,)
    )
    row = cursor.fetchone()
    last_sync = row[0] if row else None

    # Ask the external API only for records changed since the last sync;
    # an empty "since" means a full initial sync.
    params = {"since": last_sync.isoformat() if last_sync else ""}

    try:
        resp = requests.get(endpoint, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        # Chain the original exception so the HTTP root cause is preserved.
        raise DatabaseError(f"Failed to fetch from {system}: {str(e)}") from e

    # Upsert each record so re-running a sync is idempotent.
    # NOTE(review): `item` (a dict) is passed straight as a bind value —
    # presumably the driver/column adapts it (e.g. JSONB); confirm.
    records_processed = 0
    for item in data.get("items", []):
        cursor.execute(
            """
            INSERT INTO external_data (system, external_id, data, synced_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (system, external_id) DO UPDATE SET
                data = EXCLUDED.data,
                synced_at = EXCLUDED.synced_at
            """,
            (system, item["id"], item)
        )
        records_processed += 1

    # Record the new high-water mark for the next incremental run.
    cursor.execute(
        """
        INSERT INTO sync_state (system, last_sync_at)
        VALUES (%s, NOW())
        ON CONFLICT (system) DO UPDATE SET
            last_sync_at = NOW()
        """,
        (system,)
    )

    return {
        "system": system,
        "records_processed": records_processed,
        "sync_timestamp": datetime.datetime.utcnow().isoformat()
    }
|
||||
|
||||
|
||||
def run_default_job(cursor, job_id: str, parameters: dict):
    """
    Default no-op job for testing.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Job parameters

    Returns:
        Simple acknowledgment
    """
    # Pretend to do a little work so duration tracking gets exercised.
    time.sleep(0.5)

    acknowledgment = {
        "message": "Default job executed",
        "parameters_received": parameters
    }
    return acknowledgment
|
||||
Reference in New Issue
Block a user