Source code for litestar_workflows.steps.webhook

"""Webhook wait steps for external event integration."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from litestar_workflows.core import StepType
from litestar_workflows.steps.base import BaseStep

if TYPE_CHECKING:
    from litestar_workflows.core import WorkflowContext


[docs] class WebhookStep(BaseStep): """Step that waits for external webhook callback. Webhook steps pause workflow execution until an external system sends a callback with data. This is useful for integrating with third-party services or async external processes. The execution engine is responsible for managing the actual waiting and resuming the workflow when the webhook is received. Example: >>> # Wait for payment confirmation >>> step = WebhookStep( ... "wait_payment", ... callback_key="payment_data", ... description="Wait for payment gateway callback", ... ) >>> # Workflow pauses here until webhook received >>> payment_data = await step.execute(context) >>> # Continue with payment_data from webhook """ step_type: StepType = StepType.WEBHOOK
[docs] def __init__( self, name: str, callback_key: str = "webhook_data", description: str = "", ) -> None: """Initialize a webhook step. Args: name: Unique identifier for the step. callback_key: Context key where webhook data will be stored. description: Human-readable description. """ super().__init__(name, description) self.step_type = StepType.WEBHOOK self.callback_key = callback_key
[docs] async def execute(self, context: WorkflowContext) -> Any: """Retrieve webhook data from context. This step pauses execution until a webhook is received. The execution engine handles the actual waiting mechanism. Args: context: The workflow execution context. Returns: The data received from the webhook callback. Note: The engine will populate the callback_key in context when the webhook is received before resuming execution. """ return context.get(self.callback_key)