""" Example: Background job / scheduled task pattern. This demonstrates: - Long-running job execution - Job status tracking - Error handling and retries - Periodic task scheduling - Worker session management Use cases: report generation, batch processing, cleanup jobs, etc. """ import datetime import time import uuid from helpers import init_db_connection, db_row_to_dict, db_rows_to_array from exceptions import DatabaseError def scheduled_job(event, context): """ ```fission { "name": "scheduled-job", "http_triggers": { "run": { "url": "/jobs/run", "methods": ["POST"] } }, "kafka_triggers": { "job-queue": { "topic": "job-queue", "consumer_group": "scheduler-workers" } } } ``` Execute a scheduled or queued background job. This function can be triggered: - Manually via HTTP POST /jobs/run - Automatically by message queue (Kafka) - By cron schedule (via Fission timer trigger) **Request Body (HTTP trigger):** ```json { "job_type": "report_generation", "parameters": { "report_type": "daily", "date": "2025-03-18" } } ``` **Response:** - 200: Job completed successfully - 202: Job accepted for async processing - 400: Invalid request - 500: Job failed """ # Parse input job_type = event.get("job_type") or event.get("type", "default") parameters = event.get("parameters", {}) # Generate job ID for tracking job_id = str(uuid.uuid4()) started_at = datetime.datetime.utcnow() conn = None try: conn = init_db_connection() cursor = conn.cursor() # Record job start cursor.execute( """ INSERT INTO jobs (id, type, parameters, status, started_at) VALUES (%s, %s, %s, 'running', %s) """, (job_id, job_type, parameters, started_at) ) conn.commit() # Execute job based on type if job_type == "report_generation": result = generate_report(cursor, job_id, parameters) elif job_type == "data_cleanup": result = cleanup_old_data(cursor, job_id, parameters) elif job_type == "sync_external": result = sync_external_system(cursor, job_id, parameters) else: result = run_default_job(cursor, job_id, parameters) # Mark job as completed completed_at = datetime.datetime.utcnow() cursor.execute( """ UPDATE jobs SET status = 'completed', result = %s, completed_at = %s, duration = EXTRACT(EPOCH FROM (%s - started_at)) WHERE id = %s """, (result, completed_at, completed_at, job_id) ) conn.commit() return { "job_id": job_id, "status": "completed", "result": result, "duration_seconds": (completed_at - started_at).total_seconds() } except Exception as e: # Mark job as failed if conn: try: cursor = conn.cursor() cursor.execute( """ UPDATE jobs SET status = 'failed', error = %s, completed_at = NOW() WHERE id = %s """, (str(e), job_id) ) conn.commit() except: pass raise DatabaseError(f"Job {job_type} failed: {str(e)}") finally: if conn: conn.close() def generate_report(cursor, job_id: str, parameters: dict): """ Generate a report based on parameters. Args: cursor: Database cursor job_id: Job tracking ID parameters: Report configuration (report_type, date, filters, etc.) 

def generate_report(cursor, job_id: str, parameters: dict):
    """
    Generate a report based on parameters.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Report configuration (report_type, date, filters, etc.)

    Returns:
        Dictionary with report metadata and summary
    """
    report_type = parameters.get("report_type", "daily")
    report_date = parameters.get("date", datetime.datetime.utcnow().strftime("%Y-%m-%d"))

    # Simulate report generation (could be complex aggregation queries)
    time.sleep(1)  # Simulate work

    # Example: get statistics for the date
    cursor.execute(
        """
        SELECT
            COUNT(*) AS total_orders,
            SUM(total) AS revenue,
            COUNT(DISTINCT user_id) AS unique_customers
        FROM orders
        WHERE DATE(created_at) = %s
        """,
        (report_date,)
    )
    stats = db_row_to_dict(cursor, cursor.fetchone())

    return {
        "report_type": report_type,
        "date": report_date,
        "statistics": stats,
        "generated_at": datetime.datetime.utcnow().isoformat()
    }


def cleanup_old_data(cursor, job_id: str, parameters: dict):
    """
    Clean up old records based on retention policy.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Cleanup configuration (table, days_to_retain, etc.)

    Returns:
        Dictionary with cleanup summary
    """
    table = parameters.get("table", "jobs")  # Table to clean
    days_to_retain = int(parameters.get("days_to_retain", 90))
    cutoff_date = datetime.datetime.utcnow() - datetime.timedelta(days=days_to_retain)

    # Safety: only allow cleanup of known tables, since the table name is
    # interpolated into the SQL string below
    if table not in ["jobs", "webhook_events", "logs", "sessions"]:
        raise ValueError(f"Cannot clean table: {table}")

    # Count records to be deleted
    cursor.execute(
        f"SELECT COUNT(*) FROM {table} WHERE created_at < %s",
        (cutoff_date,)
    )
    count = cursor.fetchone()[0]

    # Delete old records
    cursor.execute(
        f"DELETE FROM {table} WHERE created_at < %s",
        (cutoff_date,)
    )

    return {
        "table": table,
        "cutoff_date": cutoff_date.isoformat(),
        "records_deleted": count
    }


def sync_external_system(cursor, job_id: str, parameters: dict):
    """
    Synchronize data with an external system.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Sync configuration (system, endpoint, filters, etc.)

    Returns:
        Dictionary with sync summary
    """
    system = parameters.get("system")
    endpoint = parameters.get("endpoint")

    # HTTP calls to the external API are made with the requests library
    import requests

    # Fetch last sync timestamp
    cursor.execute(
        "SELECT last_sync_at FROM sync_state WHERE system = %s",
        (system,)
    )
    row = cursor.fetchone()
    last_sync = row[0] if row else None

    # Build query parameters
    params = {"since": last_sync.isoformat() if last_sync else ""}

    # Make request to external API
    try:
        resp = requests.get(endpoint, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        raise DatabaseError(f"Failed to fetch from {system}: {str(e)}")

    # Process and store data (items are serialized to JSON for storage)
    records_processed = 0
    for item in data.get("items", []):
        cursor.execute(
            """
            INSERT INTO external_data (system, external_id, data, synced_at)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (system, external_id)
            DO UPDATE SET data = EXCLUDED.data, synced_at = EXCLUDED.synced_at
            """,
            (system, item["id"], json.dumps(item))
        )
        records_processed += 1

    # Update sync state
    cursor.execute(
        """
        INSERT INTO sync_state (system, last_sync_at)
        VALUES (%s, NOW())
        ON CONFLICT (system)
        DO UPDATE SET last_sync_at = NOW()
        """,
        (system,)
    )

    return {
        "system": system,
        "records_processed": records_processed,
        "sync_timestamp": datetime.datetime.utcnow().isoformat()
    }

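
# sync_external_system relies on two further tables whose shape is implied by the
# queries above. This sketch is an assumption for reference, not part of the example;
# the exact types may differ, but the ON CONFLICT targets do require matching
# unique keys.
#
#   CREATE TABLE external_data (
#       system      text NOT NULL,
#       external_id text NOT NULL,
#       data        jsonb,
#       synced_at   timestamptz,
#       UNIQUE (system, external_id)      -- required by ON CONFLICT (system, external_id)
#   );
#
#   CREATE TABLE sync_state (
#       system       text PRIMARY KEY,    -- required by ON CONFLICT (system)
#       last_sync_at timestamptz
#   );
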

def run_default_job(cursor, job_id: str, parameters: dict):
    """
    Default no-op job for testing.

    Args:
        cursor: Database cursor
        job_id: Job tracking ID
        parameters: Job parameters

    Returns:
        Simple acknowledgment
    """
    time.sleep(0.5)  # Simulate some work

    return {
        "message": "Default job executed",
        "parameters_received": parameters
    }
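

# Minimal local smoke test, assuming the database configured by helpers.init_db_connection
# is reachable. The sample event is illustrative; in a deployment this function is invoked
# by the Fission HTTP/Kafka triggers declared in the scheduled_job docstring.
if __name__ == "__main__":
    sample_event = {"job_type": "default", "parameters": {"note": "local smoke test"}}
    print(scheduled_job(sample_event, context=None))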