Source code for litestar_workflows.core.protocols

"""Core protocols for litestar-workflows.

This module defines the Protocol-based interfaces that provide structural typing
for steps, workflows, and execution engines. Using Protocol allows duck typing
while maintaining type safety.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, TypeVar, runtime_checkable
from uuid import UUID

if TYPE_CHECKING:  # pragma: no cover
    from datetime import timedelta

    from litestar_workflows.core.context import WorkflowContext
    from litestar_workflows.core.types import StepType, WorkflowStatus

    # Use forward reference to avoid circular imports
    WorkflowDefinition = Any  # Will be defined in definition module


__all__ = ["ExecutionEngine", "Step", "Workflow", "WorkflowInstance"]

T = TypeVar("T", covariant=True)


[docs] @runtime_checkable class Step(Protocol[T]): """Protocol defining the interface for workflow steps. Steps are the atomic units of work within a workflow. They can be machine-automated, human-interactive, or event-driven. Each step must implement the core execution interface along with lifecycle hooks. Attributes: name: Unique identifier for this step within its workflow. description: Human-readable description of the step's purpose. step_type: Classification of this step (MACHINE, HUMAN, WEBHOOK, etc.). Example: >>> class ApprovalStep: ... name = "approval" ... description = "Manager approval required" ... step_type = StepType.HUMAN ... ... async def execute(self, context: WorkflowContext) -> bool: ... return context.get("approved", False) ... ... async def can_execute(self, context: WorkflowContext) -> bool: ... return context.get("request_submitted", False) ... ... async def on_success(self, context: WorkflowContext, result: bool) -> None: ... context.set("approval_result", result) ... ... async def on_failure(self, context: WorkflowContext, error: Exception) -> None: ... context.set("approval_error", str(error)) """ name: str description: str step_type: StepType
[docs] async def execute(self, context: WorkflowContext) -> T: """Execute the step's primary logic. This method contains the core functionality of the step. It receives the workflow context and can read/write data, perform operations, and return a result that will be stored in the execution history. Args: context: The current workflow execution context. Returns: The result of the step execution, which will be stored in step history. Raises: Exception: Any exception raised will trigger the on_failure hook and potentially fail the workflow. """ ... # pragma: no cover
[docs] async def can_execute(self, context: WorkflowContext) -> bool: """Determine if the step is eligible to execute. This method acts as a guard or validator, checking preconditions before execution. If it returns False, the step will be marked as SKIPPED. Args: context: The current workflow execution context. Returns: True if the step should execute, False to skip it. Example: >>> async def can_execute(self, context: WorkflowContext) -> bool: ... return context.get("prerequisites_met", False) """ ... # pragma: no cover
[docs] async def on_success(self, context: WorkflowContext, result: T) -> None: """Hook called after successful step execution. This method is invoked after execute() completes without raising an exception. Use it to update context, trigger side effects, or perform cleanup. Args: context: The current workflow execution context. result: The return value from the execute() method. Example: >>> async def on_success(self, context: WorkflowContext, result: dict) -> None: ... context.set("last_result", result) ... await send_notification(f"Step {self.name} completed") """ ... # pragma: no cover
[docs] async def on_failure(self, context: WorkflowContext, error: Exception) -> None: """Hook called after failed step execution. This method is invoked when execute() raises an exception. Use it for error logging, compensation logic, or triggering alerts. Args: context: The current workflow execution context. error: The exception that was raised during execution. Example: >>> async def on_failure(self, context: WorkflowContext, error: Exception) -> None: ... context.set("error", str(error)) ... await log_error(f"Step {self.name} failed: {error}") """ ... # pragma: no cover
[docs] @runtime_checkable class Workflow(Protocol): """Protocol defining the interface for workflow definitions. Workflows orchestrate a collection of steps connected by edges (transitions). They define the structure and flow logic but not the runtime state. Attributes: name: Unique identifier for this workflow. version: Version string for workflow definition versioning. description: Human-readable description of the workflow's purpose. Example: >>> class DocumentApproval: ... name = "document_approval" ... version = "1.0.0" ... description = "Multi-level document approval workflow" ... ... def get_definition(self) -> WorkflowDefinition: ... return WorkflowDefinition( ... name=self.name, ... version=self.version, ... description=self.description, ... steps={...}, ... edges=[...], ... initial_step="submit", ... terminal_steps={"approved", "rejected"}, ... ) """ name: str version: str description: str
[docs] def get_definition(self) -> WorkflowDefinition: """Extract the workflow definition from the class. Returns: A WorkflowDefinition instance containing steps, edges, and metadata. """ ... # pragma: no cover
[docs] class WorkflowInstance(Protocol): """Protocol for workflow runtime instances. A WorkflowInstance represents a single execution of a workflow definition. It tracks runtime state, progress, and results. Attributes: id: Unique identifier for this workflow instance. workflow_id: Identifier of the workflow definition being executed. workflow_name: Name of the workflow definition. workflow_version: Version of the workflow definition. status: Current execution status. context: Runtime execution context. current_step: Name of the currently executing or next step. started_at: Timestamp when the instance was created. completed_at: Timestamp when the instance finished (if applicable). error: Error message if the workflow failed. """ id: UUID workflow_id: UUID workflow_name: str workflow_version: str status: WorkflowStatus context: WorkflowContext current_step: str | None started_at: Any # datetime completed_at: Any | None # datetime | None error: str | None
[docs] class ExecutionEngine(Protocol): """Protocol for workflow execution engines. The ExecutionEngine is responsible for orchestrating workflow execution, including starting workflows, executing steps, scheduling, and handling human tasks. Example: >>> engine = LocalExecutionEngine() >>> instance = await engine.start_workflow(MyWorkflow, initial_data={"input": "value"}) """
[docs] async def start_workflow( self, workflow: type[Workflow], initial_data: dict[str, Any] | None = None, ) -> WorkflowInstance: """Start a new workflow instance. Args: workflow: The workflow class to instantiate and execute. initial_data: Optional initial data to populate the workflow context. Returns: A WorkflowInstance representing the started workflow. Example: >>> instance = await engine.start_workflow( ... ApprovalWorkflow, initial_data={"document_id": "doc_123"} ... ) """ ... # pragma: no cover
[docs] async def execute_step( self, step: Step[Any], context: WorkflowContext, previous_result: Any = None, ) -> Any: """Execute a single step within a workflow. Args: step: The step to execute. context: The current workflow context. previous_result: Optional result from a previous step. Returns: The result of the step execution. Raises: Exception: If step execution fails and error handling doesn't compensate. """ ... # pragma: no cover
[docs] async def schedule_step( self, instance_id: UUID, step_name: str, delay: timedelta | None = None, ) -> None: """Schedule a step for later execution. Args: instance_id: The workflow instance identifier. step_name: Name of the step to schedule. delay: Optional delay before execution. If None, executes immediately. Example: >>> from datetime import timedelta >>> await engine.schedule_step( ... instance_id=instance.id, step_name="reminder", delay=timedelta(hours=24) ... ) """ ... # pragma: no cover
[docs] async def complete_human_task( self, instance_id: UUID, step_name: str, user_id: str, data: dict[str, Any], ) -> None: """Complete a human task with user-provided data. Args: instance_id: The workflow instance identifier. step_name: Name of the human step to complete. user_id: Identifier of the user completing the task. data: User-provided form data or input. Example: >>> await engine.complete_human_task( ... instance_id=instance.id, ... step_name="approval", ... user_id="user_123", ... data={"approved": True, "comments": "Looks good"}, ... ) """ ... # pragma: no cover
[docs] async def cancel_workflow(self, instance_id: UUID, reason: str) -> None: """Cancel a running workflow instance. Args: instance_id: The workflow instance identifier. reason: Explanation for the cancellation. Example: >>> await engine.cancel_workflow( ... instance_id=instance.id, reason="Request withdrawn by submitter" ... ) """ ... # pragma: no cover