""" Example: Webhook receiver pattern. This demonstrates: - Processing external service callbacks - Signature verification - Event type handling - Idempotency checks - Async processing patterns """ import hashlib import hmac from flask import request from helpers import init_db_connection, get_secret from exceptions import ValidationError, DatabaseError # For signed webhooks, you'll need a secret WEBHOOK_SECRET = get_secret("WEBHOOK_SECRET", "") def verify_signature(payload: bytes, signature: str) -> bool: """ Verify HMAC-SHA256 webhook signature. Args: payload: Raw request body bytes signature: Signature header value (format: "sha256=") Returns: True if signature is valid, False otherwise """ if not WEBHOOK_SECRET: return True # Skip verification if no secret configured (for dev) expected = hmac.new( WEBHOOK_SECRET.encode(), payload, hashlib.sha256 ).hexdigest() # Signature header format: "sha256=abcdef..." received = signature.split("=", 1)[1] if "=" in signature else signature return hmac.compare_digest(expected, received) def webhook_receiver(event, context): """ ```fission { "name": "webhook-receiver", "http_triggers": { "webhook": { "url": "/webhooks/external-service", "methods": ["POST"] } } } ``` Receive and process webhook from external service. **Request:** - Raw JSON payload in body - Signature header: `X-Webhook-Signature: sha256=` **Response:** - 200: Webhook accepted for processing - 400: Invalid signature or payload - 500: Processing error **Idempotency:** This function is idempotent - duplicate webhooks with same event ID will not be processed twice. """ # Get raw body for signature verification payload = request.get_data() signature = request.headers.get("X-Webhook-Signature", "") # Verify signature if not verify_signature(payload, signature): raise ValidationError("Invalid webhook signature") # Parse payload try: data = request.get_json() except Exception as e: raise ValidationError(f"Invalid JSON payload: {str(e)}") # Validate required fields event_id = data.get("event_id") or data.get("id") event_type = data.get("event_type") or data.get("type") if not event_id: raise ValidationError("Missing event_id in webhook payload") if not event_type: raise ValidationError("Missing event_type in webhook payload") # Idempotency check: have we already processed this event? conn = None try: conn = init_db_connection() cursor = conn.cursor() # Check if event already processed cursor.execute( "SELECT id FROM webhook_events WHERE event_id = %s", (event_id,) ) if cursor.fetchone(): # Already processed - return success (idempotent) return {"status": "already_processed", "event_id": event_id} # Record webhook event (for idempotency) cursor.execute( """ INSERT INTO webhook_events (event_id, event_type, payload, received_at) VALUES (%s, %s, %s, NOW()) """, (event_id, event_type, payload.decode('utf-8')) ) # Process based on event type result = process_event(cursor, event_type, data) conn.commit() return {"status": "processed", "event_id": event_id, "result": result} except Exception as e: if conn: conn.rollback() raise DatabaseError(f"Failed to process webhook: {str(e)}") finally: if conn: conn.close() def process_event(cursor, event_type: str, data: dict): """ Route event to appropriate handler. Args: cursor: Database cursor event_type: Type of event (e.g., "user.created", "order.updated") data: Event payload Returns: Handler result """ handlers = { "user.created": handle_user_created, "user.updated": handle_user_updated, "user.deleted": handle_user_deleted, "order.created": handle_order_created, "order.paid": handle_order_paid, "order.shipped": handle_order_shipped, } handler = handlers.get(event_type) if not handler: # Log unknown event type but don't fail logger = get_logger() logger.warning(f"Unhandled webhook event type: {event_type}") return {"skipped": True, "reason": "unknown_event_type"} return handler(cursor, data) def handle_user_created(cursor, data: dict): """Handle user creation event.""" user_id = data.get("user_id") or data.get("id") email = data.get("email") name = data.get("name") # Create user record cursor.execute( """ INSERT INTO users (id, email, name, created_at) VALUES (%s, %s, %s, NOW()) ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email, name = EXCLUDED.name, updated_at = NOW() """, (user_id, email, name) ) # Send welcome email (async via message queue, etc.) # enqueue_welcome_email(user_id, email) return {"action": "user_created", "user_id": user_id} def handle_user_updated(cursor, data: dict): """Handle user update event.""" user_id = data.get("user_id") or data.get("id") updates = data.get("updates", {}) # Build dynamic update set_clauses = [] params = [] for key, value in updates.items(): set_clauses.append(f"{key} = %s") params.append(value) params.append(user_id) cursor.execute( f"UPDATE users SET {', '.join(set_clauses)}, updated_at = NOW() WHERE id = %s", params ) return {"action": "user_updated", "user_id": user_id} def handle_user_deleted(cursor, data: dict): """Handle user deletion event.""" user_id = data.get("user_id") or data.get("id") # Soft delete (mark as inactive) cursor.execute( "UPDATE users SET status = 'deleted', deleted_at = NOW() WHERE id = %s", (user_id,) ) return {"action": "user_deleted", "user_id": user_id} def handle_order_created(cursor, data: dict): """Handle order creation event.""" order_id = data.get("order_id") or data.get("id") user_id = data.get("user_id") total = data.get("total") cursor.execute( """ INSERT INTO orders (id, user_id, total, status, created_at) VALUES (%s, %s, %s, 'pending', NOW()) """, (order_id, user_id, total) ) return {"action": "order_created", "order_id": order_id} def handle_order_paid(cursor, data: dict): """Handle order payment event.""" order_id = data.get("order_id") or data.get("id") payment_id = data.get("payment_id") amount = data.get("amount") cursor.execute( """ UPDATE orders SET status = 'paid', paid_amount = %s, payment_id = %s, paid_at = NOW() WHERE id = %s """, (amount, payment_id, order_id) ) # Trigger fulfillment # enqueue_fulfillment(order_id) return {"action": "order_paid", "order_id": order_id} def handle_order_shipped(cursor, data: dict): """Handle order shipment event.""" order_id = data.get("order_id") or data.get("id") tracking_number = data.get("tracking_number") carrier = data.get("carrier") cursor.execute( """ UPDATE orders SET status = 'shipped', tracking_number = %s, carrier = %s, shipped_at = NOW() WHERE id = %s """, (tracking_number, carrier, order_id) ) # Send shipping notification # send_shipping_email(order_id) return {"action": "order_shipped", "order_id": order_id} def get_logger(): """Get logger instance.""" import logging return logging.getLogger(__name__)