Module Reference¶
Auto-generated API documentation from source code.
Note
This reference is automatically generated from docstrings in the source code. For conceptual documentation, see Core Concepts.
litestar_workflows¶
Litestar Workflows - Workflow automation library for Litestar.
This package provides a comprehensive workflow orchestration framework for Litestar applications, supporting both automated and human-in-the-loop workflows.
Key Features:
- DAG-based workflow definitions with validation
- Sequential, parallel, and conditional execution
- Human task integration
- Event-driven architecture
- Flexible execution engines (local, distributed)
- Type-safe workflow definitions
Example
Basic plugin usage:
from litestar import Litestar
from litestar_workflows import WorkflowPlugin
app = Litestar(plugins=[WorkflowPlugin()])
Defining a workflow:
from litestar_workflows import (
    WorkflowDefinition,
    BaseMachineStep,
    Edge,
    WorkflowContext,
)

class ProcessOrder(BaseMachineStep):
    name = "process_order"
    description = "Process the customer order"

    async def execute(self, context: WorkflowContext) -> dict:
        order_id = context.get("order_id")
        # Process order logic here
        return {"processed": True, "order_id": order_id}

workflow = WorkflowDefinition(
    name="order_workflow",
    version="1.0.0",
    steps={"process": ProcessOrder()},
    edges=[],
    initial_step="process",
    terminal_steps={"process"},
)
- class litestar_workflows.BaseHumanStep[source]
Bases: BaseStep
Base for human approval/interaction steps.
Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.
- Parameters:
- __init__(name, title, description='', form_schema=None, assignee_key=None)[source]
Initialize the human step.
- Parameters:
- async execute(context)[source]
Execute the human step.
For human steps, execution typically means waiting for user input. Override this if you need custom behavior.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The form data submitted by the user.
- async get_assignee(context)[source]
Get the assignee for this task from context.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
User ID to assign the task to, or None for unassigned.
- step_type: StepType = 'human'
Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
- title: str
Display title for the human task.
- class litestar_workflows.BaseMachineStep[source]
Bases: BaseStep
Base for automated machine steps.
Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.
- __init__(name, description='')[source]
Initialize the machine step.
- step_type: StepType = 'machine'
Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
- class litestar_workflows.ConditionalGroup[source]
Bases: StepGroup
Execute one of multiple branches based on a condition.
This implements the Gateway pattern where a condition function determines which branch to execute. Similar to if/else or switch statements.
Example
>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
- Parameters:
- __init__(condition, branches)[source]
Initialize a conditional group.
- async execute(context, engine)[source]
Execute the branch selected by the condition.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the selected branch, or None if no match.
- Raises:
Exception – Any exception from step execution.
- class litestar_workflows.Edge[source]
Bases: object
Defines a transition between workflow steps.
An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.
- source
Name of the source step or the Step class itself.
- target
Name of the target step or the Step class itself.
- condition
Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.
Example
>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
- Parameters:
- __init__(source, target, condition=None)
- condition: str | Callable[[WorkflowContext], bool] | None = None
- evaluate_condition(context)[source]
Evaluate the edge condition against the workflow context.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
- Return type:
- Returns:
True if the condition is met or if no condition exists, False otherwise.
Example
>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
- get_source_name()[source]
Get the name of the source step.
- Return type:
- Returns:
The source step name as a string.
- get_target_name()[source]
Get the name of the target step.
- Return type:
- Returns:
The target step name as a string.
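The two condition forms (a callable over the context, or a string expression) can be sketched with a stand-in evaluator. The real logic lives in Edge.evaluate_condition; in particular, evaluating string conditions via eval() against a `context` name is an assumption here, inferred only from the documented example "context.get('approved') == True":

```python
# Illustrative stand-in for edge-condition evaluation; NOT the library's
# implementation. A missing condition always passes, a callable is invoked
# with the context, and a string is (assumed to be) evaluated with `context`
# bound in its namespace.
def evaluate_condition(condition, context: dict) -> bool:
    if condition is None:
        return True  # no condition means the edge is always traversable
    if callable(condition):
        return bool(condition(context))
    return bool(eval(condition, {"context": context}))

ctx = {"approved": True, "value": 15}
print(evaluate_condition(None, ctx))                               # True
print(evaluate_condition(lambda c: c.get("value") > 10, ctx))      # True
print(evaluate_condition("context.get('approved') == True", ctx))  # True
```

A plain dict stands in for WorkflowContext here, since both expose a get() accessor.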
- class litestar_workflows.ExclusiveGateway[source]
Bases: BaseStep
XOR gateway - exactly one path based on condition.
This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.
Example
>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
- Parameters:
name (str)
condition (Callable[[WorkflowContext], str])
description (str)
- __init__(name, condition, description='')[source]
Initialize an exclusive gateway.
- Parameters:
name (str) – Unique identifier for the gateway.
condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.
description (str) – Human-readable description.
- async execute(context)[source]
Evaluate condition and return the name of the next step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The name of the next step to execute.
- Raises:
Exception – If condition evaluation fails.
- step_type: StepType = 'gateway'
Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
- exception litestar_workflows.HumanTaskError[source]
Bases: WorkflowsError
Base exception for human task related errors.
All human task specific exceptions should inherit from this class. This allows distinguishing between automated workflow errors and human interaction errors.
- exception litestar_workflows.InvalidTransitionError[source]
Bases: WorkflowsError
Raised when an invalid state transition is attempted.
This occurs when trying to transition between steps in a way that violates the workflow’s defined graph structure or transition rules.
- from_step
The step being transitioned from.
- to_step
The step being transitioned to.
- class litestar_workflows.LocalExecutionEngine[source]
Bases: object
In-memory async execution engine for workflows.
This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.
- registry
The workflow registry for looking up definitions.
- persistence
Optional persistence layer for saving state.
- event_bus
Optional event bus for emitting workflow events.
- _instances
In-memory storage of workflow instances.
- _running
Map of instance IDs to their running asyncio tasks.
- Parameters:
registry (WorkflowRegistry)
- __init__(registry, persistence=None, event_bus=None)[source]
Initialize the local execution engine.
- Parameters:
registry (WorkflowRegistry) – The workflow registry.
persistence (Any | None) – Optional persistence layer implementing save/load methods.
event_bus (Any | None) – Optional event bus implementing emit method.
- async cancel_workflow(instance_id, reason)[source]
Cancel a running workflow.
- async complete_human_task(instance_id, step_name, user_id, data)[source]
Complete a human task with user-provided data.
- async execute_step(step, context, previous_result=None)[source]
Execute a single step with the given context.
- Parameters:
context (WorkflowContext) – The workflow context.
previous_result (Any) – Optional result from previous step.
- Return type:
- Returns:
The result of the step execution.
- get_all_instances()[source]
Get all workflow instances (running and completed).
- Return type:
list[WorkflowInstanceData]
- Returns:
List of all WorkflowInstanceData objects.
- async get_instance(instance_id)[source]
Retrieve a workflow instance by ID.
- get_running_instances()[source]
Get all currently running workflow instances.
- Return type:
list[WorkflowInstanceData]
- Returns:
List of running WorkflowInstanceData objects.
- async schedule_step(instance_id, step_name, delay=None)[source]
Schedule a step for execution.
- async start_workflow(workflow, initial_data=None)[source]
Start a new workflow instance.
Creates a new workflow instance and begins execution from the initial step.
- Parameters:
- Return type:
WorkflowInstanceData
- Returns:
The created WorkflowInstanceData.
Example
>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
- class litestar_workflows.ParallelGateway[source]
Bases: BaseStep
AND gateway - all paths execute in parallel.
This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.
Example
>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
- __init__(name, branches, description='')[source]
Initialize a parallel gateway.
- async execute(context)[source]
Return the list of branches to execute in parallel.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
List of step names to execute concurrently.
- step_type: StepType = 'gateway'
Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
- class litestar_workflows.ParallelGroup[source]
Bases: StepGroup
Execute steps in parallel.
This implements the Group pattern where multiple steps execute concurrently. Optionally supports a callback step (Chord pattern) that receives all results.
Example
>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
- __init__(*steps, callback=None)[source]
Initialize a parallel group.
- async execute(context, engine)[source]
Execute steps in parallel using asyncio.gather.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
List of results if no callback, otherwise callback result.
- Raises:
Exception – Any exception from step execution.
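The Group/Chord semantics described above can be sketched with plain coroutines in place of real Step objects. The helper below is an illustrative stand-in, not ParallelGroup itself; it mirrors the documented behavior of running branches concurrently via asyncio.gather and optionally feeding all results to a callback:

```python
import asyncio

# Toy "steps" standing in for real Step objects.
async def step_a():
    return 1

async def step_b():
    return 2

async def aggregate(results):
    return sum(results)

async def run_parallel_group(steps, callback=None):
    # Branches run concurrently, as ParallelGroup.execute is documented to do
    # with asyncio.gather; the optional callback receives the collected
    # results (the Chord pattern).
    results = await asyncio.gather(*(s() for s in steps))
    return await callback(results) if callback else results

print(asyncio.run(run_parallel_group([step_a, step_b])))             # [1, 2]
print(asyncio.run(run_parallel_group([step_a, step_b], aggregate)))  # 3
```

Note that gather preserves input order, so results line up with the branch order regardless of which branch finishes first.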
- class litestar_workflows.SequentialGroup[source]
Bases: StepGroup
Execute steps in sequence, passing results.
This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.
Example
>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
>>> # step1 -> step2(result1) -> step3(result2) -> result3
- __init__(*steps)[source]
Initialize a sequential group.
- async execute(context, engine)[source]
Execute steps sequentially, passing results forward.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the final step.
- Raises:
Exception – Any exception from step execution.
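The Chain pattern described above reduces to a simple loop: each step consumes the previous step's result, and the last result is returned. A stand-in sketch with plain coroutines (not the library's SequentialGroup):

```python
import asyncio

# Toy "steps" standing in for real Step objects.
async def double(x):
    return x * 2

async def increment(x):
    return x + 1

async def run_sequential_group(steps, initial=None):
    # Mirror the documented Chain semantics: feed each step the previous
    # result and return the final step's result.
    result = initial
    for step in steps:
        result = await step(result)
    return result

print(asyncio.run(run_sequential_group([double, increment], initial=5)))  # 11
```

Because results flow forward, reordering the steps changes the outcome, unlike the parallel group where order only affects the results list.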
- class litestar_workflows.StepExecution[source]
Bases: object
Record of a single step execution within a workflow.
- step_name
Name of the executed step.
- status
Final status of the step execution.
- started_at
Timestamp when step execution began.
- completed_at
Timestamp when step execution finished (if completed).
- result
Return value from successful execution.
- error
Error message if execution failed.
- input_data
Input data passed to the step.
- output_data
Output data produced by the step.
- Parameters:
- __init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)
- result: Any = None
- step_name: str
- status: str
- started_at: datetime
- exception litestar_workflows.StepExecutionError[source]
Bases: WorkflowsError
Raised when a step fails to execute.
This wraps the underlying exception that caused the step to fail, providing context about which step failed.
- step_name
The name of the step that failed.
- cause
The underlying exception that caused the failure, if any.
- class litestar_workflows.StepStatus[source]
Bases: StrEnum
Execution status of a workflow step.
- PENDING
Step has not yet been scheduled for execution.
- SCHEDULED
Step is queued and awaiting execution.
- RUNNING
Step is currently executing.
- WAITING
Step is paused, waiting for external input or event.
- SUCCEEDED
Step completed successfully.
- FAILED
Step execution encountered an error.
- CANCELED
Step was manually canceled before completion.
- SKIPPED
Step was skipped due to conditional logic.
- __new__(value)
- PENDING = 'pending'
- SCHEDULED = 'scheduled'
- RUNNING = 'running'
- WAITING = 'waiting'
- SUCCEEDED = 'succeeded'
- FAILED = 'failed'
- CANCELED = 'canceled'
- SKIPPED = 'skipped'
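Because StepStatus is a StrEnum, its members compare equal to their raw string values, which makes filtering execution history straightforward. A self-contained sketch (the class below is a stand-in mirroring the documented values; the real enum is litestar_workflows.StepStatus):

```python
from enum import Enum

# Stand-in mirroring the documented StepStatus values; inheriting from str
# gives the same member == "raw string" behaviour as the real StrEnum.
class StepStatus(str, Enum):
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    WAITING = "waiting"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELED = "canceled"
    SKIPPED = "skipped"

# Hypothetical (step_name, status) pairs as they might appear in step_history.
history = [("validate", "succeeded"), ("review", "failed"), ("notify", "skipped")]
failed = [name for name, status in history if status == StepStatus.FAILED]
print(failed)  # ['review']
```

The same comparison idiom applies to StepType and WorkflowStatus below, which are also string enums.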
- class litestar_workflows.StepType[source]
Bases: StrEnum
Classification of step types within a workflow.
- MACHINE
Automated execution without human intervention.
- HUMAN
Requires user interaction and input.
- WEBHOOK
Waits for external callback or event.
- TIMER
Waits for a time-based condition to be met.
- GATEWAY
Decision or branching point in the workflow.
- __new__(value)
- MACHINE = 'machine'
- HUMAN = 'human'
- WEBHOOK = 'webhook'
- TIMER = 'timer'
- GATEWAY = 'gateway'
- exception litestar_workflows.TaskAlreadyCompletedError[source]
Bases: HumanTaskError
Raised when trying to complete an already completed task.
This prevents double-completion of human tasks which could lead to inconsistent workflow state.
- task_id
The ID of the task that was already completed.
- exception litestar_workflows.TaskNotFoundError[source]
Bases: HumanTaskError
Raised when a human task is not found.
This occurs when trying to retrieve or complete a task that doesn’t exist in the human task storage.
- task_id
The ID of the task that was not found.
- class litestar_workflows.TimerStep[source]
Bases: BaseStep
Step that waits for a duration before continuing.
Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.
Example
>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
- Parameters:
- __init__(name, duration, description='')[source]
Initialize a timer step.
- async execute(context)[source]
Wait for the specified duration.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
None after the delay completes.
- get_duration(context)[source]
Get the delay duration for this step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The duration to wait.
- step_type: StepType = 'timer'
Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
- exception litestar_workflows.UnauthorizedTaskError[source]
Bases: HumanTaskError
Raised when a user is not authorized to complete a task.
This enforces authorization rules for human tasks, ensuring only assigned users or users with appropriate roles can complete tasks.
- task_id
The ID of the task.
- user_id
The ID of the user attempting to complete the task.
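Since every task-specific error above inherits from HumanTaskError (itself a WorkflowsError), a single except clause can handle all human-task failures. The classes and completion logic below are illustrative stand-ins mirroring the documented hierarchy, not the library's code:

```python
# Stand-in hierarchy mirroring the documented exception classes.
class WorkflowsError(Exception): ...
class HumanTaskError(WorkflowsError): ...

class TaskNotFoundError(HumanTaskError):
    def __init__(self, task_id):
        super().__init__(f"Task {task_id!r} not found")
        self.task_id = task_id

class UnauthorizedTaskError(HumanTaskError):
    def __init__(self, task_id, user_id):
        super().__init__(f"User {user_id!r} may not complete task {task_id!r}")
        self.task_id = task_id
        self.user_id = user_id

# Hypothetical completion check standing in for engine-side task handling.
def complete(task_id, user_id, tasks):
    if task_id not in tasks:
        raise TaskNotFoundError(task_id)
    if tasks[task_id] != user_id:
        raise UnauthorizedTaskError(task_id, user_id)
    return "completed"

try:
    complete("t1", "mallory", tasks={"t1": "alice"})
except HumanTaskError as exc:  # one clause covers all task-specific errors
    print(type(exc).__name__)  # UnauthorizedTaskError
```

Catching HumanTaskError lets callers distinguish human-interaction failures from other WorkflowsError subclasses without enumerating every concrete exception.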
- exception litestar_workflows.WorkflowAlreadyCompletedError[source]
Bases: WorkflowsError
Raised when trying to modify a completed workflow.
This prevents operations on workflow instances that have reached a terminal state (completed, failed, or cancelled).
- instance_id
The ID of the workflow instance.
- status
The current terminal status of the workflow.
- class litestar_workflows.WorkflowContext[source]
Bases: object
Execution context passed between workflow steps.
The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.
- workflow_id
Unique identifier for the workflow definition.
- instance_id
Unique identifier for this workflow instance.
- data
Mutable state dictionary for inter-step communication.
- metadata
Immutable metadata about the workflow execution.
- current_step
Name of the currently executing step.
- step_history
Chronological record of all step executions.
- started_at
Timestamp when the workflow instance was created.
- user_id
Optional user identifier for human task contexts.
- tenant_id
Optional tenant identifier for multi-tenancy support.
Example
>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
- Parameters:
- __init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)
- get(key, default=None)[source]
Retrieve a value from the workflow data dictionary.
- Parameters:
- Return type:
- Returns:
The value associated with the key, or the default if not present.
Example
>>> context.get("some_key", "default_value")
'default_value'
- get_last_execution(step_name=None)[source]
Get the most recent execution record for a step.
- Parameters:
step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.
- Return type:
- Returns:
The most recent StepExecution matching the criteria, or None if not found.
Example
>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "succeeded":
...     print("Step succeeded")
- has_step_executed(step_name)[source]
Check if a step has been executed in this workflow instance.
- Parameters:
step_name (str) – Name of the step to check.
- Return type:
- Returns:
True if the step appears in the execution history, False otherwise.
Example
>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
- set(key, value)[source]
Set a value in the workflow data dictionary.
- Parameters:
- Return type:
Example
>>> context.set("status", "approved")
>>> context.get("status")
'approved'
- with_step(step_name)[source]
Create a new context for the given step execution.
This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.
- Parameters:
step_name (str) – Name of the step to execute next.
- Return type:
- Returns:
A new WorkflowContext instance with updated current_step.
Example
>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
- workflow_id: UUID
- instance_id: UUID
- current_step: str
- step_history: list[StepExecution]
- started_at: datetime
- class litestar_workflows.WorkflowDefinition[source]
Bases: object
Declarative workflow structure.
The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.
- name
Unique identifier for the workflow.
- version
Version string for workflow versioning.
- description
Human-readable description of the workflow’s purpose.
- steps
Dictionary mapping step names to Step instances.
- edges
List of Edge instances defining the workflow graph.
- initial_step
Name of the step to execute first.
- terminal_steps
Set of step names that mark workflow completion.
Example
>>> from litestar_workflows.core.types import StepType
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
- Parameters:
- __init__(name, version, description, steps, edges, initial_step, terminal_steps=<factory>)
- get_next_steps(current_step, context)[source]
Get the list of next steps from the current step based on edge conditions.
- Parameters:
current_step (str) – Name of the current step.
context (WorkflowContext) – The workflow execution context for condition evaluation.
- Return type:
- Returns:
List of step names that should be executed next.
Example
>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
- to_mermaid()[source]
Generate a MermaidJS graph representation of the workflow.
- Return type:
- Returns:
MermaidJS graph definition as a string.
Example
>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
- to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]
Generate a MermaidJS graph with execution state highlighting.
- Parameters:
- Return type:
- Returns:
MermaidJS graph definition with state styling.
Example
>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
- validate()[source]
Validate the workflow definition for common issues.
Example
>>> errors = definition.validate()
>>> if errors:
...     print("Validation errors:", errors)
- name: str
- version: str
- description: str
- initial_step: str
- exception litestar_workflows.WorkflowInstanceNotFoundError[source]
Bases: WorkflowsError
Raised when a workflow instance is not found.
This occurs when trying to retrieve or operate on a workflow instance that doesn’t exist in the workflow engine’s storage.
- instance_id
The ID of the workflow instance that was not found.
- exception litestar_workflows.WorkflowNotFoundError[source]
Bases: WorkflowsError
Raised when a workflow definition is not found.
This typically occurs when trying to retrieve or instantiate a workflow that hasn’t been registered with the workflow registry.
- name
The name of the workflow that was not found.
- version
The specific version requested, if any.
- class litestar_workflows.WorkflowPlugin[source]
Bases: InitPluginProtocol
Litestar plugin for workflow management.
This plugin integrates litestar-workflows with a Litestar application, providing dependency injection for the WorkflowRegistry and ExecutionEngine.
Example
Basic usage with auto-registration:
from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

class MyWorkflow:
    __workflow_name__ = "my_workflow"
    __workflow_version__ = "1.0.0"
    # ... workflow definition

app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(auto_register_workflows=[MyWorkflow])
        )
    ]
)
Using in a route handler:
from litestar import get, post
from litestar_workflows import WorkflowRegistry, LocalExecutionEngine

@get("/workflows")
async def list_workflows(
    workflow_registry: WorkflowRegistry,
) -> list[dict]:
    definitions = workflow_registry.list_definitions()
    return [{"name": d.name, "version": d.version} for d in definitions]

@post("/workflows/{name:str}/start")
async def start_workflow(
    name: str,
    workflow_engine: LocalExecutionEngine,
    workflow_registry: WorkflowRegistry,
) -> dict:
    workflow_class = workflow_registry.get_workflow_class(name)
    instance = await workflow_engine.start_workflow(workflow_class)
    return {"instance_id": str(instance.id), "status": instance.status}
- Parameters:
config (WorkflowPluginConfig | None)
- __init__(config=None)[source]
Initialize the plugin.
- Parameters:
config (WorkflowPluginConfig | None) – Optional configuration for the plugin.
- property engine: LocalExecutionEngine
Get the execution engine.
- Returns:
The ExecutionEngine instance.
- Raises:
RuntimeError – If accessed before plugin initialization.
- on_app_init(app_config)[source]
Initialize the plugin when the Litestar app starts.
This method:
1. Creates or uses the provided WorkflowRegistry
2. Creates or uses the provided ExecutionEngine
3. Registers any auto_register_workflows
4. Adds dependency providers to the app config
5. Optionally registers REST API controllers if enable_api=True
- property registry: WorkflowRegistry
Get the workflow registry.
- Returns:
The WorkflowRegistry instance.
- Raises:
RuntimeError – If accessed before plugin initialization.
- class litestar_workflows.WorkflowPluginConfig[source]
Bases: object
Configuration for the WorkflowPlugin.
- registry
Optional pre-configured WorkflowRegistry. If not provided, a new one will be created.
- engine
Optional pre-configured ExecutionEngine. If not provided, a LocalExecutionEngine will be created using the registry.
- auto_register_workflows
List of workflow classes to automatically register with the registry on app startup.
- dependency_key_registry
The key used for dependency injection of the WorkflowRegistry. Defaults to “workflow_registry”.
- dependency_key_engine
The key used for dependency injection of the ExecutionEngine. Defaults to “workflow_engine”.
- enable_api
Whether to enable the REST API endpoints. Defaults to True.
- api_path_prefix
URL path prefix for all workflow API endpoints. Defaults to “/workflows”.
- api_guards
List of Litestar guards to apply to all workflow API endpoints.
- api_tags
OpenAPI tags to apply to workflow API endpoints.
- include_api_in_schema
Whether to include API endpoints in OpenAPI schema. Defaults to True.
- Parameters:
- __init__(registry=None, engine=None, auto_register_workflows=<factory>, dependency_key_registry='workflow_registry', dependency_key_engine='workflow_engine', enable_api=True, api_path_prefix='/workflows', api_guards=<factory>, api_tags=<factory>, include_api_in_schema=True)
- api_path_prefix: str = '/workflows'
- dependency_key_engine: str = 'workflow_engine'
- dependency_key_registry: str = 'workflow_registry'
- enable_api: bool = True
- engine: LocalExecutionEngine | None = None
- include_api_in_schema: bool = True
- registry: WorkflowRegistry | None = None
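Overriding individual fields while keeping the remaining defaults works as with any dataclass. The class below is a stand-in mirroring a subset of the documented fields and defaults (the real config lives in litestar_workflows):

```python
from dataclasses import dataclass

# Stand-in mirroring a subset of the documented WorkflowPluginConfig
# fields and their defaults; shown only to illustrate override behaviour.
@dataclass
class WorkflowPluginConfig:
    dependency_key_registry: str = "workflow_registry"
    dependency_key_engine: str = "workflow_engine"
    enable_api: bool = True
    api_path_prefix: str = "/workflows"
    include_api_in_schema: bool = True

# Defaults apply when nothing is overridden...
assert WorkflowPluginConfig().api_path_prefix == "/workflows"

# ...and individual fields can be overridden per application.
config = WorkflowPluginConfig(api_path_prefix="/internal/workflows", enable_api=False)
print(config.api_path_prefix, config.enable_api)  # /internal/workflows False
```

Disabling enable_api keeps the registry and engine dependencies available while skipping the bundled REST controllers.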
- class litestar_workflows.WorkflowRegistry[source]
Bases: object
Registry for storing and retrieving workflow definitions.
The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.
- _definitions
Nested dict mapping name -> version -> WorkflowDefinition.
- _workflow_classes
Map of workflow names to their class definitions.
- __init__()[source]
Initialize an empty workflow registry.
- get_definition(name, version=None)[source]
Retrieve a workflow definition by name and optional version.
- Parameters:
- Return type:
- Returns:
The WorkflowDefinition for the requested workflow.
- Raises:
KeyError – If the workflow name or version is not found.
Example
>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
- get_versions(name)[source]
Get all versions for a workflow.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
List of version strings.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
- get_workflow_class(name)[source]
Retrieve the workflow class by name.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
The Workflow class.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
- has_workflow(name, version=None)[source]
Check if a workflow exists in the registry.
- Parameters:
- Return type:
- Returns:
True if the workflow exists, False otherwise.
Example
>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
- list_definitions(active_only=True)[source]
List all registered workflow definitions.
- Parameters:
active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.
- Return type:
- Returns:
List of WorkflowDefinition objects.
Example
>>> all_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
- register(workflow_class)[source]
Register a workflow class with the registry.
Extracts the workflow definition from the class and stores it indexed by name and version.
Example
>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
- unregister(name, version=None)[source]
Remove a workflow from the registry.
- Parameters:
- Return type:
Example
>>> registry.unregister("old_workflow")
>>> registry.unregister("approval_workflow", "1.0.0")
- class litestar_workflows.WorkflowStatus[source]
Bases: StrEnum
Overall status of a workflow instance.
- PENDING
Workflow has been created but not yet started.
- RUNNING
Workflow is actively executing steps.
- PAUSED
Workflow execution is temporarily paused.
- WAITING
Workflow is waiting for human input or external event.
- COMPLETED
Workflow finished successfully.
- FAILED
Workflow terminated due to an error.
- CANCELED
Workflow was manually canceled.
- __new__(value)
- PENDING = 'pending'
- RUNNING = 'running'
- PAUSED = 'paused'
- WAITING = 'waiting'
- COMPLETED = 'completed'
- FAILED = 'failed'
- CANCELED = 'canceled'
- exception litestar_workflows.WorkflowValidationError[source]
Bases: WorkflowsError
Raised when workflow definition validation fails.
This occurs during workflow registration or modification when the workflow definition doesn’t meet required constraints (e.g., cycles, unreachable nodes, invalid step types).
- errors
List of validation error messages.
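Since WorkflowDefinition.validate() is documented to return a list of error messages rather than raising, a registration path would typically collect that list and raise WorkflowValidationError when it is non-empty. The classes and helper below are illustrative stand-ins mirroring the documented API, not the library's implementation:

```python
# Stand-ins mirroring the documented exception shape: WorkflowValidationError
# extends WorkflowsError and carries the list of validation messages.
class WorkflowsError(Exception): ...

class WorkflowValidationError(WorkflowsError):
    def __init__(self, errors):
        super().__init__("; ".join(errors))
        self.errors = errors

def register_checked(definition_errors):
    # In real code, definition_errors would come from definition.validate().
    if definition_errors:
        raise WorkflowValidationError(definition_errors)
    return "registered"

try:
    register_checked(["initial_step 'start' is not a defined step"])
except WorkflowValidationError as exc:
    print(exc.errors)  # ["initial_step 'start' is not a defined step"]
```

Carrying the full errors list on the exception lets callers report every problem at once instead of failing on the first.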
Core Module¶
The core module provides fundamental building blocks for workflow automation.
Core domain module for litestar-workflows.
This module exports the fundamental building blocks for workflow definitions, including types, protocols, context, definitions, and events.
- class litestar_workflows.core.Edge[source]¶
Bases: object
Defines a transition between workflow steps.
An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.
- source¶
Name of the source step or the Step class itself.
- target¶
Name of the target step or the Step class itself.
- condition¶
Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.
Example
>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
- Parameters:
- __init__(source, target, condition=None)¶
- evaluate_condition(context)[source]¶
Evaluate the edge condition against the workflow context.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
- Return type:
- Returns:
True if the condition is met or if no condition exists, False otherwise.
Example
>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
- get_source_name()[source]¶
Get the name of the source step.
- Return type:
- Returns:
The source step name as a string.
- class litestar_workflows.core.ExecutionEngine[source]¶
Bases: Protocol
Protocol for workflow execution engines.
The ExecutionEngine is responsible for orchestrating workflow execution, including starting workflows, executing steps, scheduling, and handling human tasks.
Example
>>> engine = LocalExecutionEngine()
>>> instance = await engine.start_workflow(MyWorkflow, initial_data={"input": "value"})
- __init__(*args, **kwargs)¶
- async cancel_workflow(instance_id, reason)[source]¶
Cancel a running workflow instance.
- Parameters:
- Return type:
Example
>>> await engine.cancel_workflow(
...     instance_id=instance.id, reason="Request withdrawn by submitter"
... )
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
- Parameters:
- Return type:
Example
>>> await engine.complete_human_task(
...     instance_id=instance.id,
...     step_name="approval",
...     user_id="user_123",
...     data={"approved": True, "comments": "Looks good"},
... )
- async execute_step(step, context, previous_result=None)[source]¶
Execute a single step within a workflow.
- Parameters:
context (WorkflowContext) – The current workflow context.
previous_result (Any) – Optional result from a previous step.
- Return type:
- Returns:
The result of the step execution.
- Raises:
Exception – If step execution fails and error handling doesn’t compensate.
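The documented Step hooks imply an execution order that an engine drives: can_execute as a guard, then execute, then on_success or on_failure. The sketch below is an illustrative stand-in for that lifecycle, not the library's LocalExecutionEngine; EchoStep and run_step are hypothetical names, and a plain dict stands in for WorkflowContext.

```python
import asyncio
from typing import Any

class EchoStep:
    """Toy step implementing the documented hook methods."""
    name = "echo"

    async def can_execute(self, context: dict) -> bool:
        return True

    async def execute(self, context: dict) -> Any:
        return context.get("message", "")

    async def on_success(self, context: dict, result: Any) -> None:
        context["last_result"] = result

    async def on_failure(self, context: dict, error: Exception) -> None:
        context["error"] = str(error)

async def run_step(step: EchoStep, context: dict) -> Any:
    # Guard: a False result means the engine would mark the step SKIPPED.
    if not await step.can_execute(context):
        return None
    try:
        result = await step.execute(context)
    except Exception as exc:
        await step.on_failure(context, exc)
        raise
    await step.on_success(context, result)
    return result

ctx: dict = {"message": "hello"}
result = asyncio.run(run_step(EchoStep(), ctx))
```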
- async schedule_step(instance_id, step_name, delay=None)[source]¶
Schedule a step for later execution.
- Parameters:
- Return type:
Example
>>> from datetime import timedelta
>>> await engine.schedule_step(
...     instance_id=instance.id, step_name="reminder", delay=timedelta(hours=24)
... )
- class litestar_workflows.core.HumanTaskCompleted[source]¶
Bases: WorkflowEvent
Event emitted when a human task is completed.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was completed.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- completed_by¶
User who completed the task.
- form_data¶
Data submitted by the user.
- comment¶
Optional comment provided by the user.
Example
>>> event = HumanTaskCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     completed_by="manager_123",
...     form_data={"approved": True, "amount": 1500.00},
...     comment="Approved for payment",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, completed_by, form_data=None, comment=None)¶
- class litestar_workflows.core.HumanTaskCreated[source]¶
Bases: WorkflowEvent
Event emitted when a human task is created.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was created.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- assignee¶
User or group assigned to the task.
- title¶
Display title for the task.
- description¶
Detailed description of what needs to be done.
- due_at¶
Optional deadline for task completion.
Example
>>> event = HumanTaskCreated(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     assignee="manager_group",
...     title="Approve expense report",
...     description="Review and approve expense report for project X",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, assignee=None, title=None, description=None, due_at=None)¶
- class litestar_workflows.core.HumanTaskReassigned[source]¶
Bases: WorkflowEvent
Event emitted when a human task is reassigned.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was reassigned.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- from_assignee¶
Previous assignee.
- to_assignee¶
New assignee.
- reassigned_by¶
User who performed the reassignment.
- reason¶
Explanation for the reassignment.
Example
>>> event = HumanTaskReassigned(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="review",
...     task_id=uuid4(),
...     from_assignee="user_123",
...     to_assignee="user_456",
...     reassigned_by="admin_789",
...     reason="Original assignee on vacation",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, from_assignee=None, to_assignee=None, reassigned_by=None, reason=None)¶
- class litestar_workflows.core.Step[source]¶
Bases: Protocol[T]
Protocol defining the interface for workflow steps.
Steps are the atomic units of work within a workflow. They can be machine-automated, human-interactive, or event-driven. Each step must implement the core execution interface along with lifecycle hooks.
- name¶
Unique identifier for this step within its workflow.
- description¶
Human-readable description of the step’s purpose.
- step_type¶
Classification of this step (MACHINE, HUMAN, WEBHOOK, etc.).
Example
>>> class ApprovalStep:
...     name = "approval"
...     description = "Manager approval required"
...     step_type = StepType.HUMAN
...
...     async def execute(self, context: WorkflowContext) -> bool:
...         return context.get("approved", False)
...
...     async def can_execute(self, context: WorkflowContext) -> bool:
...         return context.get("request_submitted", False)
...
...     async def on_success(self, context: WorkflowContext, result: bool) -> None:
...         context.set("approval_result", result)
...
...     async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
...         context.set("approval_error", str(error))
- __init__(*args, **kwargs)¶
- async can_execute(context)[source]¶
Determine if the step is eligible to execute.
This method acts as a guard or validator, checking preconditions before execution. If it returns False, the step will be marked as SKIPPED.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
- Return type:
- Returns:
True if the step should execute, False to skip it.
Example
>>> async def can_execute(self, context: WorkflowContext) -> bool:
...     return context.get("prerequisites_met", False)
- async execute(context)[source]¶
Execute the step’s primary logic.
This method contains the core functionality of the step. It receives the workflow context and can read/write data, perform operations, and return a result that will be stored in the execution history.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
- Return type:
TypeVar(T, covariant=True)
- Returns:
The result of the step execution, which will be stored in step history.
- Raises:
Exception – Any exception raised will trigger the on_failure hook and potentially fail the workflow.
- async on_failure(context, error)[source]¶
Hook called after failed step execution.
This method is invoked when execute() raises an exception. Use it for error logging, compensation logic, or triggering alerts.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
error (Exception) – The exception that was raised during execution.
- Return type:
Example
>>> async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
...     context.set("error", str(error))
...     await log_error(f"Step {self.name} failed: {error}")
- async on_success(context, result)[source]¶
Hook called after successful step execution.
This method is invoked after execute() completes without raising an exception. Use it to update context, trigger side effects, or perform cleanup.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
result (TypeVar(T, covariant=True)) – The return value from the execute() method.
- Return type:
Example
>>> async def on_success(self, context: WorkflowContext, result: dict) -> None:
...     context.set("last_result", result)
...     await send_notification(f"Step {self.name} completed")
- class litestar_workflows.core.StepCompleted[source]¶
Bases: WorkflowEvent
Event emitted when a step completes successfully.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step completed.
- step_name¶
Name of the step that completed.
- status¶
Final status of the step execution.
- result¶
Return value from the step execution.
- output_data¶
Output data produced by the step.
- duration_seconds¶
Execution time in seconds.
Example
>>> event = StepCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     status="SUCCEEDED",
...     result={"valid": True},
...     duration_seconds=1.5,
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, status, result=None, output_data=None, duration_seconds=None)¶
- class litestar_workflows.core.StepExecution[source]¶
Bases: object
Record of a single step execution within a workflow.
- step_name¶
Name of the executed step.
- status¶
Final status of the step execution.
- started_at¶
Timestamp when step execution began.
- completed_at¶
Timestamp when step execution finished (if completed).
- result¶
Return value from successful execution.
- error¶
Error message if execution failed.
- input_data¶
Input data passed to the step.
- output_data¶
Output data produced by the step.
- Parameters:
- __init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)¶
- class litestar_workflows.core.StepFailed[source]¶
Bases: WorkflowEvent
Event emitted when a step fails execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step failed.
- step_name¶
Name of the step that failed.
- error¶
Error message describing the failure.
- error_type¶
Type/class of the error.
- retry_count¶
Number of retry attempts made.
Example
>>> event = StepFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="api_call",
...     error="Connection timeout",
...     error_type="TimeoutError",
...     retry_count=3,
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, error, error_type=None, retry_count=0)¶
- class litestar_workflows.core.StepSkipped[source]¶
Bases: WorkflowEvent
Event emitted when a step is skipped due to conditions.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step was skipped.
- step_name¶
Name of the step that was skipped.
- reason¶
Explanation for why the step was skipped.
Example
>>> event = StepSkipped(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="optional_notification",
...     reason="Notification disabled in settings",
... )
- __init__(instance_id, timestamp, step_name, reason=None)¶
- class litestar_workflows.core.StepStarted[source]¶
Bases: WorkflowEvent
Event emitted when a step begins execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step started.
- step_name¶
Name of the step that started.
- step_type¶
Type of the step (MACHINE, HUMAN, etc.).
- input_data¶
Input data provided to the step.
Example
>>> event = StepStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     step_type="MACHINE",
...     input_data={"data": "to_validate"},
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, step_type, input_data=None)¶
- class litestar_workflows.core.StepStatus[source]¶
Bases: StrEnum
Execution status of a workflow step.
- PENDING¶
Step has not yet been scheduled for execution.
- SCHEDULED¶
Step is queued and awaiting execution.
- RUNNING¶
Step is currently executing.
- WAITING¶
Step is paused, waiting for external input or event.
- SUCCEEDED¶
Step completed successfully.
- FAILED¶
Step execution encountered an error.
- CANCELED¶
Step was manually canceled before completion.
- SKIPPED¶
Step was skipped due to conditional logic.
- __new__(value)¶
- PENDING = 'pending'¶
- SCHEDULED = 'scheduled'¶
- RUNNING = 'running'¶
- WAITING = 'waiting'¶
- SUCCEEDED = 'succeeded'¶
- FAILED = 'failed'¶
- CANCELED = 'canceled'¶
- SKIPPED = 'skipped'¶
- class litestar_workflows.core.StepType[source]¶
Bases: StrEnum
Classification of step types within a workflow.
- MACHINE¶
Automated execution without human intervention.
- HUMAN¶
Requires user interaction and input.
- WEBHOOK¶
Waits for external callback or event.
- TIMER¶
Waits for a time-based condition to be met.
- GATEWAY¶
Decision or branching point in the workflow.
- __new__(value)¶
- MACHINE = 'machine'¶
- HUMAN = 'human'¶
- WEBHOOK = 'webhook'¶
- TIMER = 'timer'¶
- GATEWAY = 'gateway'¶
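An engine typically dispatches on these classifications: MACHINE and GATEWAY steps run inline, while HUMAN, WEBHOOK, and TIMER steps suspend until something outside the engine happens. The sketch below uses a stand-in enum mirroring the documented values; the requires_external_input helper is illustrative, not a library API.

```python
from enum import Enum

# Stand-in mirroring the documented StepType values.
class StepType(str, Enum):
    MACHINE = "machine"
    HUMAN = "human"
    WEBHOOK = "webhook"
    TIMER = "timer"
    GATEWAY = "gateway"

def requires_external_input(step_type: StepType) -> bool:
    """True for step types that suspend the workflow until the outside world responds."""
    return step_type in {StepType.HUMAN, StepType.WEBHOOK, StepType.TIMER}
```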
- class litestar_workflows.core.Workflow[source]¶
Bases: Protocol
Protocol defining the interface for workflow definitions.
Workflows orchestrate a collection of steps connected by edges (transitions). They define the structure and flow logic but not the runtime state.
- name¶
Unique identifier for this workflow.
- version¶
Version string for workflow definition versioning.
- description¶
Human-readable description of the workflow’s purpose.
Example
>>> class DocumentApproval:
...     name = "document_approval"
...     version = "1.0.0"
...     description = "Multi-level document approval workflow"
...
...     def get_definition(self) -> WorkflowDefinition:
...         return WorkflowDefinition(
...             name=self.name,
...             version=self.version,
...             description=self.description,
...             steps={...},
...             edges=[...],
...             initial_step="submit",
...             terminal_steps={"approved", "rejected"},
...         )
- __init__(*args, **kwargs)¶
- class litestar_workflows.core.WorkflowCanceled[source]¶
Bases: WorkflowEvent
Event emitted when a workflow instance is manually canceled.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow was canceled.
- reason¶
Explanation for the cancellation.
- canceled_by¶
User who canceled the workflow.
- current_step¶
Step that was executing when canceled.
Example
>>> event = WorkflowCanceled(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Request withdrawn",
...     canceled_by="user_123",
...     current_step="review",
... )
- Parameters:
- __init__(instance_id, timestamp, reason, canceled_by=None, current_step=None)¶
- class litestar_workflows.core.WorkflowCompleted[source]¶
Bases: WorkflowEvent
Event emitted when a workflow instance completes successfully.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow completed.
- status¶
Final status of the workflow.
- final_step¶
Name of the final step that was executed.
- duration_seconds¶
Total execution time in seconds.
Example
>>> event = WorkflowCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     status="COMPLETED",
...     final_step="approve",
...     duration_seconds=3600.5,
... )
- Parameters:
- __init__(instance_id, timestamp, status, final_step=None, duration_seconds=None)¶
- class litestar_workflows.core.WorkflowContext[source]¶
Bases: object
Execution context passed between workflow steps.
The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.
- workflow_id¶
Unique identifier for the workflow definition.
- instance_id¶
Unique identifier for this workflow instance.
- data¶
Mutable state dictionary for inter-step communication.
- metadata¶
Immutable metadata about the workflow execution.
- current_step¶
Name of the currently executing step.
- step_history¶
Chronological record of all step executions.
- started_at¶
Timestamp when the workflow instance was created.
- user_id¶
Optional user identifier for human task contexts.
- tenant_id¶
Optional tenant identifier for multi-tenancy support.
Example
>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
- Parameters:
- __init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)¶
- get(key, default=None)[source]¶
Retrieve a value from the workflow data dictionary.
- Parameters:
- Return type:
- Returns:
The value associated with the key, or the default if not present.
Example
>>> context.get("some_key", "default_value")
'default_value'
- get_last_execution(step_name=None)[source]¶
Get the most recent execution record for a step.
- Parameters:
step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.
- Return type:
- Returns:
The most recent StepExecution matching the criteria, or None if not found.
Example
>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "SUCCEEDED":
...     print("Step succeeded")
- has_step_executed(step_name)[source]¶
Check if a step has been executed in this workflow instance.
- Parameters:
step_name (str) – Name of the step to check.
- Return type:
- Returns:
True if the step appears in the execution history, False otherwise.
Example
>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
- set(key, value)[source]¶
Set a value in the workflow data dictionary.
- Parameters:
- Return type:
Example
>>> context.set("status", "approved")
>>> context.get("status")
'approved'
- with_step(step_name)[source]¶
Create a new context for the given step execution.
This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.
- Parameters:
step_name (str) – Name of the step to execute next.
- Return type:
- Returns:
A new WorkflowContext instance with updated current_step.
Example
>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
- step_history: list[StepExecution]¶
- class litestar_workflows.core.WorkflowDefinition[source]¶
Bases: object
Declarative workflow structure.
The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.
- name¶
Unique identifier for the workflow.
- version¶
Version string for workflow versioning.
- description¶
Human-readable description of the workflow’s purpose.
- steps¶
Dictionary mapping step names to Step instances.
- edges¶
List of Edge instances defining the workflow graph.
- initial_step¶
Name of the step to execute first.
- terminal_steps¶
Set of step names that mark workflow completion.
Example
>>> from litestar_workflows.core.types import StepType
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
- Parameters:
- __init__(name, version, description, steps, edges, initial_step, terminal_steps=<factory>)¶
- get_next_steps(current_step, context)[source]¶
Get the list of next steps from the current step based on edge conditions.
- Parameters:
current_step (str) – Name of the current step.
context (WorkflowContext) – The workflow execution context for condition evaluation.
- Return type:
- Returns:
List of step names that should be executed next.
Example
>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
- to_mermaid()[source]¶
Generate a MermaidJS graph representation of the workflow.
- Return type:
- Returns:
MermaidJS graph definition as a string.
Example
>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
- to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]¶
Generate a MermaidJS graph with execution state highlighting.
- Parameters:
- Return type:
- Returns:
MermaidJS graph definition with state styling.
Example
>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
- class litestar_workflows.core.WorkflowEvent[source]¶
Bases: object
Base class for all workflow events.
All workflow events include the instance_id and timestamp for tracking and correlation.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the event occurred.
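Because every event carries these two fields, a subscriber can correlate a stream of mixed events back to their workflow instances. The sketch below uses minimal dataclass stand-ins for the documented event shapes; the event_log dictionary and record function are illustrative, not part of the library.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4

# Stand-ins mirroring the documented event shapes.
@dataclass
class WorkflowEvent:
    instance_id: UUID
    timestamp: datetime

@dataclass
class StepStarted(WorkflowEvent):
    step_name: str

# Group events by the instance they belong to.
event_log: dict[UUID, list[WorkflowEvent]] = {}

def record(event: WorkflowEvent) -> None:
    event_log.setdefault(event.instance_id, []).append(event)

iid = uuid4()
record(StepStarted(iid, datetime.now(timezone.utc), "validation"))
record(StepStarted(iid, datetime.now(timezone.utc), "review"))
```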
- class litestar_workflows.core.WorkflowFailed[source]¶
Bases: WorkflowEvent
Event emitted when a workflow instance fails.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow failed.
- error¶
Error message describing the failure.
- failed_step¶
Name of the step that caused the failure.
- error_type¶
Type/class of the error that occurred.
- stack_trace¶
Optional stack trace for debugging.
Example
>>> event = WorkflowFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     error="Database connection failed",
...     failed_step="data_validation",
...     error_type="ConnectionError",
... )
- Parameters:
- __init__(instance_id, timestamp, error, failed_step=None, error_type=None, stack_trace=None)¶
- class litestar_workflows.core.WorkflowInstance[source]¶
Bases: Protocol
Protocol for workflow runtime instances.
A WorkflowInstance represents a single execution of a workflow definition. It tracks runtime state, progress, and results.
- id¶
Unique identifier for this workflow instance.
- workflow_id¶
Identifier of the workflow definition being executed.
- workflow_name¶
Name of the workflow definition.
- workflow_version¶
Version of the workflow definition.
- status¶
Current execution status.
- context¶
Runtime execution context.
- current_step¶
Name of the currently executing or next step.
- started_at¶
Timestamp when the instance was created.
- completed_at¶
Timestamp when the instance finished (if applicable).
- error¶
Error message if the workflow failed.
- __init__(*args, **kwargs)¶
- status: WorkflowStatus¶
- context: WorkflowContext¶
- class litestar_workflows.core.WorkflowPaused[source]¶
Bases: WorkflowEvent
Event emitted when a workflow instance is paused.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow was paused.
- reason¶
Explanation for pausing.
- paused_at_step¶
Step where execution was paused.
Example
>>> event = WorkflowPaused(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Awaiting external data",
...     paused_at_step="data_ingestion",
... )
- __init__(instance_id, timestamp, reason=None, paused_at_step=None)¶
- class litestar_workflows.core.WorkflowResumed[source]¶
Bases: WorkflowEvent
Event emitted when a paused workflow instance resumes.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow resumed.
- resumed_by¶
User who resumed the workflow.
- resuming_at_step¶
Step where execution will resume.
Example
>>> event = WorkflowResumed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     resumed_by="user_456",
...     resuming_at_step="data_processing",
... )
- Parameters:
- __init__(instance_id, timestamp, resumed_by=None, resuming_at_step=None)¶
- class litestar_workflows.core.WorkflowStarted[source]¶
Bases: WorkflowEvent
Event emitted when a workflow instance starts execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow started.
- workflow_name¶
Name of the workflow definition.
- workflow_version¶
Version of the workflow definition.
- initial_data¶
Initial data provided when starting the workflow.
- user_id¶
Optional user who initiated the workflow.
- tenant_id¶
Optional tenant identifier for multi-tenancy.
Example
>>> event = WorkflowStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     workflow_name="approval_flow",
...     workflow_version="1.0.0",
...     initial_data={"document_id": "doc_123"},
... )
- Parameters:
- __init__(instance_id, timestamp, workflow_name, workflow_version, initial_data=None, user_id=None, tenant_id=None)¶
- class litestar_workflows.core.WorkflowStatus[source]¶
Bases: StrEnum
Overall status of a workflow instance.
- PENDING¶
Workflow has been created but not yet started.
- RUNNING¶
Workflow is actively executing steps.
- PAUSED¶
Workflow execution is temporarily paused.
- WAITING¶
Workflow is waiting for human input or external event.
- COMPLETED¶
Workflow finished successfully.
- FAILED¶
Workflow terminated due to an error.
- CANCELED¶
Workflow was manually canceled.
- __new__(value)¶
- PENDING = 'pending'¶
- RUNNING = 'running'¶
- PAUSED = 'paused'¶
- WAITING = 'waiting'¶
- COMPLETED = 'completed'¶
- FAILED = 'failed'¶
- CANCELED = 'canceled'¶
Core type definitions for litestar-workflows.
This module defines the fundamental types, enums, and type aliases used throughout the workflow system.
- litestar_workflows.core.types.Context¶
Type alias for workflow context data dictionary.
- class litestar_workflows.core.types.StepExecution[source]¶
Bases: object
Record of a single step execution within a workflow.
- step_name¶
Name of the executed step.
- status¶
Final status of the step execution.
- started_at¶
Timestamp when step execution began.
- completed_at¶
Timestamp when step execution finished (if completed).
- result¶
Return value from successful execution.
- error¶
Error message if execution failed.
- input_data¶
Input data passed to the step.
- output_data¶
Output data produced by the step.
- Parameters:
- __init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)¶
- class litestar_workflows.core.types.StepStatus[source]¶
Bases: StrEnum
Execution status of a workflow step.
- PENDING¶
Step has not yet been scheduled for execution.
- SCHEDULED¶
Step is queued and awaiting execution.
- RUNNING¶
Step is currently executing.
- WAITING¶
Step is paused, waiting for external input or event.
- SUCCEEDED¶
Step completed successfully.
- FAILED¶
Step execution encountered an error.
- CANCELED¶
Step was manually canceled before completion.
- SKIPPED¶
Step was skipped due to conditional logic.
- PENDING = 'pending'¶
- SCHEDULED = 'scheduled'¶
- RUNNING = 'running'¶
- WAITING = 'waiting'¶
- SUCCEEDED = 'succeeded'¶
- FAILED = 'failed'¶
- CANCELED = 'canceled'¶
- SKIPPED = 'skipped'¶
- __new__(value)¶
- class litestar_workflows.core.types.StepT¶
Type variable bound to the Step protocol.
alias of TypeVar('StepT', bound=Step)
- class litestar_workflows.core.types.StepType[source]¶
Bases: StrEnum
Classification of step types within a workflow.
- MACHINE¶
Automated execution without human intervention.
- HUMAN¶
Requires user interaction and input.
- WEBHOOK¶
Waits for external callback or event.
- TIMER¶
Waits for a time-based condition to be met.
- GATEWAY¶
Decision or branching point in the workflow.
- MACHINE = 'machine'¶
- HUMAN = 'human'¶
- WEBHOOK = 'webhook'¶
- TIMER = 'timer'¶
- GATEWAY = 'gateway'¶
- __new__(value)¶
- class litestar_workflows.core.types.T¶
Generic type variable for return values.
alias of TypeVar(‘T’)
- class litestar_workflows.core.types.WorkflowStatus[source]¶
Bases: StrEnum
Overall status of a workflow instance.
- PENDING¶
Workflow has been created but not yet started.
- RUNNING¶
Workflow is actively executing steps.
- PAUSED¶
Workflow execution is temporarily paused.
- WAITING¶
Workflow is waiting for human input or external event.
- COMPLETED¶
Workflow finished successfully.
- FAILED¶
Workflow terminated due to an error.
- CANCELED¶
Workflow was manually canceled.
- PENDING = 'pending'¶
- RUNNING = 'running'¶
- PAUSED = 'paused'¶
- WAITING = 'waiting'¶
- COMPLETED = 'completed'¶
- FAILED = 'failed'¶
- CANCELED = 'canceled'¶
- __new__(value)¶
- class litestar_workflows.core.types.WorkflowT¶
Type variable bound to the Workflow protocol.
alias of TypeVar('WorkflowT', bound=Workflow)
Workflow execution context.
This module provides the WorkflowContext dataclass which carries state and metadata throughout workflow execution.
- class litestar_workflows.core.context.StepExecution[source]¶
Bases: object
Record of a single step execution within a workflow.
- step_name¶
Name of the executed step.
- status¶
Final status of the step execution.
- started_at¶
Timestamp when step execution began.
- completed_at¶
Timestamp when step execution finished (if completed).
- result¶
Return value from successful execution.
- error¶
Error message if execution failed.
- input_data¶
Input data passed to the step.
- output_data¶
Output data produced by the step.
- Parameters:
- __init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)¶
- class litestar_workflows.core.context.WorkflowContext[source]¶
Bases: object
Execution context passed between workflow steps.
The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.
- workflow_id¶
Unique identifier for the workflow definition.
- instance_id¶
Unique identifier for this workflow instance.
- data¶
Mutable state dictionary for inter-step communication.
- metadata¶
Immutable metadata about the workflow execution.
- current_step¶
Name of the currently executing step.
- step_history¶
Chronological record of all step executions.
- started_at¶
Timestamp when the workflow instance was created.
- user_id¶
Optional user identifier for human task contexts.
- tenant_id¶
Optional tenant identifier for multi-tenancy support.
Example
>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
- Parameters:
- step_history: list[StepExecution]¶
- get(key, default=None)[source]¶
Retrieve a value from the workflow data dictionary.
- Parameters:
- Return type:
- Returns:
The value associated with the key, or the default if not present.
Example
>>> context.get("some_key", "default_value")
'default_value'
- set(key, value)[source]¶
Set a value in the workflow data dictionary.
- Parameters:
- Return type:
Example
>>> context.set("status", "approved")
>>> context.get("status")
'approved'
- with_step(step_name)[source]¶
Create a new context for the given step execution.
This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.
- Parameters:
step_name (str) – Name of the step to execute next.
- Return type:
- Returns:
A new WorkflowContext instance with updated current_step.
Example
>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
- get_last_execution(step_name=None)[source]¶
Get the most recent execution record for a step.
- Parameters:
step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.
- Return type:
- Returns:
The most recent StepExecution matching the criteria, or None if not found.
Example
>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "SUCCEEDED":
...     print("Step succeeded")
- has_step_executed(step_name)[source]¶
Check if a step has been executed in this workflow instance.
- Parameters:
step_name (str) – Name of the step to check.
- Return type:
- Returns:
True if the step appears in the execution history, False otherwise.
Example
>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
- __init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)¶
Workflow definition and edge structures.
This module provides the data structures for defining workflow graphs, including edges (transitions) and the complete workflow definition.
- class litestar_workflows.core.definition.Edge[source]¶
Bases:
object
Defines a transition between workflow steps.
An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.
- source¶
Name of the source step or the Step class itself.
- target¶
Name of the target step or the Step class itself.
- condition¶
Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.
Example
>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
- Parameters:
- evaluate_condition(context)[source]¶
Evaluate the edge condition against the workflow context.
- Parameters:
context (WorkflowContext) – The current workflow execution context.
- Return type:
- Returns:
True if the condition is met or if no condition exists, False otherwise.
Example
>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
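Since a condition may be a callable, a string expression, or absent, evaluation has to dispatch on the condition's type. The sketch below shows one plausible way to do that; it is a standalone illustration under assumptions, not the library's source, and it uses a plain dict in place of WorkflowContext.

```python
from typing import Any, Callable, Union

# A condition is a predicate over the context, a string expression, or None.
Condition = Union[Callable[[dict], bool], str, None]


def evaluate(condition: Condition, context: dict[str, Any]) -> bool:
    """Sketch of edge-condition evaluation: no condition always passes."""
    if condition is None:
        return True
    if callable(condition):
        return bool(condition(context))
    # String conditions: evaluate the expression with `context` in scope.
    # (eval of untrusted input is unsafe; a real engine would use a
    # restricted expression evaluator.)
    return bool(eval(condition, {"context": context}))


ctx = {"value": 15}
assert evaluate(None, ctx) is True
assert evaluate(lambda c: c["value"] > 10, ctx) is True
assert evaluate("context['value'] > 100", ctx) is False
```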
- get_source_name()[source]¶
Get the name of the source step.
- Return type:
- Returns:
The source step name as a string.
- class litestar_workflows.core.definition.WorkflowDefinition[source]¶
Bases:
object
Declarative workflow structure.
The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.
- name¶
Unique identifier for the workflow.
- version¶
Version string for workflow versioning.
- description¶
Human-readable description of the workflow’s purpose.
- steps¶
Dictionary mapping step names to Step instances.
- edges¶
List of Edge instances defining the workflow graph.
- initial_step¶
Name of the step to execute first.
- terminal_steps¶
Set of step names that mark workflow completion.
Example
>>> from litestar_workflows.core.types import StepType
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
- Parameters:
- validate()[source]¶
Validate the workflow definition for common issues.
Example
>>> errors = definition.validate()
>>> if errors:
...     print("Validation errors:", errors)
- get_next_steps(current_step, context)[source]¶
Get the list of next steps from the current step based on edge conditions.
- Parameters:
current_step (str) – Name of the current step.
context (WorkflowContext) – The workflow execution context for condition evaluation.
- Return type:
- Returns:
List of step names that should be executed next.
Example
>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
- to_mermaid()[source]¶
Generate a MermaidJS graph representation of the workflow.
- Return type:
- Returns:
MermaidJS graph definition as a string.
Example
>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
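The essence of such a renderer is a line-by-line walk over steps and edges. The following is a minimal standalone sketch of that idea (not the library's implementation; it ignores the step-type-specific node shapes shown above):

```python
def to_mermaid(steps: list[str], edges: list[tuple[str, str]]) -> str:
    """Render a simple top-down Mermaid graph definition."""
    lines = ["graph TD"]
    # One node declaration per step, labeled with a title-cased name.
    lines += [f"    {name}[{name.title()}]" for name in steps]
    # One arrow per edge.
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


print(to_mermaid(["submit", "review"], [("submit", "review")]))
```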
- to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]¶
Generate a MermaidJS graph with execution state highlighting.
- Parameters:
- Return type:
- Returns:
MermaidJS graph definition with state styling.
Example
>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
Domain events for workflow lifecycle.
This module defines the event types that are emitted during workflow execution. These events can be used for logging, monitoring, triggering side effects, or integrating with external systems.
- class litestar_workflows.core.events.HumanTaskCompleted[source]¶
Bases:
WorkflowEvent
Event emitted when a human task is completed.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was completed.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- completed_by¶
User who completed the task.
- form_data¶
Data submitted by the user.
- comment¶
Optional comment provided by the user.
Example
>>> event = HumanTaskCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     completed_by="manager_123",
...     form_data={"approved": True, "amount": 1500.00},
...     comment="Approved for payment",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, completed_by, form_data=None, comment=None)¶
- class litestar_workflows.core.events.HumanTaskCreated[source]¶
Bases:
WorkflowEvent
Event emitted when a human task is created.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was created.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- assignee¶
User or group assigned to the task.
- title¶
Display title for the task.
- description¶
Detailed description of what needs to be done.
- due_at¶
Optional deadline for task completion.
Example
>>> event = HumanTaskCreated(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     assignee="manager_group",
...     title="Approve expense report",
...     description="Review and approve expense report for project X",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, assignee=None, title=None, description=None, due_at=None)¶
- class litestar_workflows.core.events.HumanTaskReassigned[source]¶
Bases:
WorkflowEvent
Event emitted when a human task is reassigned.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the task was reassigned.
- step_name¶
Name of the human step.
- task_id¶
Unique identifier for this task.
- from_assignee¶
Previous assignee.
- to_assignee¶
New assignee.
- reassigned_by¶
User who performed the reassignment.
- reason¶
Explanation for the reassignment.
Example
>>> event = HumanTaskReassigned(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="review",
...     task_id=uuid4(),
...     from_assignee="user_123",
...     to_assignee="user_456",
...     reassigned_by="admin_789",
...     reason="Original assignee on vacation",
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, task_id, from_assignee=None, to_assignee=None, reassigned_by=None, reason=None)¶
- class litestar_workflows.core.events.StepCompleted[source]¶
Bases:
WorkflowEvent
Event emitted when a step completes successfully.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step completed.
- step_name¶
Name of the step that completed.
- status¶
Final status of the step execution.
- result¶
Return value from the step execution.
- output_data¶
Output data produced by the step.
- duration_seconds¶
Execution time in seconds.
Example
>>> event = StepCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     status="SUCCEEDED",
...     result={"valid": True},
...     duration_seconds=1.5,
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, status, result=None, output_data=None, duration_seconds=None)¶
- class litestar_workflows.core.events.StepFailed[source]¶
Bases:
WorkflowEvent
Event emitted when a step fails execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step failed.
- step_name¶
Name of the step that failed.
- error¶
Error message describing the failure.
- error_type¶
Type/class of the error.
- retry_count¶
Number of retry attempts made.
Example
>>> event = StepFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="api_call",
...     error="Connection timeout",
...     error_type="TimeoutError",
...     retry_count=3,
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, error, error_type=None, retry_count=0)¶
- class litestar_workflows.core.events.StepSkipped[source]¶
Bases:
WorkflowEvent
Event emitted when a step is skipped due to conditions.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step was skipped.
- step_name¶
Name of the step that was skipped.
- reason¶
Explanation for why the step was skipped.
Example
>>> event = StepSkipped(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="optional_notification",
...     reason="Notification disabled in settings",
... )
- __init__(instance_id, timestamp, step_name, reason=None)¶
- class litestar_workflows.core.events.StepStarted[source]¶
Bases:
WorkflowEvent
Event emitted when a step begins execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the step started.
- step_name¶
Name of the step that started.
- step_type¶
Type of the step (MACHINE, HUMAN, etc.).
- input_data¶
Input data provided to the step.
Example
>>> event = StepStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     step_type="MACHINE",
...     input_data={"data": "to_validate"},
... )
- Parameters:
- __init__(instance_id, timestamp, step_name, step_type, input_data=None)¶
- class litestar_workflows.core.events.WorkflowCanceled[source]¶
Bases:
WorkflowEvent
Event emitted when a workflow instance is manually canceled.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow was canceled.
- reason¶
Explanation for the cancellation.
- canceled_by¶
User who canceled the workflow.
- current_step¶
Step that was executing when canceled.
Example
>>> event = WorkflowCanceled(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Request withdrawn",
...     canceled_by="user_123",
...     current_step="review",
... )
- Parameters:
- __init__(instance_id, timestamp, reason, canceled_by=None, current_step=None)¶
- class litestar_workflows.core.events.WorkflowCompleted[source]¶
Bases:
WorkflowEvent
Event emitted when a workflow instance completes successfully.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow completed.
- status¶
Final status of the workflow.
- final_step¶
Name of the final step that was executed.
- duration_seconds¶
Total execution time in seconds.
Example
>>> event = WorkflowCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     status="COMPLETED",
...     final_step="approve",
...     duration_seconds=3600.5,
... )
- Parameters:
- __init__(instance_id, timestamp, status, final_step=None, duration_seconds=None)¶
- class litestar_workflows.core.events.WorkflowEvent[source]¶
Bases:
object
Base class for all workflow events.
All workflow events include the instance_id and timestamp for tracking and correlation.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the event occurred.
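Since the engine only requires an event bus exposing an emit method (see LocalExecutionEngine), a minimal in-process bus is easy to sketch. The class below is illustrative only; the subscribe/handler API shown is an assumption, not part of the library.

```python
from collections import defaultdict
from typing import Any, Callable


class SimpleEventBus:
    """In-process bus dispatching events to handlers keyed by event type (hypothetical)."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def emit(self, event: Any) -> None:
        # Fire every handler whose registered type matches the event's class
        # (isinstance, so handlers registered on a base class also fire).
        for event_type, handlers in self._handlers.items():
            if isinstance(event, event_type):
                for handler in handlers:
                    handler(event)


seen: list[str] = []
bus = SimpleEventBus()
bus.subscribe(str, seen.append)
bus.emit("workflow_started")  # handled: it's a str
bus.emit(42)                  # ignored: no handler registered for int
```

Registering a handler on a base class (here, WorkflowEvent in the real library) would receive every event subtype, which is convenient for logging and auditing.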
- class litestar_workflows.core.events.WorkflowFailed[source]¶
Bases:
WorkflowEvent
Event emitted when a workflow instance fails.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow failed.
- error¶
Error message describing the failure.
- failed_step¶
Name of the step that caused the failure.
- error_type¶
Type/class of the error that occurred.
- stack_trace¶
Optional stack trace for debugging.
Example
>>> event = WorkflowFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     error="Database connection failed",
...     failed_step="data_validation",
...     error_type="ConnectionError",
... )
- Parameters:
- __init__(instance_id, timestamp, error, failed_step=None, error_type=None, stack_trace=None)¶
- class litestar_workflows.core.events.WorkflowPaused[source]¶
Bases:
WorkflowEvent
Event emitted when a workflow instance is paused.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow was paused.
- reason¶
Explanation for pausing.
- paused_at_step¶
Step where execution was paused.
Example
>>> event = WorkflowPaused(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Awaiting external data",
...     paused_at_step="data_ingestion",
... )
- __init__(instance_id, timestamp, reason=None, paused_at_step=None)¶
- class litestar_workflows.core.events.WorkflowResumed[source]¶
Bases:
WorkflowEvent
Event emitted when a paused workflow instance resumes.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow resumed.
- resumed_by¶
User who resumed the workflow.
- resuming_at_step¶
Step where execution will resume.
Example
>>> event = WorkflowResumed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     resumed_by="user_456",
...     resuming_at_step="data_processing",
... )
- Parameters:
- __init__(instance_id, timestamp, resumed_by=None, resuming_at_step=None)¶
- class litestar_workflows.core.events.WorkflowStarted[source]¶
Bases:
WorkflowEvent
Event emitted when a workflow instance starts execution.
- instance_id¶
Unique identifier of the workflow instance.
- timestamp¶
When the workflow started.
- workflow_name¶
Name of the workflow definition.
- workflow_version¶
Version of the workflow definition.
- initial_data¶
Initial data provided when starting the workflow.
- user_id¶
Optional user who initiated the workflow.
- tenant_id¶
Optional tenant identifier for multi-tenancy.
Example
>>> event = WorkflowStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     workflow_name="approval_flow",
...     workflow_version="1.0.0",
...     initial_data={"document_id": "doc_123"},
... )
- Parameters:
- __init__(instance_id, timestamp, workflow_name, workflow_version, initial_data=None, user_id=None, tenant_id=None)¶
Public API¶
The following is available from the top-level package:
from litestar_workflows import (
# Protocols
Step,
Workflow,
ExecutionEngine,
# Types
StepType,
StepStatus,
WorkflowStatus,
WorkflowContext,
WorkflowDefinition,
Edge,
# Base implementations
BaseMachineStep,
BaseHumanStep,
BaseGateway,
# Groups
SequentialGroup,
ParallelGroup,
ConditionalGroup,
# Engine
LocalExecutionEngine,
WorkflowRegistry,
# Exceptions
WorkflowsError,
WorkflowNotFoundError,
StepExecutionError,
InvalidTransitionError,
)
Engine Module¶
Execution engines and workflow registry.
Workflow execution engine implementations.
This module provides execution engines for orchestrating workflow instances, managing state transitions, and coordinating step execution.
- class litestar_workflows.engine.ExecutionEngine[source]¶
Bases:
Protocol
Protocol for workflow execution engines.
The ExecutionEngine is responsible for orchestrating workflow execution, including starting workflows, executing steps, scheduling, and handling human tasks.
Example
>>> engine = LocalExecutionEngine()
>>> instance = await engine.start_workflow(MyWorkflow, initial_data={"input": "value"})
- __init__(*args, **kwargs)¶
- async cancel_workflow(instance_id, reason)[source]¶
Cancel a running workflow instance.
- Parameters:
- Return type:
Example
>>> await engine.cancel_workflow(
...     instance_id=instance.id, reason="Request withdrawn by submitter"
... )
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
- Parameters:
- Return type:
Example
>>> await engine.complete_human_task(
...     instance_id=instance.id,
...     step_name="approval",
...     user_id="user_123",
...     data={"approved": True, "comments": "Looks good"},
... )
- async execute_step(step, context, previous_result=None)[source]¶
Execute a single step within a workflow.
- Parameters:
context (WorkflowContext) – The current workflow context.
previous_result (Any) – Optional result from a previous step.
- Return type:
- Returns:
The result of the step execution.
- Raises:
Exception – If step execution fails and error handling doesn’t compensate.
- async schedule_step(instance_id, step_name, delay=None)[source]¶
Schedule a step for later execution.
- Parameters:
- Return type:
Example
>>> from datetime import timedelta
>>> await engine.schedule_step(
...     instance_id=instance.id, step_name="reminder", delay=timedelta(hours=24)
... )
- class litestar_workflows.engine.LocalExecutionEngine[source]¶
Bases:
object
In-memory async execution engine for workflows.
This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.
- registry¶
The workflow registry for looking up definitions.
- persistence¶
Optional persistence layer for saving state.
- event_bus¶
Optional event bus for emitting workflow events.
- _instances¶
In-memory storage of workflow instances.
- _running¶
Map of instance IDs to their running asyncio tasks.
- Parameters:
registry (WorkflowRegistry)
- __init__(registry, persistence=None, event_bus=None)[source]¶
Initialize the local execution engine.
- Parameters:
registry (WorkflowRegistry) – The workflow registry.
persistence (Any | None) – Optional persistence layer implementing save/load methods.
event_bus (Any | None) – Optional event bus implementing emit method.
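The persistence and event_bus collaborators are duck-typed (typed as Any); the docstring only says they implement save/load and emit. Their assumed shapes can be written down as typing.Protocol classes. The names below are hypothetical illustrations, not types exported by the library.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Persistence(Protocol):
    """Assumed duck-typed shape of the persistence collaborator."""

    def save(self, instance: Any) -> None: ...
    def load(self, instance_id: Any) -> Any: ...


@runtime_checkable
class EventBus(Protocol):
    """Assumed duck-typed shape of the event bus collaborator."""

    def emit(self, event: Any) -> None: ...


class InMemoryPersistence:
    """Trivial dict-backed store satisfying the Persistence shape."""

    def __init__(self) -> None:
        self._store: dict[Any, Any] = {}

    def save(self, instance: Any) -> None:
        self._store[instance["id"]] = instance

    def load(self, instance_id: Any) -> Any:
        return self._store[instance_id]
```

A runtime_checkable Protocol lets tests verify structural conformance with isinstance, though it only checks method presence, not signatures.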
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
- async execute_step(step, context, previous_result=None)[source]¶
Execute a single step with the given context.
- Parameters:
context (WorkflowContext) – The workflow context.
previous_result (Any) – Optional result from previous step.
- Return type:
- Returns:
The result of the step execution.
- get_all_instances()[source]¶
Get all workflow instances (running and completed).
- Return type:
list[WorkflowInstanceData]
- Returns:
List of all WorkflowInstanceData objects.
- get_running_instances()[source]¶
Get all currently running workflow instances.
- Return type:
list[WorkflowInstanceData]
- Returns:
List of running WorkflowInstanceData objects.
- async start_workflow(workflow, initial_data=None)[source]¶
Start a new workflow instance.
Creates a new workflow instance and begins execution from the initial step.
- Parameters:
- Return type:
WorkflowInstanceData
- Returns:
The created WorkflowInstanceData.
Example
>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
- class litestar_workflows.engine.WorkflowGraph[source]¶
Bases:
object
Graph representation of a workflow for navigation and validation.
The graph provides methods to navigate between steps, validate structure, and determine execution paths based on workflow state.
- definition¶
The workflow definition this graph represents.
- _adjacency¶
Adjacency list mapping step names to outgoing edges.
- _reverse_adjacency¶
Reverse adjacency list for finding predecessors.
- Parameters:
definition (WorkflowDefinition)
- __init__(definition)[source]¶
Initialize a workflow graph from a definition.
- Parameters:
definition (WorkflowDefinition) – The workflow definition to represent as a graph.
- classmethod from_definition(definition)[source]¶
Create a workflow graph from a definition.
- Parameters:
definition (WorkflowDefinition) – The workflow definition.
- Return type:
- Returns:
A WorkflowGraph instance.
Example
>>> graph = WorkflowGraph.from_definition(my_definition)
- get_all_paths(start, end, max_paths=100)[source]¶
Find all paths from start step to end step.
- Parameters:
- Return type:
- Returns:
List of paths, where each path is a list of step names.
Example
>>> paths = graph.get_all_paths("start", "end")
>>> for path in paths:
...     print(" -> ".join(path))
- get_next_steps(current, context)[source]¶
Get the next steps from the current step.
Evaluates edge conditions to determine which steps should execute next. If multiple unconditional edges exist, all targets are returned (parallel).
- Parameters:
current (str) – Name of the current step.
context (WorkflowContext) – The workflow context for condition evaluation.
- Return type:
- Returns:
List of next step names. Empty list if current step is terminal.
Example
>>> next_steps = graph.get_next_steps("approval", context)
>>> if len(next_steps) > 1:
...     print("Parallel execution required")
- get_previous_steps(current)[source]¶
Get the steps that lead to the current step.
- Parameters:
current (str) – Name of the current step.
- Return type:
- Returns:
List of predecessor step names.
Example
>>> predecessors = graph.get_previous_steps("final_step")
- get_step_depth(step_name)[source]¶
Get the minimum depth from initial step to the given step.
- Parameters:
step_name (str) – Name of the step.
- Return type:
- Returns:
Minimum number of steps from initial to this step. Returns -1 if unreachable.
Example
>>> depth = graph.get_step_depth("final_approval")
>>> print(f"Step is at depth {depth}")
- is_terminal(step_name)[source]¶
Check if a step is a terminal (end) step.
- Parameters:
step_name (str) – Name of the step to check.
- Return type:
- Returns:
True if the step has no outgoing edges or is in terminal_steps.
Example
>>> if graph.is_terminal("approval"):
...     print("Workflow ends here")
- validate()[source]¶
Validate the workflow graph structure.
Checks for common graph issues:
- Unreachable steps (not connected to initial step)
- Disconnected components
- Steps with no outgoing edges that aren’t marked terminal
- Invalid edge references
Example
>>> errors = graph.validate()
>>> if errors:
...     for error in errors:
...         print(f"Validation error: {error}")
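The unreachable-step check above boils down to a breadth-first traversal of the adjacency list from the initial step. The function below is a standalone sketch of that idea, not the library's implementation:

```python
from collections import deque


def unreachable_steps(
    steps: set[str], edges: list[tuple[str, str]], initial: str
) -> set[str]:
    """Return the steps that cannot be reached from the initial step."""
    adjacency: dict[str, list[str]] = {step: [] for step in steps}
    for src, dst in edges:
        adjacency[src].append(dst)

    # Standard BFS from the initial step.
    seen = {initial}
    queue = deque([initial])
    while queue:
        for nxt in adjacency[queue.popleft()]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)

    return steps - seen


assert unreachable_steps({"a", "b", "orphan"}, [("a", "b")], "a") == {"orphan"}
```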
- litestar_workflows.engine.WorkflowInstance¶
alias of WorkflowInstanceData
- class litestar_workflows.engine.WorkflowRegistry[source]¶
Bases:
object
Registry for storing and retrieving workflow definitions.
The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.
- _definitions¶
Nested dict mapping name -> version -> WorkflowDefinition.
- _workflow_classes¶
Map of workflow names to their class definitions.
- get_definition(name, version=None)[source]¶
Retrieve a workflow definition by name and optional version.
- Parameters:
- Return type:
- Returns:
The WorkflowDefinition for the requested workflow.
- Raises:
KeyError – If the workflow name or version is not found.
Example
>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
- get_versions(name)[source]¶
Get all versions for a workflow.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
List of version strings.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
- get_workflow_class(name)[source]¶
Retrieve the workflow class by name.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
The Workflow class.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
- has_workflow(name, version=None)[source]¶
Check if a workflow exists in the registry.
- Parameters:
- Return type:
- Returns:
True if the workflow exists, False otherwise.
Example
>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
- list_definitions(active_only=True)[source]¶
List all registered workflow definitions.
- Parameters:
active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.
- Return type:
- Returns:
List of WorkflowDefinition objects.
Example
>>> all_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
- register(workflow_class)[source]¶
Register a workflow class with the registry.
Extracts the workflow definition from the class and stores it indexed by name and version.
Example
>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
Local in-memory async execution engine.
This module provides a local, in-process execution engine suitable for development, testing, and single-instance deployments.
- class litestar_workflows.engine.local.LocalExecutionEngine[source]¶
Bases:
object
In-memory async execution engine for workflows.
This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.
- registry¶
The workflow registry for looking up definitions.
- persistence¶
Optional persistence layer for saving state.
- event_bus¶
Optional event bus for emitting workflow events.
- _instances¶
In-memory storage of workflow instances.
- _running¶
Map of instance IDs to their running asyncio tasks.
- Parameters:
registry (WorkflowRegistry)
- __init__(registry, persistence=None, event_bus=None)[source]¶
Initialize the local execution engine.
- Parameters:
registry (WorkflowRegistry) – The workflow registry.
persistence (Any | None) – Optional persistence layer implementing save/load methods.
event_bus (Any | None) – Optional event bus implementing emit method.
- async start_workflow(workflow, initial_data=None)[source]¶
Start a new workflow instance.
Creates a new workflow instance and begins execution from the initial step.
- Parameters:
- Return type:
WorkflowInstanceData
- Returns:
The created WorkflowInstanceData.
Example
>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
- async execute_step(step, context, previous_result=None)[source]¶
Execute a single step with the given context.
- Parameters:
context (WorkflowContext) – The workflow context.
previous_result (Any) – Optional result from previous step.
- Return type:
- Returns:
The result of the step execution.
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
Workflow registry for managing workflow definitions.
This module provides a registry for storing, retrieving, and managing workflow definitions with support for versioning.
- class litestar_workflows.engine.registry.WorkflowRegistry[source]¶
Bases:
object
Registry for storing and retrieving workflow definitions.
The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.
- _definitions¶
Nested dict mapping name -> version -> WorkflowDefinition.
- _workflow_classes¶
Map of workflow names to their class definitions.
- register(workflow_class)[source]¶
Register a workflow class with the registry.
Extracts the workflow definition from the class and stores it indexed by name and version.
Example
>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
- get_definition(name, version=None)[source]¶
Retrieve a workflow definition by name and optional version.
- Parameters:
- Return type:
- Returns:
The WorkflowDefinition for the requested workflow.
- Raises:
KeyError – If the workflow name or version is not found.
Example
>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
- get_workflow_class(name)[source]¶
Retrieve the workflow class by name.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
The Workflow class.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
- list_definitions(active_only=True)[source]¶
List all registered workflow definitions.
- Parameters:
active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.
- Return type:
- Returns:
List of WorkflowDefinition objects.
Example
>>> all_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
- unregister(name, version=None)[source]¶
Remove a workflow from the registry.
- Parameters:
- Return type:
Example
>>> registry.unregister("old_workflow")
>>> registry.unregister("approval_workflow", "1.0.0")
- has_workflow(name, version=None)[source]¶
Check if a workflow exists in the registry.
- Parameters:
- Return type:
- Returns:
True if the workflow exists, False otherwise.
Example
>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
- get_versions(name)[source]¶
Get all versions for a workflow.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
List of version strings.
- Raises:
KeyError – If the workflow name is not found.
Example
>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
Workflow graph operations and navigation.
This module provides graph-based operations for workflow definitions, including step navigation, validation, and path finding.
- class litestar_workflows.engine.graph.WorkflowGraph[source]¶
Bases:
object
Graph representation of a workflow for navigation and validation.
The graph provides methods to navigate between steps, validate structure, and determine execution paths based on workflow state.
- definition¶
The workflow definition this graph represents.
- _adjacency¶
Adjacency list mapping step names to outgoing edges.
- _reverse_adjacency¶
Reverse adjacency list for finding predecessors.
- Parameters:
definition (WorkflowDefinition)
- __init__(definition)[source]¶
Initialize a workflow graph from a definition.
- Parameters:
definition (WorkflowDefinition) – The workflow definition to represent as a graph.
- classmethod from_definition(definition)[source]¶
Create a workflow graph from a definition.
- Parameters:
definition (WorkflowDefinition) – The workflow definition.
- Return type:
- Returns:
A WorkflowGraph instance.
Example
>>> graph = WorkflowGraph.from_definition(my_definition)
- get_next_steps(current, context)[source]¶
Get the next steps from the current step.
Evaluates edge conditions to determine which steps should execute next. If multiple unconditional edges exist, all targets are returned (parallel).
- Parameters:
current (str) – Name of the current step.
context (WorkflowContext) – The workflow context for condition evaluation.
- Return type:
- Returns:
List of next step names. Empty list if current step is terminal.
Example
>>> next_steps = graph.get_next_steps("approval", context)
>>> if len(next_steps) > 1:
...     print("Parallel execution required")
- get_previous_steps(current)[source]¶
Get the steps that lead to the current step.
- Parameters:
current (str) – Name of the current step.
- Return type:
- Returns:
List of predecessor step names.
Example
>>> predecessors = graph.get_previous_steps("final_step")
- is_terminal(step_name)[source]¶
Check if a step is a terminal (end) step.
- Parameters:
step_name (str) – Name of the step to check.
- Return type:
- Returns:
True if the step has no outgoing edges or is in terminal_steps.
Example
>>> if graph.is_terminal("approval"):
...     print("Workflow ends here")
- validate()[source]¶
Validate the workflow graph structure.
Checks for common graph issues:
- Unreachable steps (not connected to initial step)
- Disconnected components
- Steps with no outgoing edges that aren’t marked terminal
- Invalid edge references
Example
>>> errors = graph.validate()
>>> if errors:
...     for error in errors:
...         print(f"Validation error: {error}")
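The unreachable-step check above can be illustrated with a plain breadth-first search over an adjacency list. This is a standalone sketch of the idea, not WorkflowGraph's actual implementation; the internal representation is an assumption here.

```python
from collections import deque

def unreachable_steps(steps, edges, initial):
    """Return step names with no path from the initial step (BFS sketch)."""
    adjacency = {name: [] for name in steps}
    for source, target in edges:
        adjacency[source].append(target)
    seen, queue = {initial}, deque([initial])
    while queue:
        current = queue.popleft()
        for nxt in adjacency.get(current, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return sorted(set(steps) - seen)

# "orphan" has no incoming edge from "start", so it would be flagged
print(unreachable_steps(["start", "review", "orphan"],
                        [("start", "review")], "start"))  # ['orphan']
```

A step flagged here is dead code in the workflow: no execution path can ever reach it, which is why validation treats it as an error rather than a warning.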
- get_all_paths(start, end, max_paths=100)[source]¶
Find all paths from start step to end step.
- Parameters:
- Return type:
- Returns:
List of paths, where each path is a list of step names.
Example
>>> paths = graph.get_all_paths("start", "end")
>>> for path in paths:
...     print(" -> ".join(path))
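Path enumeration of this kind is typically a depth-first search that avoids revisiting steps already on the current path and stops once the cap is hit. The following is an illustrative sketch under those assumptions, not the library's code:

```python
def all_paths(adjacency, start, end, max_paths=100):
    """Enumerate simple paths from start to end via DFS, capped at max_paths."""
    paths, stack = [], [(start, [start])]
    while stack and len(paths) < max_paths:
        node, path = stack.pop()
        if node == end:
            paths.append(path)
            continue
        for nxt in adjacency.get(node, []):
            if nxt not in path:  # skip nodes already on this path (avoids cycles)
                stack.append((nxt, path + [nxt]))
    return paths

branches = {"start": ["a", "b"], "a": ["end"], "b": ["end"]}
for path in all_paths(branches, "start", "end"):
    print(" -> ".join(path))
```

The `max_paths` cap matters because the number of simple paths can grow exponentially with branching, so an uncapped search may never finish on dense graphs.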
- get_step_depth(step_name)[source]¶
Get the minimum depth from initial step to the given step.
- Parameters:
step_name (str) – Name of the step.
- Return type:
- Returns:
Minimum number of steps from initial to this step. Returns -1 if unreachable.
Example
>>> depth = graph.get_step_depth("final_approval")
>>> print(f"Step is at depth {depth}")
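Minimum depth is a shortest-path question, so a BFS from the initial step answers it directly; the first time the target is dequeued, its depth is minimal. A standalone sketch (again an assumption about the mechanism, not the library's internals):

```python
from collections import deque

def step_depth(adjacency, initial, target):
    """Minimum number of edges from the initial step to target; -1 if unreachable."""
    queue, seen = deque([(initial, 0)]), {initial}
    while queue:
        node, depth = queue.popleft()
        if node == target:
            return depth  # BFS visits in depth order, so this is minimal
        for nxt in adjacency.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, depth + 1))
    return -1

print(step_depth({"a": ["b"], "b": ["c"]}, "a", "c"))  # 2
```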
Steps Module¶
Built-in step implementations.
Built-in step implementations for litestar-workflows.
- class litestar_workflows.steps.BaseHumanStep[source]¶
Bases: BaseStep
Base for human approval/interaction steps.
Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.
- Parameters:
- __init__(name, title, description='', form_schema=None, assignee_key=None)[source]¶
Initialize the human step.
- Parameters:
- async execute(context)[source]¶
Execute the human step.
For human steps, execution typically means waiting for user input. Override this if you need custom behavior.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The form data submitted by the user.
- async get_assignee(context)[source]¶
Get the assignee for this task from context.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
User ID to assign the task to, or None for unassigned.
- class litestar_workflows.steps.BaseMachineStep[source]¶
Bases: BaseStep
Base for automated machine steps.
Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.
- class litestar_workflows.steps.BaseStep[source]¶
Bases: object
Base implementation with common functionality for all steps.
This class provides default implementations of the Step protocol methods and common attributes. Subclass this to create custom step types.
- async can_execute(context)[source]¶
Check if step can execute given the current context.
Override this method to implement guard logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
True if the step can execute, False to skip.
- async execute(context)[source]¶
Execute the step with the given context.
Override this method to implement step logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The result of the step execution.
- Raises:
NotImplementedError – Must be implemented by subclasses.
- async on_failure(context, error)[source]¶
Hook called after failed step execution.
Override this method to implement error handling logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
error (Exception) – The exception that caused the failure.
- Return type:
- async on_success(context, result)[source]¶
Hook called after successful step execution.
Override this method to implement post-success logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
result (Any) – The result returned by execute().
- Return type:
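The lifecycle implied by these hooks (a can_execute gate, then execute, then on_success or on_failure) can be sketched with a minimal stand-in class. This is an illustration of the contract only; the names `SketchStep`, `run_step`, and `Double` are hypothetical and not part of the library.

```python
import asyncio

class SketchStep:
    """Minimal stand-in showing the can_execute/execute/hook contract."""
    async def can_execute(self, context):
        return True
    async def execute(self, context):
        raise NotImplementedError
    async def on_success(self, context, result):
        context.setdefault("log", []).append(("ok", result))
    async def on_failure(self, context, error):
        context.setdefault("log", []).append(("err", str(error)))

async def run_step(step, context):
    """Drive one step through its lifecycle, as an engine might."""
    if not await step.can_execute(context):
        return None  # guard returned False: skip the step
    try:
        result = await step.execute(context)
    except Exception as exc:
        await step.on_failure(context, exc)
        raise
    await step.on_success(context, result)
    return result

class Double(SketchStep):
    async def execute(self, context):
        return context["n"] * 2

ctx = {"n": 21}
print(asyncio.run(run_step(Double(), ctx)))  # 42
```

Note that on_failure re-raises after logging, so the caller still sees the original exception; how the real engine handles step failure is up to its implementation.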
- class litestar_workflows.steps.ConditionalGroup[source]¶
Bases: StepGroup
Execute one of multiple branches based on condition.
This implements the Gateway pattern where a condition function determines which branch to execute. Similar to if/else or switch statements.
Example
>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
- Parameters:
- async execute(context, engine)[source]¶
Execute the branch selected by the condition.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the selected branch, or None if no match.
- Raises:
Exception – Any exception from step execution.
- class litestar_workflows.steps.ExclusiveGateway[source]¶
Bases: BaseStep
XOR gateway - exactly one path based on condition.
This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.
Example
>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
- Parameters:
name (str)
condition (Callable[[WorkflowContext], str])
description (str)
- __init__(name, condition, description='')[source]¶
Initialize an exclusive gateway.
- Parameters:
name (str) – Unique identifier for the gateway.
condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.
description (str) – Human-readable description.
- async execute(context)[source]¶
Evaluate condition and return the name of the next step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The name of the next step to execute.
- Raises:
Exception – If condition evaluation fails.
- class litestar_workflows.steps.ParallelGateway[source]¶
Bases: BaseStep
AND gateway - all paths execute in parallel.
This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.
Example
>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
- async execute(context)[source]¶
Return the list of branches to execute in parallel.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
List of step names to execute concurrently.
- class litestar_workflows.steps.ParallelGroup[source]¶
Bases: StepGroup
Execute steps in parallel.
This implements the Group pattern where multiple steps execute concurrently. Optionally supports a callback step (Chord pattern) that receives all results.
Example
>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
- async execute(context, engine)[source]¶
Execute steps in parallel using asyncio.gather.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
List of results if no callback, otherwise callback result.
- Raises:
Exception – Any exception from step execution.
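The Group/Chord behavior described above amounts to asyncio.gather plus an optional aggregation call. A standalone sketch with hypothetical step callables (not the library's signatures):

```python
import asyncio

async def parallel_group(steps, context, callback=None):
    """Run coroutine-function steps concurrently; optionally pass all results to a callback."""
    results = list(await asyncio.gather(*(step(context) for step in steps)))
    return await callback(context, results) if callback else results

async def add_one(ctx):
    return ctx["n"] + 1

async def times_two(ctx):
    return ctx["n"] * 2

async def total(ctx, results):
    """Chord callback: receives every branch result at once."""
    return sum(results)

print(asyncio.run(parallel_group([add_one, times_two], {"n": 10})))         # [11, 20]
print(asyncio.run(parallel_group([add_one, times_two], {"n": 10}, total)))  # 31
```

asyncio.gather preserves the order of its arguments, so results line up with the step list even though the steps finish in any order.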
- class litestar_workflows.steps.SequentialGroup[source]¶
Bases: StepGroup
Execute steps in sequence, passing results.
This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.
Example
>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
>>> # step1 -> step2(result1) -> step3(result2) -> result3
- async execute(context, engine)[source]¶
Execute steps sequentially, passing results forward.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the final step.
- Raises:
Exception – Any exception from step execution.
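The Chain pattern is a simple fold: each step receives the previous step's result and the final result is returned. A standalone sketch with hypothetical two-argument step callables (an assumption; the library's steps take only the context):

```python
import asyncio

async def sequential_group(steps, context, first_input=None):
    """Run steps in order, feeding each one the previous step's result (chain)."""
    result = first_input
    for step in steps:
        result = await step(context, result)
    return result

async def load(ctx, _):
    return ctx["n"]

async def double(ctx, prev):
    return prev * 2

async def stringify(ctx, prev):
    return f"value={prev}"

print(asyncio.run(sequential_group([load, double, stringify], {"n": 7})))  # value=14
```

Because each step awaits the previous one, a failure anywhere in the chain stops execution at that point; nothing after the failing step runs.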
- class litestar_workflows.steps.StepGroup[source]¶
Bases: ABC
Base for composable step patterns.
Step groups allow you to compose multiple steps into reusable patterns like sequences, parallel execution, and conditional branching.
- abstractmethod async execute(context, engine)[source]¶
Execute the step group.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the group execution.
- Raises:
Exception – Any exception during group execution.
- class litestar_workflows.steps.TimerStep[source]¶
Bases: BaseStep
Step that waits for a duration before continuing.
Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.
Example
>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
- Parameters:
- async execute(context)[source]¶
Wait for the specified duration.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
None after the delay completes.
- get_duration(context)[source]¶
Get the delay duration for this step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The duration to wait.
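The static-or-dynamic duration described above reduces to a one-line dispatch on whether the configured value is callable. A sketch of that resolution logic (`resolve_duration` and `by_priority` are hypothetical names, not library API):

```python
from datetime import timedelta

def resolve_duration(duration, context):
    """Accept either a fixed timedelta or a callable computing one from context."""
    return duration(context) if callable(duration) else duration

def by_priority(ctx):
    """Low-priority work can wait longer before the workflow resumes."""
    return timedelta(hours=1) if ctx.get("priority") == "low" else timedelta(minutes=5)

print(resolve_duration(timedelta(minutes=5), {}))          # 0:05:00
print(resolve_duration(by_priority, {"priority": "low"}))  # 1:00:00
```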
- class litestar_workflows.steps.WebhookStep[source]¶
Bases: BaseStep
Step that waits for external webhook callback.
Webhook steps pause workflow execution until an external system sends a callback with data. This is useful for integrating with third-party services or async external processes.
The execution engine is responsible for managing the actual waiting and resuming the workflow when the webhook is received.
Example
>>> # Wait for payment confirmation
>>> step = WebhookStep(
...     "wait_payment",
...     callback_key="payment_data",
...     description="Wait for payment gateway callback",
... )
>>> # Workflow pauses here until webhook received
>>> payment_data = await step.execute(context)
>>> # Continue with payment_data from webhook
- async execute(context)[source]¶
Retrieve webhook data from context.
This step pauses execution until a webhook is received. The execution engine handles the actual waiting mechanism.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The data received from the webhook callback.
Note
When the webhook is received, the engine populates the callback_key in the context before resuming execution.
Base step implementations for litestar-workflows.
- class litestar_workflows.steps.base.BaseStep[source]¶
Bases: object
Base implementation with common functionality for all steps.
This class provides default implementations of the Step protocol methods and common attributes. Subclass this to create custom step types.
- async execute(context)[source]¶
Execute the step with the given context.
Override this method to implement step logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The result of the step execution.
- Raises:
NotImplementedError – Must be implemented by subclasses.
- async can_execute(context)[source]¶
Check if step can execute given the current context.
Override this method to implement guard logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
True if the step can execute, False to skip.
- async on_success(context, result)[source]¶
Hook called after successful step execution.
Override this method to implement post-success logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
result (Any) – The result returned by execute().
- Return type:
- async on_failure(context, error)[source]¶
Hook called after failed step execution.
Override this method to implement error handling logic.
- Parameters:
context (WorkflowContext) – The workflow execution context.
error (Exception) – The exception that caused the failure.
- Return type:
- class litestar_workflows.steps.base.BaseMachineStep[source]¶
Bases: BaseStep
Base for automated machine steps.
Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.
- class litestar_workflows.steps.base.BaseHumanStep[source]¶
Bases: BaseStep
Base for human approval/interaction steps.
Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.
- Parameters:
- __init__(name, title, description='', form_schema=None, assignee_key=None)[source]¶
Initialize the human step.
- Parameters:
- async get_assignee(context)[source]¶
Get the assignee for this task from context.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
User ID to assign the task to, or None for unassigned.
- async execute(context)[source]¶
Execute the human step.
For human steps, execution typically means waiting for user input. Override this if you need custom behavior.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The form data submitted by the user.
Composable step groups for litestar-workflows.
- class litestar_workflows.steps.groups.StepGroup[source]¶
Bases: ABC
Base for composable step patterns.
Step groups allow you to compose multiple steps into reusable patterns like sequences, parallel execution, and conditional branching.
- abstractmethod async execute(context, engine)[source]¶
Execute the step group.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the group execution.
- Raises:
Exception – Any exception during group execution.
- class litestar_workflows.steps.groups.SequentialGroup[source]¶
Bases: StepGroup
Execute steps in sequence, passing results.
This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.
Example
>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
>>> # step1 -> step2(result1) -> step3(result2) -> result3
- async execute(context, engine)[source]¶
Execute steps sequentially, passing results forward.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the final step.
- Raises:
Exception – Any exception from step execution.
- class litestar_workflows.steps.groups.ParallelGroup[source]¶
Bases: StepGroup
Execute steps in parallel.
This implements the Group pattern where multiple steps execute concurrently. Optionally supports a callback step (Chord pattern) that receives all results.
Example
>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
- async execute(context, engine)[source]¶
Execute steps in parallel using asyncio.gather.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
List of results if no callback, otherwise callback result.
- Raises:
Exception – Any exception from step execution.
- class litestar_workflows.steps.groups.ConditionalGroup[source]¶
Bases: StepGroup
Execute one of multiple branches based on condition.
This implements the Gateway pattern where a condition function determines which branch to execute. Similar to if/else or switch statements.
Example
>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
- Parameters:
- async execute(context, engine)[source]¶
Execute the branch selected by the condition.
- Parameters:
context (WorkflowContext) – The workflow execution context.
engine (ExecutionEngine) – The execution engine to delegate step execution.
- Return type:
- Returns:
The result of the selected branch, or None if no match.
- Raises:
Exception – Any exception from step execution.
Decision gateway steps for workflow branching.
- class litestar_workflows.steps.gateway.ExclusiveGateway[source]¶
Bases: BaseStep
XOR gateway - exactly one path based on condition.
This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.
Example
>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
- Parameters:
name (str)
condition (Callable[[WorkflowContext], str])
description (str)
- __init__(name, condition, description='')[source]¶
Initialize an exclusive gateway.
- Parameters:
name (str) – Unique identifier for the gateway.
condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.
description (str) – Human-readable description.
- async execute(context)[source]¶
Evaluate condition and return the name of the next step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The name of the next step to execute.
- Raises:
Exception – If condition evaluation fails.
- class litestar_workflows.steps.gateway.ParallelGateway[source]¶
Bases: BaseStep
AND gateway - all paths execute in parallel.
This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.
Example
>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
- async execute(context)[source]¶
Return the list of branches to execute in parallel.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
List of step names to execute concurrently.
Timer and delay steps for workflow scheduling.
- class litestar_workflows.steps.timer.TimerStep[source]¶
Bases: BaseStep
Step that waits for a duration before continuing.
Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.
Example
>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
- Parameters:
- get_duration(context)[source]¶
Get the delay duration for this step.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
The duration to wait.
- async execute(context)[source]¶
Wait for the specified duration.
- Parameters:
context (WorkflowContext) – The workflow execution context.
- Return type:
- Returns:
None after the delay completes.
Database Module¶
Available with the [db] extra: pip install litestar-workflows[db]
Database persistence layer for litestar-workflows.
This module provides SQLAlchemy models and repositories for persisting workflow definitions, instances, and execution history.
- Requires the [db] extra:
pip install litestar-workflows[db]
- class litestar_workflows.db.HumanTaskModel[source]¶
Bases: UUIDAuditBase
Pending human task for quick querying and assignment.
Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.
- instance_id¶
Foreign key to the workflow instance.
- step_execution_id¶
Foreign key to the step execution.
- step_name¶
Name of the human task step.
- title¶
Display title for the task.
- description¶
Detailed description of the task.
- form_schema¶
JSON Schema defining the task form.
- assignee_id¶
User ID assigned to complete the task.
- assignee_group¶
Group/role that can complete the task.
- due_at¶
Deadline for task completion.
- reminder_at¶
When to send a reminder.
- status¶
Current task status (PENDING, COMPLETED, CANCELED).
- completed_at¶
When the task was completed.
- completed_by¶
User who completed the task.
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- assignee_group: Mapped[str | None]¶
- assignee_id: Mapped[str | None]¶
- completed_at: Mapped[datetime | None]¶
- completed_by: Mapped[str | None]¶
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- description: Mapped[str | None]¶
- due_at: Mapped[datetime | None]¶
- form_schema: Mapped[dict[str, Any] | None]¶
- id: Mapped[UUID]¶
UUID Primary key column.
- instance: Mapped[WorkflowInstanceModel]¶
- instance_id: Mapped[UUID]¶
- reminder_at: Mapped[datetime | None]¶
- status: Mapped[str]¶
- step_execution_id: Mapped[UUID]¶
- step_name: Mapped[str]¶
- title: Mapped[str]¶
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- class litestar_workflows.db.HumanTaskRepository[source]¶
Bases: SQLAlchemyAsyncRepository[HumanTaskModel]
Repository for human task CRUD operations.
Provides methods for querying pending human tasks by assignee, group, and due date.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- async cancel_task(task_id)[source]¶
Cancel a pending human task.
- Parameters:
task_id (UUID) – The task ID.
- Return type:
- Returns:
The updated task or None if not found.
- async complete_task(task_id, completed_by)[source]¶
Mark a human task as completed.
- Parameters:
- Return type:
- Returns:
The updated task or None if not found.
- async find_by_instance(instance_id)[source]¶
Find all human tasks for an instance.
- Parameters:
instance_id (UUID) – The workflow instance ID.
- Return type:
- Returns:
List of human tasks.
- async find_overdue()[source]¶
Find overdue pending human tasks.
- Return type:
- Returns:
List of overdue human tasks.
- model_type¶
alias of
HumanTaskModel
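The find_overdue query above amounts to filtering pending tasks whose deadline has passed; the repository runs this as SQL, but the predicate can be sketched in memory. `Task` and `find_overdue` here are hypothetical stand-ins, not the library's model or method.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

@dataclass
class Task:
    """In-memory stand-in for a human task row."""
    title: str
    status: str
    due_at: Optional[datetime]

def find_overdue(tasks, now=None):
    """Pending tasks whose due_at deadline is already in the past."""
    now = now or datetime.now(timezone.utc)
    return [t for t in tasks if t.status == "PENDING" and t.due_at and t.due_at < now]

now = datetime.now(timezone.utc)
tasks = [
    Task("late review", "PENDING", now - timedelta(hours=2)),
    Task("future review", "PENDING", now + timedelta(hours=2)),
    Task("done", "COMPLETED", now - timedelta(hours=2)),
]
print([t.title for t in find_overdue(tasks, now)])  # ['late review']
```

Tasks without a due_at are excluded: a missing deadline means the task cannot be overdue.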
- class litestar_workflows.db.PersistentExecutionEngine[source]¶
Bases: object
Execution engine with database persistence.
This engine persists all workflow state to a database, enabling durability, recovery, and querying of workflow history.
- registry¶
The workflow registry for looking up definitions.
- session¶
SQLAlchemy async session for database operations.
- event_bus¶
Optional event bus for emitting workflow events.
- Parameters:
registry (WorkflowRegistry)
session (AsyncSession)
- __init__(registry, session, event_bus=None)[source]¶
Initialize the persistent execution engine.
- Parameters:
registry (WorkflowRegistry) – The workflow registry.
session (AsyncSession) – SQLAlchemy async session.
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
- class litestar_workflows.db.StepExecutionModel[source]¶
Bases: UUIDAuditBase
Record of a single step execution within a workflow instance.
Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.
- instance_id¶
Foreign key to the workflow instance.
- step_name¶
Name of the executed step.
- step_type¶
Type of step (MACHINE, HUMAN, etc.).
- status¶
Execution status of the step.
- input_data¶
Input data passed to the step.
- output_data¶
Output data produced by the step.
- error¶
Error message if step failed.
- started_at¶
Timestamp when step execution began.
- completed_at¶
Timestamp when step execution finished.
- assigned_to¶
User ID assigned to human tasks.
- completed_by¶
User ID who completed human tasks.
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- assigned_to: Mapped[str | None]¶
- completed_at: Mapped[datetime | None]¶
- completed_by: Mapped[str | None]¶
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- error: Mapped[str | None]¶
- id: Mapped[UUID]¶
UUID Primary key column.
- input_data: Mapped[dict[str, Any] | None]¶
- instance: Mapped[WorkflowInstanceModel]¶
- instance_id: Mapped[UUID]¶
- output_data: Mapped[dict[str, Any] | None]¶
- started_at: Mapped[datetime]¶
- status: Mapped[StepStatus]¶
- step_name: Mapped[str]¶
- step_type: Mapped[StepType]¶
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- class litestar_workflows.db.StepExecutionRepository[source]¶
Bases: SQLAlchemyAsyncRepository[StepExecutionModel]
Repository for step execution record CRUD operations.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- async find_by_instance(instance_id)[source]¶
Find all step executions for an instance.
- Parameters:
instance_id (UUID) – The workflow instance ID.
- Return type:
- Returns:
List of step executions ordered by start time.
- async find_by_step_name(instance_id, step_name)[source]¶
Find the execution record for a specific step.
- Parameters:
- Return type:
- Returns:
The step execution or None.
- async find_failed(instance_id=None)[source]¶
Find failed step executions.
- Parameters:
- Return type:
- Returns:
List of failed step executions.
- model_type¶
alias of
StepExecutionModel
- class litestar_workflows.db.WorkflowDefinitionModel[source]¶
Bases: UUIDAuditBase
Persisted workflow definition for versioning and storage.
Stores the serialized workflow definition along with metadata for querying and managing workflow versions.
- name¶
Unique name identifier for the workflow.
- version¶
Semantic version string (e.g., “1.0.0”).
- description¶
Human-readable description of the workflow.
- definition_json¶
Serialized WorkflowDefinition as JSON.
- is_active¶
Whether this version is active for new instances.
- instances¶
Related workflow instances.
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- definition_json: Mapped[dict[str, Any]]¶
- description: Mapped[str | None]¶
- id: Mapped[UUID]¶
UUID Primary key column.
- instances: Mapped[list[WorkflowInstanceModel]]¶
- is_active: Mapped[bool]¶
- name: Mapped[str]¶
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- version: Mapped[str]¶
- class litestar_workflows.db.WorkflowDefinitionRepository[source]¶
Bases: SQLAlchemyAsyncRepository[WorkflowDefinitionModel]
Repository for workflow definition CRUD operations.
Provides methods for managing workflow definitions including version management and activation status.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- async get_by_name(name, version=None, *, active_only=True)[source]¶
Get a workflow definition by name and optional version.
- async get_latest_version(name)[source]¶
Get the latest active version of a workflow definition.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
The latest active workflow definition or None.
- async list_active()[source]¶
List all active workflow definitions.
- Return type:
- Returns:
List of active workflow definitions.
- model_type¶
alias of
WorkflowDefinitionModel
- class litestar_workflows.db.WorkflowInstanceModel[source]¶
Bases: UUIDAuditBase
Persisted workflow instance representing a running or completed execution.
Stores the current state of a workflow execution including context data, current step, and execution history.
- definition_id¶
Foreign key to the workflow definition.
- workflow_name¶
Denormalized workflow name for quick queries.
- workflow_version¶
Denormalized workflow version.
- status¶
Current execution status.
- current_step¶
Name of the currently executing step (None if complete).
- context_data¶
Mutable workflow context data as JSON.
- metadata¶
Immutable metadata about the execution.
- error¶
Error message if workflow failed.
- started_at¶
Timestamp when execution began.
- completed_at¶
Timestamp when execution finished.
- tenant_id¶
Optional tenant identifier for multi-tenancy.
- created_by¶
Optional user who started the workflow.
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- completed_at: Mapped[datetime | None]¶
- context_data: Mapped[dict[str, Any]]¶
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- created_by: Mapped[str | None]¶
- current_step: Mapped[str | None]¶
- definition: Mapped[WorkflowDefinitionModel]¶
- definition_id: Mapped[UUID]¶
- error: Mapped[str | None]¶
- human_tasks: Mapped[list[HumanTaskModel]]¶
- id: Mapped[UUID]¶
UUID Primary key column.
- metadata_: Mapped[dict[str, Any]]¶
- started_at: Mapped[datetime]¶
- status: Mapped[WorkflowStatus]¶
- step_executions: Mapped[list[StepExecutionModel]]¶
- tenant_id: Mapped[str | None]¶
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- workflow_name: Mapped[str]¶
- workflow_version: Mapped[str]¶
- class litestar_workflows.db.WorkflowInstanceRepository[source]¶
Bases:
SQLAlchemyAsyncRepository[WorkflowInstanceModel]
Repository for workflow instance CRUD operations.
Provides methods for querying and managing workflow instances including filtering by status, user, and workflow name.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- async find_by_tenant(tenant_id, status=None, limit=100, offset=0)[source]¶
Find instances by tenant ID.
- Parameters:
tenant_id (str) – The tenant ID to filter by.
status (WorkflowStatus | None) – Optional status filter.
limit (int) – Maximum number of results.
offset (int) – Number of results to skip.
- Return type:
- Returns:
Tuple of (instances, total_count).
- async find_by_user(user_id, status=None)[source]¶
Find instances created by a specific user.
- Parameters:
user_id (str) – The user ID to filter by.
status (WorkflowStatus | None) – Optional status filter.
- Return type:
- Returns:
List of workflow instances.
- async find_by_workflow(workflow_name, status=None, limit=100, offset=0)[source]¶
Find instances by workflow name with optional status filter.
- Parameters:
workflow_name (str) – The workflow name to filter by.
status (WorkflowStatus | None) – Optional status filter.
limit (int) – Maximum number of results.
offset (int) – Number of results to skip.
- Return type:
- Returns:
Tuple of (instances, total_count).
- async find_running()[source]¶
Find all running or waiting workflow instances.
- Return type:
- Returns:
List of active workflow instances.
- model_type¶
alias of
WorkflowInstanceModel
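A sketch of paging through instances with the repository above. find_by_workflow returns an (instances, total_count) tuple, so the number of pages can be derived up front; repo is assumed to be a WorkflowInstanceRepository, and both helper names are hypothetical.

```python
# Sketch: collecting every instance of one workflow by paging through
# find_by_workflow with its limit/offset parameters.

def page_count(total, limit):
    # Ceiling division: 250 results at limit=100 means 3 pages.
    return -(-total // limit) if total else 0

async def all_instances(repo, workflow_name, status=None, limit=100):
    first_page, total = await repo.find_by_workflow(
        workflow_name, status=status, limit=limit, offset=0
    )
    results = list(first_page)
    for offset in range(limit, total, limit):
        page, _ = await repo.find_by_workflow(
            workflow_name, status=status, limit=limit, offset=offset
        )
        results.extend(page)
    return results
```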
SQLAlchemy models for workflow persistence.
This module defines the database models for persisting workflow state:
- WorkflowDefinitionModel: Stores workflow definition metadata and schema
- WorkflowInstanceModel: Stores running/completed workflow instances
- StepExecutionModel: Records individual step executions
- HumanTaskModel: Tracks pending human approval tasks
- class litestar_workflows.db.models.HumanTaskModel[source]¶
Bases:
UUIDAuditBase
Pending human task for quick querying and assignment.
Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.
- instance_id¶
Foreign key to the workflow instance.
- step_execution_id¶
Foreign key to the step execution.
- step_name¶
Name of the human task step.
- title¶
Display title for the task.
- description¶
Detailed description of the task.
- form_schema¶
JSON Schema defining the task form.
- assignee_id¶
User ID assigned to complete the task.
- assignee_group¶
Group/role that can complete the task.
- due_at¶
Deadline for task completion.
- reminder_at¶
When to send a reminder.
- status¶
Current task status (PENDING, COMPLETED, CANCELED).
- completed_at¶
When the task was completed.
- completed_by¶
User who completed the task.
- instance_id: Mapped[UUID]¶
- step_execution_id: Mapped[UUID]¶
- step_name: Mapped[str]¶
- title: Mapped[str]¶
- description: Mapped[str | None]¶
- form_schema: Mapped[dict[str, Any] | None]¶
- assignee_id: Mapped[str | None]¶
- assignee_group: Mapped[str | None]¶
- due_at: Mapped[datetime | None]¶
- reminder_at: Mapped[datetime | None]¶
- status: Mapped[str]¶
- completed_at: Mapped[datetime | None]¶
- completed_by: Mapped[str | None]¶
- instance: Mapped[WorkflowInstanceModel]¶
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- id: Mapped[UUID]¶
UUID Primary key column.
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- class litestar_workflows.db.models.StepExecutionModel[source]¶
Bases:
UUIDAuditBase
Record of a single step execution within a workflow instance.
Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.
- instance_id¶
Foreign key to the workflow instance.
- step_name¶
Name of the executed step.
- step_type¶
Type of step (MACHINE, HUMAN, etc.).
- status¶
Execution status of the step.
- input_data¶
Input data passed to the step.
- output_data¶
Output data produced by the step.
- error¶
Error message if step failed.
- started_at¶
Timestamp when step execution began.
- completed_at¶
Timestamp when step execution finished.
- assigned_to¶
User ID assigned to human tasks.
- completed_by¶
User ID who completed human tasks.
- instance_id: Mapped[UUID]¶
- step_name: Mapped[str]¶
- step_type: Mapped[StepType]¶
- status: Mapped[StepStatus]¶
- input_data: Mapped[dict[str, Any] | None]¶
- output_data: Mapped[dict[str, Any] | None]¶
- error: Mapped[str | None]¶
- started_at: Mapped[datetime]¶
- completed_at: Mapped[datetime | None]¶
- assigned_to: Mapped[str | None]¶
- completed_by: Mapped[str | None]¶
- instance: Mapped[WorkflowInstanceModel]¶
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- id: Mapped[UUID]¶
UUID Primary key column.
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- class litestar_workflows.db.models.WorkflowDefinitionModel[source]¶
Bases:
UUIDAuditBase
Persisted workflow definition for versioning and storage.
Stores the serialized workflow definition along with metadata for querying and managing workflow versions.
- name¶
Unique name identifier for the workflow.
- version¶
Semantic version string (e.g., “1.0.0”).
- description¶
Human-readable description of the workflow.
- definition_json¶
Serialized WorkflowDefinition as JSON.
- is_active¶
Whether this version is active for new instances.
- instances¶
Related workflow instances.
- name: Mapped[str]¶
- version: Mapped[str]¶
- description: Mapped[str | None]¶
- definition_json: Mapped[dict[str, Any]]¶
- is_active: Mapped[bool]¶
- instances: Mapped[list[WorkflowInstanceModel]]¶
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- id: Mapped[UUID]¶
UUID Primary key column.
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
- class litestar_workflows.db.models.WorkflowInstanceModel[source]¶
Bases:
UUIDAuditBase
Persisted workflow instance representing a running or completed execution.
Stores the current state of a workflow execution including context data, current step, and execution history.
- definition_id¶
Foreign key to the workflow definition.
- workflow_name¶
Denormalized workflow name for quick queries.
- workflow_version¶
Denormalized workflow version.
- status¶
Current execution status.
- current_step¶
Name of the currently executing step (None if complete).
- context_data¶
Mutable workflow context data as JSON.
- metadata¶
Immutable metadata about the execution.
- error¶
Error message if workflow failed.
- started_at¶
Timestamp when execution began.
- completed_at¶
Timestamp when execution finished.
- tenant_id¶
Optional tenant identifier for multi-tenancy.
- created_by¶
Optional user who started the workflow.
- definition_id: Mapped[UUID]¶
- workflow_name: Mapped[str]¶
- workflow_version: Mapped[str]¶
- status: Mapped[WorkflowStatus]¶
- current_step: Mapped[str | None]¶
- context_data: Mapped[dict[str, Any]]¶
- metadata_: Mapped[dict[str, Any]]¶
- error: Mapped[str | None]¶
- started_at: Mapped[datetime]¶
- completed_at: Mapped[datetime | None]¶
- tenant_id: Mapped[str | None]¶
- created_by: Mapped[str | None]¶
- definition: Mapped[WorkflowDefinitionModel]¶
- step_executions: Mapped[list[StepExecutionModel]]¶
- human_tasks: Mapped[list[HumanTaskModel]]¶
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in
kwargs. Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
- created_at: Mapped[datetime.datetime]¶
Date/time of instance creation.
- id: Mapped[UUID]¶
UUID Primary key column.
- updated_at: Mapped[datetime.datetime]¶
Date/time of instance last update.
Repository implementations for workflow persistence.
This module provides async repositories for CRUD operations on workflow models using advanced-alchemy’s repository pattern.
- class litestar_workflows.db.repositories.HumanTaskRepository[source]¶
Bases:
SQLAlchemyAsyncRepository[HumanTaskModel]
Repository for human task CRUD operations.
Provides methods for querying pending human tasks by assignee, group, and due date.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- model_type¶
alias of
HumanTaskModel
- async find_by_instance(instance_id)[source]¶
Find all human tasks for an instance.
- Parameters:
instance_id (UUID) – The workflow instance ID.
- Return type:
- Returns:
List of human tasks.
- async find_overdue()[source]¶
Find overdue pending human tasks.
- Return type:
- Returns:
List of overdue human tasks.
- async complete_task(task_id, completed_by)[source]¶
Mark a human task as completed.
- Parameters:
- Return type:
- Returns:
The updated task or None if not found.
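A sketch built around the methods above: find_overdue() returns pending tasks past their due_at, and complete_task returns None for unknown IDs so callers can surface a not-found error. repo is assumed to be a HumanTaskRepository; is_overdue is a hypothetical helper mirroring the overdue predicate.

```python
from datetime import datetime, timezone

def is_overdue(due_at, now=None):
    # Mirrors the find_overdue() predicate: a pending task is overdue once
    # its due_at (if any) has passed.
    now = now or datetime.now(timezone.utc)
    return due_at is not None and due_at < now

async def close_task(repo, task_id, user_id):
    # complete_task stamps completed_by/completed_at and returns the task,
    # or None when no task matches the ID.
    task = await repo.complete_task(task_id, completed_by=user_id)
    if task is None:
        raise LookupError(f"human task {task_id} not found")
    return task
```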
- class litestar_workflows.db.repositories.StepExecutionRepository[source]¶
Bases:
SQLAlchemyAsyncRepository[StepExecutionModel]
Repository for step execution record CRUD operations.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- model_type¶
alias of
StepExecutionModel
- async find_by_instance(instance_id)[source]¶
Find all step executions for an instance.
- Parameters:
instance_id (UUID) – The workflow instance ID.
- Return type:
- Returns:
List of step executions ordered by start time.
- class litestar_workflows.db.repositories.WorkflowDefinitionRepository[source]¶
Bases:
SQLAlchemyAsyncRepository[WorkflowDefinitionModel]
Repository for workflow definition CRUD operations.
Provides methods for managing workflow definitions including version management and activation status.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- model_type¶
alias of
WorkflowDefinitionModel
- async get_by_name(name, version=None, *, active_only=True)[source]¶
Get a workflow definition by name and optional version.
- async get_latest_version(name)[source]¶
Get the latest active version of a workflow definition.
- Parameters:
name (str) – The workflow name.
- Return type:
- Returns:
The latest active workflow definition or None.
- class litestar_workflows.db.repositories.WorkflowInstanceRepository[source]¶
Bases:
SQLAlchemyAsyncRepository[WorkflowInstanceModel]
Repository for workflow instance CRUD operations.
Provides methods for querying and managing workflow instances including filtering by status, user, and workflow name.
- Parameters:
session (Union[AsyncSession, async_scoped_session[AsyncSession]])
auto_expunge (bool)
auto_refresh (bool)
auto_commit (bool)
order_by (Union[list[Union[tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any]]], tuple[Union[str, InstrumentedAttribute[Any]], bool], UnaryExpression[Any], None])
load (Union[Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], Sequence[Union[_AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any]]]]], _AbstractLoad, Literal['*'], InstrumentedAttribute[Any], RelationshipProperty[Any], MapperProperty[Any], ExecutableOption, Sequence[ExecutableOption], None])
wrap_exceptions (bool)
kwargs (Any)
- model_type¶
alias of
WorkflowInstanceModel
- async find_by_workflow(workflow_name, status=None, limit=100, offset=0)[source]¶
Find instances by workflow name with optional status filter.
- Parameters:
workflow_name (str) – The workflow name to filter by.
status (WorkflowStatus | None) – Optional status filter.
limit (int) – Maximum number of results.
offset (int) – Number of results to skip.
- Return type:
- Returns:
Tuple of (instances, total_count).
- async find_by_user(user_id, status=None)[source]¶
Find instances created by a specific user.
- Parameters:
user_id (str) – The user ID to filter by.
status (WorkflowStatus | None) – Optional status filter.
- Return type:
- Returns:
List of workflow instances.
- async find_by_tenant(tenant_id, status=None, limit=100, offset=0)[source]¶
Find instances by tenant ID.
- Parameters:
tenant_id (str) – The tenant ID to filter by.
status (WorkflowStatus | None) – Optional status filter.
limit (int) – Maximum number of results.
offset (int) – Number of results to skip.
- Return type:
- Returns:
Tuple of (instances, total_count).
- async find_running()[source]¶
Find all running or waiting workflow instances.
- Return type:
- Returns:
List of active workflow instances.
Persistent execution engine with database storage.
This module provides a persistence-aware execution engine that stores workflow state in a database using SQLAlchemy.
- class litestar_workflows.db.engine.PersistentExecutionEngine[source]¶
Bases:
object
Execution engine with database persistence.
This engine persists all workflow state to a database, enabling durability, recovery, and querying of workflow history.
- registry¶
The workflow registry for looking up definitions.
- session¶
SQLAlchemy async session for database operations.
- event_bus¶
Optional event bus for emitting workflow events.
- Parameters:
registry (WorkflowRegistry)
session (AsyncSession)
- __init__(registry, session, event_bus=None)[source]¶
Initialize the persistent execution engine.
- Parameters:
registry (WorkflowRegistry) – The workflow registry.
session (AsyncSession) – SQLAlchemy async session.
- async start_workflow(workflow, initial_data=None, *, tenant_id=None, created_by=None)[source]¶
Start a new workflow instance with persistence.
- Parameters:
- Return type:
WorkflowInstanceData
- Returns:
The created WorkflowInstanceData.
- async complete_human_task(instance_id, step_name, user_id, data)[source]¶
Complete a human task with user-provided data.
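A sketch wiring the engine above into application code. The registry and session are assumed to come from the plugin and Litestar's SQLAlchemy integration; the registry.get lookup and build_context helper are assumptions for illustration, not confirmed library API.

```python
# Sketch: starting a workflow through the persistent engine.

def build_context(order_id, **extra):
    # Initial context data handed to the workflow's first step.
    return {"order_id": order_id, **extra}

async def start_order_workflow(registry, session, order_id):
    # Imported lazily so this sketch stays importable without the [db] extra.
    from litestar_workflows.db.engine import PersistentExecutionEngine

    engine = PersistentExecutionEngine(registry=registry, session=session)
    workflow = registry.get("order_workflow")  # assumption: registry lookup API
    return await engine.start_workflow(
        workflow,
        initial_data=build_context(order_id, priority="normal"),
        created_by="system",
    )
```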
Database API¶
from litestar_workflows.db import (
WorkflowDefinitionModel,
WorkflowInstanceModel,
StepExecutionModel,
HumanTaskModel,
WorkflowInstanceRepository,
)
Web Module¶
The web module provides REST API controllers and DTOs. The REST API is built into
the main WorkflowPlugin and enabled by default.
Web plugin for litestar-workflows.
This module provides REST API controllers and HTML UI views for managing workflows through HTTP endpoints. The REST API is automatically enabled when using WorkflowPlugin with enable_api=True (the default).
The API includes controllers for workflow definitions, instances, and human tasks, along with graph visualization utilities.
The UI views (optional, requires [ui] extra) provide:
- Dashboard with workflow statistics
- Workflow definition list and detail views
- Instance list and detail views with MermaidJS graphs
- Human task forms with JSON Schema rendering
Example
Basic usage with WorkflowPlugin (API enabled by default):
from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig
app = Litestar(
plugins=[
WorkflowPlugin(
config=WorkflowPluginConfig(
enable_api=True, # Default
api_path_prefix="/workflows",
)
),
],
)
With UI enabled (requires [ui] extra):
from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig
from litestar_workflows.web.views import WorkflowUIController, get_template_config
app = Litestar(
plugins=[WorkflowPlugin(config=WorkflowPluginConfig())],
route_handlers=[WorkflowUIController],
template_config=get_template_config(),
)
With authentication guards:
from litestar_workflows import WorkflowPluginConfig
config = WorkflowPluginConfig(
api_path_prefix="/api/v1/workflows",
api_guards=[require_auth_guard],
)
app = Litestar(
plugins=[WorkflowPlugin(config=config)],
)
Disable API endpoints:
app = Litestar(
plugins=[
WorkflowPlugin(config=WorkflowPluginConfig(enable_api=False)),
],
)
- class litestar_workflows.web.CompleteTaskDTO[source]¶
Bases:
object
DTO for completing a human task.
- output_data¶
Data submitted by the user completing the task.
- completed_by¶
User ID who completed the task.
- comment¶
Optional comment about the completion.
- __init__(output_data, completed_by, comment=None)¶
- exception litestar_workflows.web.DatabaseRequiredError[source]¶
Bases:
Exception
Raised when an endpoint requires database persistence but it’s not available.
This exception is raised when a user tries to access an endpoint that requires the [db] extra but it hasn’t been installed.
- class litestar_workflows.web.GraphDTO[source]¶
Bases:
object
DTO for workflow graph visualization.
- mermaid_source¶
MermaidJS graph definition.
- nodes¶
List of node definitions.
- edges¶
List of edge definitions.
- __init__(mermaid_source, nodes, edges)¶
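To make the shape of mermaid_source concrete, here is a hypothetical, simplified stand-in for the library's graph visualization utilities; it is not the actual implementation, just an illustration of how node and edge lists map to a MermaidJS flowchart.

```python
def to_mermaid(nodes, edges):
    # Build a minimal MermaidJS "graph TD" definition from node names and
    # (source, target) edge pairs.
    lines = ["graph TD"]
    lines += [f"    {node}" for node in nodes]
    lines += [f"    {source} --> {target}" for source, target in edges]
    return "\n".join(lines)
```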
- class litestar_workflows.web.HumanTaskController[source]¶
Bases:
Controller
API controller for human tasks.
Provides endpoints for managing human approval tasks including listing, completing, and reassigning tasks.
Tags: Human Tasks
- Parameters:
owner (Router)
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and it should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed dictionary mapping ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that are added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
- complete_task(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- get_task(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- list_tasks(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- reassign_task(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- class litestar_workflows.web.HumanTaskDTO[source]¶
Bases:
object
DTO for human task summary.
- id¶
Task ID.
- instance_id¶
Workflow instance ID.
- step_name¶
Name of the human task step.
- title¶
Display title for the task.
- description¶
Detailed task description.
- assignee¶
User ID assigned to complete the task.
- status¶
Task status (pending, completed, canceled).
- due_date¶
Optional due date for task completion.
- created_at¶
When the task was created.
- form_schema¶
Optional JSON Schema for task form.
- Parameters:
- __init__(id, instance_id, step_name, title, description, assignee, status, due_date, created_at, form_schema=None)¶
- class litestar_workflows.web.ReassignTaskDTO[source]¶
Bases:
object
DTO for reassigning a human task.
- new_assignee¶
User ID to assign the task to.
- reason¶
Optional reason for reassignment.
- class litestar_workflows.web.StartWorkflowDTO[source]¶
Bases:
object
DTO for starting a new workflow instance.
- definition_name¶
Name of the workflow definition to instantiate.
- input_data¶
Initial data to pass to the workflow context.
- correlation_id¶
Optional correlation ID for tracking related workflows.
- user_id¶
Optional user ID who started the workflow.
- tenant_id¶
Optional tenant ID for multi-tenancy.
- Parameters:
- __init__(definition_name, input_data=None, correlation_id=None, user_id=None, tenant_id=None)¶
- class litestar_workflows.web.StepExecutionDTO[source]¶
Bases:
object
DTO for step execution record.
- id¶
Step execution ID.
- step_name¶
Name of the executed step.
- status¶
Execution status (PENDING, RUNNING, SUCCEEDED, etc.).
- started_at¶
When execution started.
- completed_at¶
When execution completed (if finished).
- error¶
Error message if execution failed.
- Parameters:
- __init__(id, step_name, status, started_at, completed_at=None, error=None)¶
- class litestar_workflows.web.WorkflowDefinitionController[source]¶
Bases:
Controller
API controller for workflow definitions.
Provides endpoints for listing and retrieving workflow definitions, including their schemas and graph visualizations.
Tags: Workflow Definitions
- Parameters:
owner (Router)
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and it should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed dictionary mapping ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that are added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
- get_definition(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_definition_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- list_definitions(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- class litestar_workflows.web.WorkflowDefinitionDTO[source]¶
Bases: object
DTO for workflow definition metadata.
- name¶
Workflow name.
- version¶
Workflow version.
- description¶
Human-readable description.
- steps¶
List of step names in the workflow.
- edges¶
List of edge definitions (source, target, condition).
- initial_step¶
Name of the starting step.
- terminal_steps¶
List of terminal step names.
- Parameters:
- __init__(name, version, description, steps, edges, initial_step, terminal_steps)¶
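The generated __init__ signature suggests this DTO is a plain dataclass. A minimal sketch of constructing one, using a local stand-in class that mirrors the documented fields (the real class lives in litestar_workflows.web; all values are illustrative):

```python
from dataclasses import dataclass


# Local stand-in mirroring the documented WorkflowDefinitionDTO fields;
# the real class is imported from litestar_workflows.web.
@dataclass
class WorkflowDefinitionDTO:
    name: str
    version: str
    description: str
    steps: list
    edges: list
    initial_step: str
    terminal_steps: list


dto = WorkflowDefinitionDTO(
    name="order_workflow",
    version="1.0.0",
    description="Processes customer orders",
    steps=["process"],
    edges=[],
    initial_step="process",
    terminal_steps=["process"],
)
print(dto.name)  # order_workflow
```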
- class litestar_workflows.web.WorkflowInstanceController[source]¶
Bases: Controller
API controller for workflow instances.
Provides endpoints for starting, listing, and managing workflow instance executions.
Tags: Workflow Instances
- Parameters:
owner (Router)
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance, and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string keyed mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or the ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set, it defaults to /.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed mapping of ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that will be added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
- cancel_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- get_instance(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_instance_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- list_instances(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- retry_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- start_workflow(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- class litestar_workflows.web.WorkflowInstanceDTO[source]¶
Bases: object
DTO for workflow instance summary.
- id¶
Instance ID.
- definition_name¶
Name of the workflow definition.
- status¶
Current execution status.
- current_step¶
Currently executing step (if applicable).
- started_at¶
When the workflow started.
- completed_at¶
When the workflow completed (if finished).
- created_by¶
User who started the workflow.
- Parameters:
- __init__(id, definition_name, status, current_step, started_at, completed_at=None, created_by=None)¶
- class litestar_workflows.web.WorkflowInstanceDetailDTO[source]¶
Bases: object
DTO for detailed workflow instance information.
Extends WorkflowInstanceDTO with full context and execution history.
- id¶
Instance ID.
- definition_name¶
Name of the workflow definition.
- status¶
Current execution status.
- current_step¶
Currently executing step (if applicable).
- started_at¶
When the workflow started.
- completed_at¶
When the workflow completed (if finished).
- created_by¶
User who started the workflow.
- context_data¶
Current workflow context data.
- metadata¶
Workflow metadata.
- step_history¶
List of step executions.
- error¶
Error message if workflow failed.
- Parameters:
- __init__(id, definition_name, status, current_step, started_at, completed_at, created_by, context_data, metadata, step_history, error=None)¶
- step_history: list[StepExecutionDTO]¶
- class litestar_workflows.web.WorkflowWebConfig[source]¶
Bases: object
Configuration for the workflow web plugin.
This dataclass defines all configurable options for the WorkflowWebPlugin, allowing users to customize the REST API endpoints, authentication, and feature availability.
- path_prefix¶
URL path prefix for all workflow endpoints.
- include_in_schema¶
Whether to include endpoints in OpenAPI schema.
- guards¶
List of Litestar guards to apply to all workflow endpoints.
- enable_graph_endpoints¶
Whether to enable graph visualization endpoints.
- tags¶
OpenAPI tags to apply to workflow endpoints.
Example
>>> from litestar_workflows.web import WorkflowWebConfig
>>> config = WorkflowWebConfig(
...     path_prefix="/api/v1/workflows",
...     guards=[require_auth],
...     enable_graph_endpoints=True,
... )
- Parameters:
path_prefix (str)
include_in_schema (bool)
guards (list[Guard])
enable_graph_endpoints (bool)
tags (list[str])
- __init__(path_prefix='/workflows', include_in_schema=True, guards=<factory>, enable_graph_endpoints=True, tags=<factory>)¶
- Parameters:
path_prefix (str)
include_in_schema (bool)
guards (list[Guard])
enable_graph_endpoints (bool)
tags (list[str])
- enable_graph_endpoints: bool = True¶
- include_in_schema: bool = True¶
- path_prefix: str = '/workflows'¶
- guards: list[Guard]¶
- tags: list[str]¶
- litestar_workflows.web.database_required_handler(_request, exc)[source]¶
Exception handler for DatabaseRequiredError.
Returns a 501 Not Implemented response with installation instructions.
- Parameters:
_request (Request) – The Litestar request object.
exc (DatabaseRequiredError) – The DatabaseRequiredError exception.
- Return type:
- Returns:
Response with error details and installation instructions.
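A sketch of the registration pattern this handler is designed for, using a local stand-in exception and a plain dict response body (the real DatabaseRequiredError and Response types come from litestar_workflows and Litestar, and the installation-hint text here is illustrative):

```python
# Local stand-in for litestar_workflows' DatabaseRequiredError.
class DatabaseRequiredError(Exception):
    pass


def database_required_handler(_request, exc):
    # Mirrors the documented behavior: a 501 Not Implemented body
    # carrying error details and installation instructions.
    return {
        "status_code": 501,
        "detail": str(exc) or "Database persistence is not available",
        "extra": {"hint": "install the database extra for litestar-workflows"},
    }


# In a Litestar app this would be wired up roughly as:
#   Litestar(..., exception_handlers={DatabaseRequiredError: database_required_handler})
resp = database_required_handler(None, DatabaseRequiredError("persistence unavailable"))
print(resp["status_code"])  # 501
```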
- litestar_workflows.web.generate_mermaid_graph(definition)[source]¶
Generate a MermaidJS graph representation of a workflow definition.
This function creates a Mermaid flowchart diagram that visualizes the workflow’s steps and transitions. Different step types are represented with different node shapes.
- Parameters:
definition (
WorkflowDefinition) – The workflow definition to visualize.- Return type:
- Returns:
A MermaidJS flowchart definition as a string.
Example
>>> mermaid = generate_mermaid_graph(definition)
>>> print(mermaid)
graph TD
    submit[Submit]
    review{{Review}}
    approve[Approve]
    submit --> review
    review --> approve
- litestar_workflows.web.generate_mermaid_graph_with_state(definition, current_step=None, completed_steps=None, failed_steps=None)[source]¶
Generate a MermaidJS graph with execution state highlighting.
This function creates a Mermaid flowchart that includes visual styling to highlight the current step, completed steps, and failed steps.
- Parameters:
- Return type:
- Returns:
A MermaidJS flowchart definition with state styling.
Example
>>> mermaid = generate_mermaid_graph_with_state(
...     definition,
...     current_step="review",
...     completed_steps=["submit"],
...     failed_steps=[],
... )
- litestar_workflows.web.parse_graph_to_dict(definition)[source]¶
Parse a workflow definition into a dictionary representation.
This function extracts nodes and edges from a workflow definition into a structured dictionary format suitable for JSON serialization.
- Parameters:
definition (
WorkflowDefinition) – The workflow definition to parse.- Return type:
- Returns:
A dictionary containing nodes and edges lists.
Example
>>> graph_dict = parse_graph_to_dict(definition)
>>> print(graph_dict["nodes"])
[{"id": "submit", "label": "Submit", "type": "machine"}, ...]
- litestar_workflows.web.require_db()[source]¶
Dependency that checks if database persistence is available.
This function is used as a dependency in route handlers that require database functionality. It will raise DatabaseRequiredError if the required database components are not available.
- Raises:
DatabaseRequiredError – If database components are not installed.
Example
```python
from litestar import get
from litestar.di import Provide


@get(
    "/instances",
    dependencies={"_db_check": Provide(require_db)},
)
async def list_instances() -> list[dict]:
    # This endpoint requires database to be installed
    ...
```
- Return type:
REST API controllers for workflow management.
This module provides three controller classes for managing workflows:
- WorkflowDefinitionController: Manage workflow definitions and schemas
- WorkflowInstanceController: Start, monitor, and control workflow executions
- HumanTaskController: Manage human approval tasks
- class litestar_workflows.web.controllers.HumanTaskController[source]¶
Bases: Controller
API controller for human tasks.
Provides endpoints for managing human approval tasks including listing, completing, and reassigning tasks.
Tags: Human Tasks
- Parameters:
owner (Router)
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set, it defaults to /.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- list_tasks(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_task(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- complete_task(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- reassign_task(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance, and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string keyed mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or the ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed mapping of ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that will be added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
- class litestar_workflows.web.controllers.WorkflowDefinitionController[source]¶
Bases: Controller
API controller for workflow definitions.
Provides endpoints for listing and retrieving workflow definitions, including their schemas and graph visualizations.
Tags: Workflow Definitions
- Parameters:
owner (Router)
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set, it defaults to /.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- list_definitions(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_definition(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_definition_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance, and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string keyed mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or the ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed mapping of ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that will be added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
- class litestar_workflows.web.controllers.WorkflowInstanceController[source]¶
Bases: Controller
API controller for workflow instances.
Provides endpoints for starting, listing, and managing workflow instance executions.
Tags: Workflow Instances
- Parameters:
owner (Router)
- path: str¶
A path fragment for the controller. All route handlers under the controller will have the fragment appended to them. If not set, it defaults to /.
- tags: ClassVar[list[str]]¶
A sequence of string tags that will be appended to the schema of all route handlers under the controller.
- start_workflow(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- list_instances(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_instance(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- get_instance_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>¶
- cancel_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- retry_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>¶
- after_request: AfterRequestHookHandler | None¶
A sync or async function executed before a Request is passed to any route handler. If this function returns a value, the request will not reach the route handler, and instead this value will be used.
- after_response: AfterResponseHookHandler | None¶
A sync or async function called after the response has been awaited. It receives the Request instance and should not return any values.
- before_request: BeforeRequestHookHandler | None¶
A sync or async function called immediately before calling the route handler. It receives the Request instance, and any non-None return value is used for the response, bypassing the route handler.
- cache_control: CacheControlHeader | None¶
A CacheControlHeader header to add to route handlers of this controller. Can be overridden by route handlers.
- dependencies: Dependencies | None¶
A string keyed dictionary of dependency Provider instances.
- dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for (de)serializing and validation of request data.
- etag: ETag | None¶
An etag header of type ETag to add to route handlers of this controller. Can be overridden by route handlers.
- exception_handlers: ExceptionHandlersMap | None¶
A map of handler functions to status codes and/or exception types.
- guards: Sequence[Guard] | None¶
A sequence of Guard callables.
- include_in_schema: bool | EmptyType¶
A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.
- middleware: Sequence[Middleware] | None¶
A sequence of Middleware.
- opt: Mapping[str, Any] | None¶
A string keyed mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or the ASGI Scope.
- owner: Router¶
The Router or Litestar app that owns the controller. This value is set internally by Litestar and should not be set when subclassing the controller.
- parameters: ParametersMap | None¶
A mapping of Parameter definitions available to all application paths.
- request_class: type[Request] | None¶
A custom subclass of Request to be used as the default request for all route handlers under the controller.
- request_max_body_size: int | None | EmptyType¶
Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.
- response_class: type[Response] | None¶
A custom subclass of Response to be used as the default response for all route handlers under the controller.
- response_cookies: ResponseCookies | None¶
A list of Cookie instances.
- response_headers: ResponseHeaders | None¶
A string keyed mapping of ResponseHeader instances.
- return_dto: type[AbstractDTO] | None | EmptyType¶
AbstractDTO to use for serializing outbound response data.
- security: Sequence[SecurityRequirement] | None¶
A sequence of dictionaries that will be added to the schema of all route handlers under the controller.
- signature_namespace: dict[str, Any]¶
A mapping of names to types for use in forward reference resolution during signature modelling.
- signature_types: Sequence[Any]¶
A sequence of types for use in forward reference resolution during signature modelling. These types will be added to the signature namespace using their __name__ attribute.
- type_decoders: TypeDecodersSequence | None¶
A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.
- type_encoders: TypeEncodersMap | None¶
A mapping of types to callables that transform them into types supported for serialization.
- websocket_class: type[WebSocket] | None¶
A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.
Data Transfer Objects for the workflow web API.
This module defines DTOs for serializing and deserializing workflow data in REST API requests and responses.
- class litestar_workflows.web.dto.CompleteTaskDTO[source]¶
Bases: object
DTO for completing a human task.
- output_data¶
Data submitted by the user completing the task.
- completed_by¶
User ID who completed the task.
- comment¶
Optional comment about the completion.
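Assuming the complete-task endpoint accepts a JSON body matching these fields, a hypothetical payload could look like the sketch below (all values illustrative):

```python
import json

# Hypothetical request body using the documented CompleteTaskDTO fields.
payload = {
    "output_data": {"approved": True, "notes": "Budget verified"},
    "completed_by": "user-123",
    "comment": "Approved after review",
}
body = json.dumps(payload)
print(json.loads(body)["completed_by"])  # user-123
```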
- class litestar_workflows.web.dto.GraphDTO[source]¶
Bases: object
DTO for workflow graph visualization.
- mermaid_source¶
MermaidJS graph definition.
- nodes¶
List of node definitions.
- edges¶
List of edge definitions.
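Serialized, a GraphDTO payload would plausibly look like this sketch; the node entries reuse the shape shown in the parse_graph_to_dict example, and the "human" type for the hexagonal review node is an assumption:

```python
# Hypothetical payload shaped like the documented GraphDTO fields.
graph = {
    "mermaid_source": "graph TD\n    submit[Submit]\n    review{{Review}}\n    submit --> review",
    "nodes": [
        {"id": "submit", "label": "Submit", "type": "machine"},
        {"id": "review", "label": "Review", "type": "human"},  # type is assumed
    ],
    "edges": [{"source": "submit", "target": "review", "condition": None}],
}
print(len(graph["nodes"]))  # 2
```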
- class litestar_workflows.web.dto.HumanTaskDTO[source]¶
Bases: object
DTO for human task summary.
- id¶
Task ID.
- instance_id¶
Workflow instance ID.
- step_name¶
Name of the human task step.
- title¶
Display title for the task.
- description¶
Detailed task description.
- assignee¶
User ID assigned to complete the task.
- status¶
Task status (pending, completed, canceled).
- due_date¶
Optional due date for task completion.
- created_at¶
When the task was created.
- form_schema¶
Optional JSON Schema for task form.
- Parameters:
- __init__(id, instance_id, step_name, title, description, assignee, status, due_date, created_at, form_schema=None)¶
- class litestar_workflows.web.dto.ReassignTaskDTO[source]¶
Bases: object
DTO for reassigning a human task.
- new_assignee¶
User ID to assign the task to.
- reason¶
Optional reason for reassignment.
- class litestar_workflows.web.dto.StartWorkflowDTO[source]¶
Bases: object
DTO for starting a new workflow instance.
- definition_name¶
Name of the workflow definition to instantiate.
- input_data¶
Initial data to pass to the workflow context.
- correlation_id¶
Optional correlation ID for tracking related workflows.
- user_id¶
Optional user ID who started the workflow.
- tenant_id¶
Optional tenant ID for multi-tenancy.
- Parameters:
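Assuming the start-workflow endpoint accepts a JSON body matching these fields, a hypothetical request payload (all identifiers illustrative):

```python
import json

# Hypothetical body for the start_workflow endpoint, using the
# documented StartWorkflowDTO fields.
payload = {
    "definition_name": "order_workflow",
    "input_data": {"order_id": "ord-42"},
    "correlation_id": "corr-2024-001",
    "user_id": "user-123",
    "tenant_id": None,
}
body = json.dumps(payload)
print(json.loads(body)["definition_name"])  # order_workflow
```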
- class litestar_workflows.web.dto.StepExecutionDTO[source]¶
Bases: object
DTO for step execution record.
- id¶
Step execution ID.
- step_name¶
Name of the executed step.
- status¶
Execution status (PENDING, RUNNING, SUCCEEDED, etc.).
- started_at¶
When execution started.
- completed_at¶
When execution completed (if finished).
- error¶
Error message if execution failed.
- Parameters:
- class litestar_workflows.web.dto.WorkflowDefinitionDTO[source]¶
Bases: object
DTO for workflow definition metadata.
- name¶
Workflow name.
- version¶
Workflow version.
- description¶
Human-readable description.
- steps¶
List of step names in the workflow.
- edges¶
List of edge definitions (source, target, condition).
- initial_step¶
Name of the starting step.
- terminal_steps¶
List of terminal step names.
- Parameters:
- class litestar_workflows.web.dto.WorkflowInstanceDTO[source]¶
Bases: object
DTO for workflow instance summary.
- id¶
Instance ID.
- definition_name¶
Name of the workflow definition.
- status¶
Current execution status.
- current_step¶
Currently executing step (if applicable).
- started_at¶
When the workflow started.
- completed_at¶
When the workflow completed (if finished).
- created_by¶
User who started the workflow.
- Parameters:
- class litestar_workflows.web.dto.WorkflowInstanceDetailDTO[source]¶
Bases: object
DTO for detailed workflow instance information.
Extends WorkflowInstanceDTO with full context and execution history.
- id¶
Instance ID.
- definition_name¶
Name of the workflow definition.
- status¶
Current execution status.
- current_step¶
Currently executing step (if applicable).
- started_at¶
When the workflow started.
- completed_at¶
When the workflow completed (if finished).
- created_by¶
User who started the workflow.
- context_data¶
Current workflow context data.
- metadata¶
Workflow metadata.
- step_history¶
List of step executions.
- error¶
Error message if workflow failed.
- Parameters:
- step_history: list[StepExecutionDTO]¶
- __init__(id, definition_name, status, current_step, started_at, completed_at, created_by, context_data, metadata, step_history, error=None)¶
Graph visualization utilities for workflows.
This module provides utilities for generating visual representations of workflow graphs, primarily using MermaidJS format.
- litestar_workflows.web.graph.generate_mermaid_graph(definition)[source]¶
Generate a MermaidJS graph representation of a workflow definition.
This function creates a Mermaid flowchart diagram that visualizes the workflow’s steps and transitions. Different step types are represented with different node shapes.
- Parameters:
definition (
WorkflowDefinition) – The workflow definition to visualize.- Return type:
- Returns:
A MermaidJS flowchart definition as a string.
Example
>>> mermaid = generate_mermaid_graph(definition)
>>> print(mermaid)
graph TD
    submit[Submit]
    review{{Review}}
    approve[Approve]
    submit --> review
    review --> approve
- litestar_workflows.web.graph.parse_graph_to_dict(definition)[source]¶
Parse a workflow definition into a dictionary representation.
This function extracts nodes and edges from a workflow definition into a structured dictionary format suitable for JSON serialization.
- Parameters:
definition (
WorkflowDefinition) – The workflow definition to parse.- Return type:
- Returns:
A dictionary containing nodes and edges lists.
Example
>>> graph_dict = parse_graph_to_dict(definition)
>>> print(graph_dict["nodes"])
[{"id": "submit", "label": "Submit", "type": "machine"}, ...]
Web API¶
from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

# REST API is enabled by default
app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(
                enable_api=True,  # Default
                api_path_prefix="/workflows",
            )
        ),
    ],
)

# For advanced usage, DTOs and utilities are available:
from litestar_workflows.web import (
    WorkflowWebConfig,
    WorkflowDefinitionController,
    WorkflowInstanceController,
    HumanTaskController,
    generate_mermaid_graph,
)
Contrib Modules¶
Optional integration modules for distributed execution (stub implementations for Phase 6).
Optional integrations for litestar-workflows.
This module contains optional execution engine implementations for distributed task queue systems. Each integration requires installing the corresponding extra:
- celery: Celery distributed task queue (pip install litestar-workflows[celery])
- saq: Simple Async Queue for Redis (pip install litestar-workflows[saq])
- arq: Async Redis Queue (pip install litestar-workflows[arq])
These engines extend the base ExecutionEngine protocol to enable distributed
workflow execution across multiple workers.
Example
# When celery extra is installed
from litestar_workflows.contrib.celery import CeleryExecutionEngine
engine = CeleryExecutionEngine(celery_app=app, registry=registry)
await engine.start_workflow(MyWorkflow, initial_data={"key": "value"})
Note
These integrations are planned for Phase 6 (v0.7.0) and are currently stub implementations. See PLAN.md for the implementation roadmap.
Celery execution engine integration.
This module provides a Celery-based execution engine for distributed workflow execution. Celery is a distributed task queue that supports multiple message brokers (Redis, RabbitMQ, etc.).
- Installation:
pip install litestar-workflows[celery]
Note
This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.
- Example (planned API):
from celery import Celery

from litestar_workflows.contrib.celery import CeleryExecutionEngine

celery_app = Celery("workflows", broker="redis://localhost:6379/0")
engine = CeleryExecutionEngine(
    celery_app=celery_app,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as Celery tasks
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})
- class litestar_workflows.contrib.celery.CeleryExecutionEngine[source]¶
Bases:
object
Celery-based distributed execution engine (stub).
This engine delegates step execution to Celery workers, enabling horizontal scaling of workflow execution across multiple machines.
Note
This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).
- Raises:
NotImplementedError – Always raised as this is a stub.
- Parameters:
- __init__(*args, **kwargs)[source]¶
Initialize CeleryExecutionEngine.
- Raises:
NotImplementedError – This is a stub implementation.
SAQ (Simple Async Queue) execution engine integration.
This module provides a SAQ-based execution engine for distributed workflow execution. SAQ is an async-native job queue built on Redis with a simple API.
- Installation:
pip install litestar-workflows[saq]
Note
This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.
- Example (planned API):
from saq import Queue

from litestar_workflows.contrib.saq import SAQExecutionEngine

queue = Queue.from_url("redis://localhost:6379/0")
engine = SAQExecutionEngine(
    queue=queue,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as SAQ jobs
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})
See also
SAQ documentation: https://github.com/tobymao/saq
- class litestar_workflows.contrib.saq.SAQExecutionEngine[source]¶
Bases:
object
SAQ-based distributed execution engine (stub).
This engine delegates step execution to SAQ workers, enabling async-native distributed workflow execution with Redis.
Note
This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).
- Raises:
NotImplementedError – Always raised as this is a stub.
- Parameters:
- __init__(*args, **kwargs)[source]¶
Initialize SAQExecutionEngine.
- Raises:
NotImplementedError – This is a stub implementation.
ARQ (Async Redis Queue) execution engine integration.
This module provides an ARQ-based execution engine for distributed workflow execution. ARQ is a fast async job queue built on Redis with full type-hint support.
- Installation:
pip install litestar-workflows[arq]
Note
This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.
- Example (planned API):
from arq import create_pool
from arq.connections import RedisSettings

from litestar_workflows.contrib.arq import ARQExecutionEngine

redis = await create_pool(RedisSettings())
engine = ARQExecutionEngine(
    redis_pool=redis,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as ARQ jobs
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})
See also
ARQ documentation: https://arq-docs.helpmanual.io/
- class litestar_workflows.contrib.arq.ARQExecutionEngine[source]¶
Bases:
object
ARQ-based distributed execution engine (stub).
This engine delegates step execution to ARQ workers, enabling async-native distributed workflow execution with Redis and full type hint support.
Note
This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).
- Raises:
NotImplementedError – Always raised as this is a stub.
- Parameters:
- __init__(*args, **kwargs)[source]¶
Initialize ARQExecutionEngine.
- Raises:
NotImplementedError – This is a stub implementation.
Note
The contrib engines are stub implementations. Full implementations are planned for Phase 6 (v0.7.0).
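Because the stubs raise NotImplementedError on construction (not on import), calling code can guard instantiation and fall back to a local engine until Phase 6 lands. A self-contained sketch of that pattern (`StubEngine`, `LocalEngine`, and `pick_engine` stand in for real classes and are not part of the library):

```python
class StubEngine:
    """Mimics the contrib stubs: importable, but unusable until Phase 6."""

    def __init__(self, *args, **kwargs):
        raise NotImplementedError("Planned for Phase 6 (v0.7.0).")


class LocalEngine:
    """Stand-in for an available local execution engine."""


def pick_engine():
    # Prefer the distributed engine; fall back while it is a stub
    try:
        return StubEngine()
    except NotImplementedError:
        return LocalEngine()


engine = pick_engine()
print(type(engine).__name__)  # → LocalEngine
```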
Contrib API¶
# Celery execution engine
from litestar_workflows.contrib.celery import CeleryExecutionEngine
# SAQ execution engine
from litestar_workflows.contrib.saq import SAQExecutionEngine
# ARQ execution engine
from litestar_workflows.contrib.arq import ARQExecutionEngine