Module Reference

Auto-generated API documentation from source code.

Note

This reference is automatically generated from docstrings in the source code. For conceptual documentation, see Core Concepts.

litestar_workflows

Litestar Workflows - Workflow automation library for Litestar.

This package provides a workflow orchestration framework for Litestar applications. It supports both fully automated workflows and human-in-the-loop workflows.

Key Features:
  • DAG-based workflow definitions with validation

  • Sequential, parallel, and conditional execution

  • Human task integration

  • Event-driven architecture

  • Flexible execution engines (local, distributed)

  • Type-safe workflow definitions

Example

Basic plugin usage:

from litestar import Litestar
from litestar_workflows import WorkflowPlugin

app = Litestar(plugins=[WorkflowPlugin()])

Defining a workflow:

from litestar_workflows import (
    WorkflowDefinition,
    BaseMachineStep,
    Edge,
    WorkflowContext,
)


class ProcessOrder(BaseMachineStep):
    name = "process_order"
    description = "Process the customer order"

    async def execute(self, context: WorkflowContext) -> dict:
        order_id = context.get("order_id")
        # Process order logic here
        return {"processed": True, "order_id": order_id}


workflow = WorkflowDefinition(
    name="order_workflow",
    version="1.0.0",
    steps={"process": ProcessOrder()},
    edges=[],
    initial_step="process",
    terminal_steps={"process"},
)
class litestar_workflows.BaseHumanStep[source]

Bases: BaseStep

Base for human approval/interaction steps.

Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.

__init__(name, title, description='', form_schema=None, assignee_key=None)[source]

Initialize the human step.

Parameters:
  • name (str) – Unique identifier for the step.

  • title (str) – Display title for the human task.

  • description (str) – Human-readable description.

  • form_schema (dict[str, Any] | None) – Optional JSON Schema for the task form.

  • assignee_key (str | None) – Optional context key to get assignee dynamically.

assignee_key: str | None = None

Context key for dynamic assignment of tasks.

async execute(context)[source]

Execute the human step.

For human steps, execution typically means waiting for user input. Override this if you need custom behavior.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The form data submitted by the user.

form_schema: dict[str, Any] | None = None

JSON Schema defining the form structure for user input.

async get_assignee(context)[source]

Get the assignee for this task from context.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str | None

Returns:

User ID to assign the task to, or None for unassigned.
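
The lookup described above can be sketched as follows. This is a minimal illustration, assuming that assignee_key is simply read from the context data; FakeContext and resolve_assignee are hypothetical stand-ins and not part of litestar_workflows.

```python
import asyncio
from typing import Any, Dict, Optional


class FakeContext:
    """Hypothetical stand-in for WorkflowContext: a thin wrapper over a dict."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self.data = data

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)


async def resolve_assignee(
    context: FakeContext, assignee_key: Optional[str]
) -> Optional[str]:
    """Return the user ID stored under assignee_key, or None when unassigned."""
    if assignee_key is None:
        return None
    return context.get(assignee_key)


ctx = FakeContext({"manager_id": "user-42"})
assigned = asyncio.run(resolve_assignee(ctx, "manager_id"))
unassigned = asyncio.run(resolve_assignee(ctx, None))
```

With no assignee_key, or with a key that is absent from the context, the sketch returns None, which matches the "unassigned" case in the return description.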

step_type: StepType = 'human'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

title: str

Display title for the human task.

class litestar_workflows.BaseMachineStep[source]

Bases: BaseStep

Base for automated machine steps.

Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.

Parameters:
  • name (str)

  • description (str)

__init__(name, description='')[source]

Initialize the machine step.

Parameters:
  • name (str) – Unique identifier for the step.

  • description (str) – Human-readable description.

step_type: StepType = 'machine'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

class litestar_workflows.ConditionalGroup[source]

Bases: StepGroup

Execute one of multiple branches based on condition.

This implements the Gateway pattern, in which a condition function determines which branch to execute, much like an if/else or switch statement.

Example

>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
__init__(condition, branches)[source]

Initialize a conditional group.

async execute(context, engine)[source]

Execute the branch selected by the condition.

Return type:

Any

Returns:

The result of the selected branch, or None if no match.

Raises:

Exception – Any exception from step execution.
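
The dispatch behavior of a conditional group, including the None result when no branch matches, can be sketched with plain coroutines. This is a simplified illustration, not the library's implementation: run_conditional, approve, and reject are hypothetical names, and a dict stands in for WorkflowContext.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Branch = Callable[[Dict[str, Any]], Awaitable[Any]]


async def approve(data: Dict[str, Any]) -> str:
    return "approved:" + str(data["doc"])


async def reject(data: Dict[str, Any]) -> str:
    return "rejected:" + str(data["doc"])


async def run_conditional(
    condition: Callable[[Dict[str, Any]], str],
    branches: Dict[str, Branch],
    data: Dict[str, Any],
) -> Optional[Any]:
    """Run the branch whose key matches the condition result, or return None."""
    branch = branches.get(condition(data))
    if branch is None:
        return None
    return await branch(data)


def check_status(data: Dict[str, Any]) -> str:
    return "approved" if data.get("approved") else "rejected"


branches = {"approved": approve, "rejected": reject}
result = asyncio.run(
    run_conditional(check_status, branches, {"approved": True, "doc": "d1"})
)
no_match = asyncio.run(run_conditional(lambda d: "escalate", branches, {"doc": "d1"}))
```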

class litestar_workflows.Edge[source]

Bases: object

Defines a transition between workflow steps.

An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.

source

Name of the source step or the Step class itself.

target

Name of the target step or the Step class itself.

condition

Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.

Example

>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
__init__(source, target, condition=None)
condition: str | Callable[[WorkflowContext], bool] | None = None
evaluate_condition(context)[source]

Evaluate the edge condition against the workflow context.

Parameters:

context (WorkflowContext) – The current workflow execution context.

Return type:

bool

Returns:

True if the condition is met or if no condition exists, False otherwise.

Example

>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
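
The rule that an edge without a condition is always traversable can be sketched as below. This is a minimal illustration covering callable conditions only. String expressions are left out because this reference does not describe how they are evaluated. The evaluate function is a hypothetical name, and a dict stands in for WorkflowContext.

```python
from typing import Any, Callable, Dict, Optional

Condition = Optional[Callable[[Dict[str, Any]], bool]]


def evaluate(condition: Condition, data: Dict[str, Any]) -> bool:
    """An edge with no condition is always traversable."""
    if condition is None:
        return True
    return bool(condition(data))


unconditional = evaluate(None, {})
# Supplying a default avoids comparing None with an int when the key is missing.
met = evaluate(lambda d: d.get("value", 0) > 10, {"value": 15})
not_met = evaluate(lambda d: d.get("value", 0) > 10, {"value": 3})
```
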
get_source_name()[source]

Get the name of the source step.

Return type:

str

Returns:

The source step name as a string.

get_target_name()[source]

Get the name of the target step.

Return type:

str

Returns:

The target step name as a string.

source: str | type[Step]
target: str | type[Step]
class litestar_workflows.ExclusiveGateway[source]

Bases: BaseStep

XOR gateway - exactly one path based on condition.

This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.

Example

>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
__init__(name, condition, description='')[source]

Initialize an exclusive gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.

  • description (str) – Human-readable description.

async execute(context)[source]

Evaluate condition and return the name of the next step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str

Returns:

The name of the next step to execute.

Raises:

Exception – If condition evaluation fails.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

exception litestar_workflows.HumanTaskError[source]

Bases: WorkflowsError

Base exception for human task related errors.

All human task specific exceptions should inherit from this class. This allows distinguishing between automated workflow errors and human interaction errors.

exception litestar_workflows.InvalidTransitionError[source]

Bases: WorkflowsError

Raised when an invalid state transition is attempted.

This occurs when trying to transition between steps in a way that violates the workflow’s defined graph structure or transition rules.

from_step

The step being transitioned from.

to_step

The step being transitioned to.

__init__(from_step, to_step, reason=None)[source]

Initialize the exception with transition details.

Parameters:
  • from_step (str) – The step being transitioned from.

  • to_step (str) – The step being transitioned to.

  • reason (str | None) – Additional context about why the transition is invalid.

class litestar_workflows.LocalExecutionEngine[source]

Bases: object

In-memory async execution engine for workflows.

This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.

registry

The workflow registry for looking up definitions.

persistence

Optional persistence layer for saving state.

event_bus

Optional event bus for emitting workflow events.

_instances

In-memory storage of workflow instances.

_running

Map of instance IDs to their running asyncio tasks.

__init__(registry, persistence=None, event_bus=None)[source]

Initialize the local execution engine.

Parameters:
  • registry (WorkflowRegistry) – The workflow registry.

  • persistence (Any | None) – Optional persistence layer implementing save/load methods.

  • event_bus (Any | None) – Optional event bus implementing emit method.

async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • reason (str) – Reason for cancellation.

Return type:

None

async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the human task step.

  • user_id (str) – ID of the user completing the task.

  • data (dict[str, Any]) – User-provided data to merge into context.

Return type:

None

async execute_step(step, context, previous_result=None)[source]

Execute a single step with the given context.

Parameters:
  • step (Step[Any]) – The step to execute.

  • context (WorkflowContext) – The workflow context.

  • previous_result (Any) – Optional result from previous step.

Return type:

Any

Returns:

The result of the step execution.

get_all_instances()[source]

Get all workflow instances (running and completed).

Return type:

list[WorkflowInstanceData]

Returns:

List of all WorkflowInstanceData objects.

async get_instance(instance_id)[source]

Retrieve a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

WorkflowInstanceData

Returns:

The WorkflowInstanceData.

Raises:

KeyError – If the instance is not found.

get_running_instances()[source]

Get all currently running workflow instances.

Return type:

list[WorkflowInstanceData]

Returns:

List of running WorkflowInstanceData objects.

async schedule_step(instance_id, step_name, delay=None)[source]

Schedule a step for execution.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the step to schedule.

  • delay (timedelta | None) – Optional delay before execution.

Return type:

None

async start_workflow(workflow, initial_data=None)[source]

Start a new workflow instance.

Creates a new workflow instance and begins execution from the initial step.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute.

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow context.

Return type:

WorkflowInstanceData

Returns:

The created WorkflowInstanceData.

Example

>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
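
The idea of mapping instance IDs to running asyncio tasks, and cancelling through that map, can be sketched with the standard library alone. TinyEngine is a hypothetical stand-in, not the engine's actual code: it runs asyncio.sleep in place of a workflow and ignores the cancellation reason.

```python
import asyncio
from typing import Dict
from uuid import UUID, uuid4


class TinyEngine:
    """Hypothetical stand-in that tracks one asyncio task per instance ID."""

    def __init__(self) -> None:
        self._running: Dict[UUID, asyncio.Task] = {}

    def start(self, seconds: float) -> UUID:
        # A sleep stands in for the real workflow coroutine.
        instance_id = uuid4()
        self._running[instance_id] = asyncio.create_task(asyncio.sleep(seconds))
        return instance_id

    async def cancel(self, instance_id: UUID) -> bool:
        task = self._running.pop(instance_id)
        task.cancel()
        try:
            await task  # wait until the cancellation has been processed
        except asyncio.CancelledError:
            pass
        return task.cancelled()


async def main() -> bool:
    engine = TinyEngine()
    instance_id = engine.start(60)
    await asyncio.sleep(0)  # yield once so the task starts running
    return await engine.cancel(instance_id)


was_cancelled = asyncio.run(main())
```
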
class litestar_workflows.ParallelGateway[source]

Bases: BaseStep

AND gateway - all paths execute in parallel.

This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.

Example

>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
__init__(name, branches, description='')[source]

Initialize a parallel gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • branches (list[str]) – List of step names to execute in parallel.

  • description (str) – Human-readable description.

async execute(context)[source]

Return the list of branches to execute in parallel.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

list[str]

Returns:

List of step names to execute concurrently.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

class litestar_workflows.ParallelGroup[source]

Bases: StepGroup

Execute steps in parallel.

This implements the Group pattern, in which multiple steps execute concurrently. It optionally supports a callback step (Chord pattern) that receives all results.

Example

>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
__init__(*steps, callback=None)[source]

Initialize a parallel group.

Parameters:
  • *steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in parallel.

  • callback (Optional[Step[Any]]) – Optional callback step to process results (Chord pattern).

async execute(context, engine)[source]

Execute steps in parallel using asyncio.gather.

Return type:

list[Any] | Any

Returns:

List of results if no callback, otherwise callback result.

Raises:

Exception – Any exception from step execution.
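
The Group and Chord behavior described for this class can be sketched directly on top of asyncio.gather. The helper names run_parallel, make_step, and total are hypothetical, and the steps are plain coroutines rather than Step objects.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

AsyncStep = Callable[[], Awaitable[Any]]


async def run_parallel(
    steps: List[AsyncStep],
    callback: Optional[Callable[[List[Any]], Awaitable[Any]]] = None,
) -> Any:
    """Run all steps concurrently; pass the ordered results to callback if given."""
    results = list(await asyncio.gather(*(step() for step in steps)))
    if callback is None:
        return results
    return await callback(results)


def make_step(value: int, delay: float) -> AsyncStep:
    async def step() -> int:
        await asyncio.sleep(delay)
        return value

    return step


async def total(results: List[Any]) -> int:
    return sum(results)


# The slowest step is listed first to show that result order follows input order.
steps = [make_step(1, 0.02), make_step(2, 0.01), make_step(3, 0.0)]
plain = asyncio.run(run_parallel(steps))
chord = asyncio.run(run_parallel(steps, callback=total))
```

asyncio.gather returns results in the order the awaitables were passed in, not in completion order, so a callback can rely on positional results.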

class litestar_workflows.SequentialGroup[source]

Bases: StepGroup

Execute steps in sequence, passing results.

This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.

Example

>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
>>> # step1 -> step2(result1) -> step3(result2) -> result3
Parameters:

steps (Union[Step[Any], StepGroup])

__init__(*steps)[source]

Initialize a sequential group.

Parameters:

*steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in sequence.

async execute(context, engine)[source]

Execute steps sequentially, passing results forward.

Return type:

Any

Returns:

The result of the final step.

Raises:

Exception – Any exception from step execution.
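
The Chain pattern amounts to a loop that feeds each result into the next step. A minimal sketch, assuming each step is a coroutine that takes the previous result; run_chain, load, double, and describe are hypothetical names:

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence

ChainStep = Callable[[Any], Awaitable[Any]]


async def run_chain(steps: Sequence[ChainStep], initial: Any = None) -> Any:
    """Await each step in order, passing the previous result forward."""
    result = initial
    for step in steps:
        result = await step(result)
    return result


async def load(_: Any) -> int:
    return 10


async def double(value: int) -> int:
    return value * 2


async def describe(value: int) -> str:
    return f"total={value}"


final = asyncio.run(run_chain([load, double, describe]))
```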

class litestar_workflows.StepExecution[source]

Bases: object

Record of a single step execution within a workflow.

step_name

Name of the executed step.

status

Final status of the step execution.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished (if completed).

result

Return value from successful execution.

error

Error message if execution failed.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

__init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)
completed_at: datetime | None = None
error: str | None = None
input_data: dict[str, Any] | None = None
output_data: dict[str, Any] | None = None
result: Any = None
step_name: str
status: str
started_at: datetime
exception litestar_workflows.StepExecutionError[source]

Bases: WorkflowsError

Raised when a step fails to execute.

This wraps the underlying exception that caused the step to fail, providing context about which step failed.

step_name

The name of the step that failed.

cause

The underlying exception that caused the failure, if any.

__init__(step_name, cause=None)[source]

Initialize the exception with step execution details.

Parameters:
  • step_name (str) – The name of the step that failed.

  • cause (Exception | None) – The underlying exception that caused the failure, if any.

class litestar_workflows.StepStatus[source]

Bases: StrEnum

Execution status of a workflow step.

PENDING

Step has not yet been scheduled for execution.

SCHEDULED

Step is queued and awaiting execution.

RUNNING

Step is currently executing.

WAITING

Step is paused, waiting for external input or event.

SUCCEEDED

Step completed successfully.

FAILED

Step execution encountered an error.

CANCELED

Step was manually canceled before completion.

SKIPPED

Step was skipped due to conditional logic.

__new__(value)
PENDING = 'pending'
SCHEDULED = 'scheduled'
RUNNING = 'running'
WAITING = 'waiting'
SUCCEEDED = 'succeeded'
FAILED = 'failed'
CANCELED = 'canceled'
SKIPPED = 'skipped'
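
Because the members are string-valued, comparisons against plain strings use the lowercase value, not the member name. The sketch below shows this with a hypothetical two-member stand-in. It uses the str and Enum mixin so that it also runs on Python versions before 3.11, where enum.StrEnum is unavailable; equality with plain strings behaves the same way.

```python
from enum import Enum


class Status(str, Enum):
    """Hypothetical stand-in mirroring two StepStatus members."""

    SUCCEEDED = "succeeded"
    FAILED = "failed"


matches_value = Status.SUCCEEDED == "succeeded"  # compares against the value
matches_name = Status.SUCCEEDED == "SUCCEEDED"   # the member name is not the value
from_value = Status("failed")                    # look up a member by its value
```
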
class litestar_workflows.StepType[source]

Bases: StrEnum

Classification of step types within a workflow.

MACHINE

Automated execution without human intervention.

HUMAN

Requires user interaction and input.

WEBHOOK

Waits for external callback or event.

TIMER

Waits for a time-based condition to be met.

GATEWAY

Decision or branching point in the workflow.

__new__(value)
MACHINE = 'machine'
HUMAN = 'human'
WEBHOOK = 'webhook'
TIMER = 'timer'
GATEWAY = 'gateway'
exception litestar_workflows.TaskAlreadyCompletedError[source]

Bases: HumanTaskError

Raised when trying to complete an already completed task.

This prevents double-completion of human tasks which could lead to inconsistent workflow state.

task_id

The ID of the task that was already completed.

Parameters:

task_id (str | UUID)

__init__(task_id)[source]

Initialize the exception with task details.

Parameters:

task_id (str | UUID) – The ID of the task that was already completed.

exception litestar_workflows.TaskNotFoundError[source]

Bases: HumanTaskError

Raised when a human task is not found.

This occurs when trying to retrieve or complete a task that doesn’t exist in the human task storage.

task_id

The ID of the task that was not found.

Parameters:

task_id (str | UUID)

__init__(task_id)[source]

Initialize the exception with task details.

Parameters:

task_id (str | UUID) – The ID of the task that was not found.
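
The benefit of the shared HumanTaskError base is that callers can handle all human task failures in one except clause, ahead of the broader workflow errors. The sketch below re-creates the hierarchy with hypothetical stand-in classes. It assumes WorkflowsError derives from Exception, and the message format is invented for the example.

```python
class WorkflowsError(Exception):
    """Stand-in for the package's base exception (assumed to derive from Exception)."""


class HumanTaskError(WorkflowsError):
    """Stand-in base for human task errors."""


class TaskNotFoundError(HumanTaskError):
    def __init__(self, task_id: str) -> None:
        # Message wording is illustrative only.
        super().__init__(f"Task not found: {task_id}")
        self.task_id = task_id


def classify(exc: Exception) -> str:
    """Report which except clause catches the given exception."""
    try:
        raise exc
    except HumanTaskError:
        return "human"
    except WorkflowsError:
        return "workflow"


human = classify(TaskNotFoundError("task-1"))
other = classify(WorkflowsError("boom"))
```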

class litestar_workflows.TimerStep[source]

Bases: BaseStep

Step that waits for a duration before continuing.

Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.

Example

>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
__init__(name, duration, description='')[source]

Initialize a timer step.

async execute(context)[source]

Wait for the specified duration.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

None

Returns:

None after the delay completes.

get_duration(context)[source]

Get the delay duration for this step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

timedelta

Returns:

The duration to wait.

step_type: StepType = 'timer'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).
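
Supporting both static and dynamic durations comes down to checking whether the configured value is callable. A minimal sketch of that resolution, assuming a dict in place of WorkflowContext; resolve_duration is a hypothetical helper, not the library's get_duration:

```python
from datetime import timedelta
from typing import Any, Callable, Dict, Union

Duration = Union[timedelta, Callable[[Dict[str, Any]], timedelta]]


def resolve_duration(duration: Duration, data: Dict[str, Any]) -> timedelta:
    """Call duration with the context data if it is callable, else return it as-is."""
    if callable(duration):
        return duration(data)
    return duration


def get_delay(data: Dict[str, Any]) -> timedelta:
    priority = data.get("priority", "normal")
    return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)


static = resolve_duration(timedelta(minutes=5), {})
low = resolve_duration(get_delay, {"priority": "low"})
normal = resolve_duration(get_delay, {})
```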

exception litestar_workflows.UnauthorizedTaskError[source]

Bases: HumanTaskError

Raised when user is not authorized to complete a task.

This enforces authorization rules for human tasks, ensuring only assigned users or users with appropriate roles can complete tasks.

task_id

The ID of the task.

user_id

The ID of the user attempting to complete the task.

__init__(task_id, user_id)[source]

Initialize the exception with authorization details.

Parameters:
  • task_id (str | UUID) – The ID of the task.

  • user_id (str) – The ID of the user attempting to complete the task.

exception litestar_workflows.WorkflowAlreadyCompletedError[source]

Bases: WorkflowsError

Raised when trying to modify a completed workflow.

This prevents operations on workflow instances that have reached a terminal state (completed, failed, or canceled).

instance_id

The ID of the workflow instance.

status

The current terminal status of the workflow.

__init__(instance_id, status)[source]

Initialize the exception with workflow state details.

Parameters:
  • instance_id (str | UUID) – The ID of the workflow instance.

  • status (str) – The current terminal status of the workflow.

class litestar_workflows.WorkflowContext[source]

Bases: object

Execution context passed between workflow steps.

The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.

workflow_id

Unique identifier for the workflow definition.

instance_id

Unique identifier for this workflow instance.

data

Mutable state dictionary for inter-step communication.

metadata

Immutable metadata about the workflow execution.

current_step

Name of the currently executing step.

step_history

Chronological record of all step executions.

started_at

Timestamp when the workflow instance was created.

user_id

Optional user identifier for human task contexts.

tenant_id

Optional tenant identifier for multi-tenancy support.

Example

>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
__init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)
get(key, default=None)[source]

Retrieve a value from the workflow data dictionary.

Parameters:
  • key (str) – The key to look up in the data dictionary.

  • default (Any) – Default value to return if key is not found.

Return type:

Any

Returns:

The value associated with the key, or the default if not present.

Example

>>> context.get("some_key", "default_value")
'default_value'
get_last_execution(step_name=None)[source]

Get the most recent execution record for a step.

Parameters:

step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.

Return type:

StepExecution | None

Returns:

The most recent StepExecution matching the criteria, or None if not found.

Example

>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "succeeded":
...     print("Step succeeded")
has_step_executed(step_name)[source]

Check if a step has been executed in this workflow instance.

Parameters:

step_name (str) – Name of the step to check.

Return type:

bool

Returns:

True if the step appears in the execution history, False otherwise.

Example

>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
set(key, value)[source]

Set a value in the workflow data dictionary.

Parameters:
  • key (str) – The key to set in the data dictionary.

  • value (Any) – The value to associate with the key.

Return type:

None

Example

>>> context.set("status", "approved")
>>> context.get("status")
'approved'
tenant_id: str | None = None
user_id: str | None = None
with_step(step_name)[source]

Create a new context for the given step execution.

This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.

Parameters:

step_name (str) – Name of the step to execute next.

Return type:

WorkflowContext

Returns:

A new WorkflowContext instance with updated current_step.

Example

>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
workflow_id: UUID
instance_id: UUID
data: dict[str, Any]
metadata: dict[str, Any]
current_step: str
step_history: list[StepExecution]
started_at: datetime
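
The shallow-copy behavior of with_step means that writes made through the new context are visible through the original. The sketch below shows this with a hypothetical two-field dataclass. It assumes the copy is made field by field, as dataclasses.replace does; the library may implement it differently.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass
class MiniContext:
    """Hypothetical stand-in with only the fields needed for the demo."""

    current_step: str
    data: Dict[str, Any] = field(default_factory=dict)

    def with_step(self, step_name: str) -> "MiniContext":
        # replace() copies field references, so `data` is shared, not duplicated.
        return replace(self, current_step=step_name)


original = MiniContext(current_step="submit", data={"doc": "d1"})
derived = original.with_step("review")
derived.data["approved"] = True  # also visible through original.data
```

If a step needs isolated state, it would have to copy the data dictionary explicitly before modifying it.
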
class litestar_workflows.WorkflowDefinition[source]

Bases: object

Declarative workflow structure.

The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.

name

Unique identifier for the workflow.

version

Version string for workflow versioning.

description

Human-readable description of the workflow’s purpose.

steps

Dictionary mapping step names to Step instances.

edges

List of Edge instances defining the workflow graph.

initial_step

Name of the step to execute first.

terminal_steps

Set of step names that mark workflow completion.

Example

>>> from litestar_workflows import Edge, WorkflowDefinition
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
__init__(name, version, description, steps, edges, initial_step, terminal_steps=<factory>)
get_next_steps(current_step, context)[source]

Get the list of next steps from the current step based on edge conditions.

Parameters:
  • current_step (str) – Name of the current step.

  • context (WorkflowContext) – The workflow execution context for condition evaluation.

Return type:

list[str]

Returns:

List of step names that should be executed next.

Example

>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
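
Selecting next steps can be read as filtering the outgoing edges of the current step by their conditions. A minimal sketch under that assumption; MiniEdge and next_steps are hypothetical names, and a dict stands in for WorkflowContext:

```python
from typing import Any, Callable, Dict, List, NamedTuple, Optional


class MiniEdge(NamedTuple):
    source: str
    target: str
    condition: Optional[Callable[[Dict[str, Any]], bool]] = None


def next_steps(edges: List[MiniEdge], current: str, data: Dict[str, Any]) -> List[str]:
    """Return targets of edges leaving `current` whose condition is absent or true."""
    return [
        edge.target
        for edge in edges
        if edge.source == current
        and (edge.condition is None or edge.condition(data))
    ]


edges = [
    MiniEdge("submit", "review"),
    MiniEdge("review", "approve", lambda d: d.get("approved") is True),
    MiniEdge("review", "reject", lambda d: d.get("approved") is False),
]
after_submit = next_steps(edges, "submit", {})
after_review = next_steps(edges, "review", {"approved": True})
undecided = next_steps(edges, "review", {})
```

When no outgoing edge qualifies, as in the last call, the result is an empty list.
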
to_mermaid()[source]

Generate a MermaidJS graph representation of the workflow.

Return type:

str

Returns:

MermaidJS graph definition as a string.

Example

>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]

Generate a MermaidJS graph with execution state highlighting.

Parameters:
  • current_step (str | None) – Name of the currently executing step.

  • completed_steps (list[str] | None) – List of successfully completed step names.

  • failed_steps (list[str] | None) – List of failed step names.

Return type:

str

Returns:

MermaidJS graph definition with state styling.

Example

>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
validate()[source]

Validate the workflow definition for common issues.

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> errors = definition.validate()
>>> if errors:
...     print("Validation errors:", errors)
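
This reference does not list which issues validate() checks for. As an illustration only, the sketch below collects error messages for three plausible structural problems: an undefined initial step, edges that reference unknown steps, and undefined terminal steps. The function name, the checks, and the message wording are all assumptions.

```python
from typing import Iterable, List, Set, Tuple


def validate_graph(
    steps: Set[str],
    edges: Iterable[Tuple[str, str]],
    initial_step: str,
    terminal_steps: Set[str],
) -> List[str]:
    """Return a list of error messages; an empty list means no issue was found."""
    errors: List[str] = []
    if initial_step not in steps:
        errors.append(f"Initial step '{initial_step}' is not defined")
    for source, target in edges:
        for name in (source, target):
            if name not in steps:
                errors.append(
                    f"Edge {source} -> {target} references unknown step '{name}'"
                )
    for name in sorted(terminal_steps - steps):
        errors.append(f"Terminal step '{name}' is not defined")
    return errors


ok = validate_graph({"submit", "review"}, [("submit", "review")], "submit", {"review"})
broken = validate_graph({"submit"}, [("submit", "review")], "start", {"done"})
```
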
name: str
version: str
description: str
steps: dict[str, Step[Any]]
edges: list[Edge]
initial_step: str
terminal_steps: set[str]
exception litestar_workflows.WorkflowInstanceNotFoundError[source]

Bases: WorkflowsError

Raised when a workflow instance is not found.

This occurs when trying to retrieve or operate on a workflow instance that doesn’t exist in the workflow engine’s storage.

instance_id

The ID of the workflow instance that was not found.

Parameters:

instance_id (str | UUID)

__init__(instance_id)[source]

Initialize the exception with instance details.

Parameters:

instance_id (str | UUID) – The ID of the workflow instance that was not found.

exception litestar_workflows.WorkflowNotFoundError[source]

Bases: WorkflowsError

Raised when a workflow definition is not found.

This typically occurs when trying to retrieve or instantiate a workflow that hasn’t been registered with the workflow registry.

name

The name of the workflow that was not found.

version

The specific version requested, if any.

__init__(name, version=None)[source]

Initialize the exception with workflow details.

Parameters:
  • name (str) – The name of the workflow that was not found.

  • version (str | None) – The specific version requested, if any.

class litestar_workflows.WorkflowPlugin[source]

Bases: InitPluginProtocol

Litestar plugin for workflow management.

This plugin integrates litestar-workflows with a Litestar application, providing dependency injection for the WorkflowRegistry and ExecutionEngine.

Example

Basic usage with auto-registration:

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig


class MyWorkflow:
    __workflow_name__ = "my_workflow"
    __workflow_version__ = "1.0.0"
    # ... workflow definition


app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(auto_register_workflows=[MyWorkflow])
        )
    ]
)

Using in a route handler:

from litestar import get, post
from litestar_workflows import WorkflowRegistry, LocalExecutionEngine


@get("/workflows")
async def list_workflows(
    workflow_registry: WorkflowRegistry,
) -> list[dict]:
    definitions = workflow_registry.list_definitions()
    return [{"name": d.name, "version": d.version} for d in definitions]


@post("/workflows/{name}/start")
async def start_workflow(
    name: str,
    workflow_engine: LocalExecutionEngine,
    workflow_registry: WorkflowRegistry,
) -> dict:
    workflow_class = workflow_registry.get_workflow_class(name)
    instance = await workflow_engine.start_workflow(workflow_class)
    return {"instance_id": str(instance.id), "status": instance.status}
Parameters:

config (WorkflowPluginConfig | None)

__init__(config=None)[source]

Initialize the plugin.

Parameters:

config (WorkflowPluginConfig | None) – Optional configuration for the plugin.

property engine: LocalExecutionEngine

Get the execution engine.

Returns:

The ExecutionEngine instance.

Raises:

RuntimeError – If accessed before plugin initialization.

on_app_init(app_config)[source]

Initialize the plugin when the Litestar app starts.

This method:

  1. Creates or uses the provided WorkflowRegistry

  2. Creates or uses the provided ExecutionEngine

  3. Registers any auto_register_workflows

  4. Adds dependency providers to the app config

  5. Optionally registers REST API controllers if enable_api=True

Parameters:

app_config (AppConfig) – The Litestar application configuration.

Return type:

AppConfig

Returns:

The modified application configuration.

property registry: WorkflowRegistry

Get the workflow registry.

Returns:

The WorkflowRegistry instance.

Raises:

RuntimeError – If accessed before plugin initialization.

class litestar_workflows.WorkflowPluginConfig[source]

Bases: object

Configuration for the WorkflowPlugin.

registry

Optional pre-configured WorkflowRegistry. If not provided, a new one will be created.

engine

Optional pre-configured ExecutionEngine. If not provided, a LocalExecutionEngine will be created using the registry.

auto_register_workflows

List of workflow classes to automatically register with the registry on app startup.

dependency_key_registry

The key used for dependency injection of the WorkflowRegistry. Defaults to “workflow_registry”.

dependency_key_engine

The key used for dependency injection of the ExecutionEngine. Defaults to “workflow_engine”.

enable_api

Whether to enable the REST API endpoints. Defaults to True.

api_path_prefix

URL path prefix for all workflow API endpoints. Defaults to "/workflows".

api_guards

List of Litestar guards to apply to all workflow API endpoints.

api_tags

OpenAPI tags to apply to workflow API endpoints.

include_api_in_schema

Whether to include API endpoints in OpenAPI schema. Defaults to True.

__init__(registry=None, engine=None, auto_register_workflows=<factory>, dependency_key_registry='workflow_registry', dependency_key_engine='workflow_engine', enable_api=True, api_path_prefix='/workflows', api_guards=<factory>, api_tags=<factory>, include_api_in_schema=True)
api_path_prefix: str = '/workflows'
dependency_key_engine: str = 'workflow_engine'
dependency_key_registry: str = 'workflow_registry'
enable_api: bool = True
engine: LocalExecutionEngine | None = None
include_api_in_schema: bool = True
registry: WorkflowRegistry | None = None
auto_register_workflows: list[type[Any]]
api_guards: list[Any]
api_tags: list[str]
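The `<factory>` placeholders in the signature are how generated documentation renders defaults produced by a factory: each instance receives its own fresh list instead of sharing one mutable default. The sketch below assumes the configuration behaves like a standard dataclass; `PluginConfigSketch` is a hypothetical stand-in with only a few of the fields, not the library class.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PluginConfigSketch:
    """Hypothetical stand-in mirroring a few WorkflowPluginConfig fields."""

    enable_api: bool = True
    api_path_prefix: str = "/workflows"
    # Shown as `<factory>` in a generated signature: a new list per instance.
    api_guards: list[Any] = field(default_factory=list)
    api_tags: list[str] = field(default_factory=list)


first = PluginConfigSketch(api_tags=["Workflows"])
second = PluginConfigSketch()

# Mutating one instance's list leaves the other instance untouched.
first.api_guards.append("require_admin")  # placeholder value, not a real guard
```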
class litestar_workflows.WorkflowRegistry[source]

Bases: object

Registry for storing and retrieving workflow definitions.

The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.

_definitions

Nested dict mapping name -> version -> WorkflowDefinition.

_workflow_classes

Map of workflow names to their class definitions.

__init__()[source]

Initialize an empty workflow registry.

get_definition(name, version=None)[source]

Retrieve a workflow definition by name and optional version.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The workflow version. If None, returns the latest version.

Return type:

WorkflowDefinition

Returns:

The WorkflowDefinition for the requested workflow.

Raises:

KeyError – If the workflow name or version is not found.

Example

>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
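The nested name -> version -> definition mapping can be pictured with a small stand-in. This is a sketch, not the library's implementation: `RegistrySketch`, `add` and `_version_key` are hypothetical, and the rule that "latest" means the numerically highest dotted version is an assumption.

```python
from typing import Optional


def _version_key(version: str) -> tuple[int, ...]:
    # Assumption: versions are dotted integers such as "1.10.0".
    return tuple(int(part) for part in version.split("."))


class RegistrySketch:
    """Hypothetical stand-in storing definitions as name -> version -> object."""

    def __init__(self) -> None:
        self._definitions: dict[str, dict[str, object]] = {}

    def add(self, name: str, version: str, definition: object) -> None:
        self._definitions.setdefault(name, {})[version] = definition

    def get_definition(self, name: str, version: Optional[str] = None) -> object:
        versions = self._definitions[name]  # KeyError for an unknown name
        if version is None:
            # Compare numerically so that "1.10.0" ranks above "1.9.0".
            version = max(versions, key=_version_key)
        return versions[version]  # KeyError for an unknown version


sketch = RegistrySketch()
sketch.add("approval_workflow", "1.9.0", "definition-1.9.0")
sketch.add("approval_workflow", "1.10.0", "definition-1.10.0")
latest = sketch.get_definition("approval_workflow")
pinned = sketch.get_definition("approval_workflow", "1.9.0")
```

Comparing versions as integer tuples avoids the lexicographic trap in which the string "1.9.0" sorts after "1.10.0".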
get_versions(name)[source]

Get all versions for a workflow.

Parameters:

name (str) – The workflow name.

Return type:

list[str]

Returns:

List of version strings.

Raises:

KeyError – If the workflow name is not found.

Example

>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
get_workflow_class(name)[source]

Retrieve the workflow class by name.

Parameters:

name (str) – The workflow name.

Return type:

type[Workflow]

Returns:

The Workflow class.

Raises:

KeyError – If the workflow name is not found.

Example

>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
has_workflow(name, version=None)[source]

Check if a workflow exists in the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – Optional specific version to check.

Return type:

bool

Returns:

True if the workflow exists, False otherwise.

Example

>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
list_definitions(active_only=True)[source]

List all registered workflow definitions.

Parameters:

active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.

Return type:

list[WorkflowDefinition]

Returns:

List of WorkflowDefinition objects.

Example

>>> latest_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
register(workflow_class)[source]

Register a workflow class with the registry.

Extracts the workflow definition from the class and stores it indexed by name and version.

Parameters:

workflow_class (type[Workflow]) – The workflow class to register.

Return type:

None

Example

>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
unregister(name, version=None)[source]

Remove a workflow from the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The specific version to remove. If None, removes all versions.

Return type:

None

Example

>>> registry.unregister("old_workflow")
>>> registry.unregister("approval_workflow", "1.0.0")
class litestar_workflows.WorkflowStatus[source]

Bases: StrEnum

Overall status of a workflow instance.

PENDING

Workflow has been created but not yet started.

RUNNING

Workflow is actively executing steps.

PAUSED

Workflow execution is temporarily paused.

WAITING

Workflow is waiting for human input or external event.

COMPLETED

Workflow finished successfully.

FAILED

Workflow terminated due to an error.

CANCELED

Workflow was manually canceled.

__new__(value)
PENDING = 'pending'
RUNNING = 'running'
PAUSED = 'paused'
WAITING = 'waiting'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELED = 'canceled'
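Because the status is a string enum, members compare equal to their lowercase values and can be rebuilt from stored strings. The sketch below uses a stand-in enum with the same members; `StatusSketch`, `TERMINAL` and `is_terminal` are hypothetical, and treating COMPLETED, FAILED and CANCELED as the terminal states is an assumption drawn from their descriptions.

```python
try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # fallback for older interpreters
    from enum import Enum

    class StrEnum(str, Enum):
        pass


class StatusSketch(StrEnum):
    """Stand-in with the same members and values as WorkflowStatus."""

    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


# Assumption: these three states mean no further steps will run.
TERMINAL = frozenset(
    {StatusSketch.COMPLETED, StatusSketch.FAILED, StatusSketch.CANCELED}
)


def is_terminal(status: str) -> bool:
    # StatusSketch(...) accepts a raw value (or a member) and raises
    # ValueError for anything that is not a known status.
    return StatusSketch(status) in TERMINAL
```

Note that the values are lowercase, so a comparison against an uppercase literal such as "COMPLETED" would not match a member of this enum.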
exception litestar_workflows.WorkflowValidationError[source]

Bases: WorkflowsError

Raised when workflow definition validation fails.

This occurs during workflow registration or modification when the workflow definition doesn’t meet required constraints (e.g., cycles, unreachable nodes, invalid step types).

errors

List of validation error messages.

Parameters:

errors (list[str])

__init__(errors)[source]

Initialize the exception with validation errors.

Parameters:

errors (list[str]) – List of validation error messages.

exception litestar_workflows.WorkflowsError[source]

Bases: Exception

Base exception for all litestar-workflows errors.

All exceptions raised by litestar-workflows should inherit from this class. This allows users to catch all workflow-related errors with a single except clause.
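The single-except-clause idea can be illustrated with stand-in classes. `WorkflowsErrorSketch`, `ValidationErrorSketch` and `register_or_report` are hypothetical, and joining the messages with "; " is an assumption about formatting, not the library's behavior.

```python
class WorkflowsErrorSketch(Exception):
    """Stand-in for the package-wide base exception."""


class ValidationErrorSketch(WorkflowsErrorSketch):
    """Stand-in for a validation error that carries a list of messages."""

    def __init__(self, errors: list[str]) -> None:
        self.errors = errors
        super().__init__("; ".join(errors))  # message format is an assumption


def register_or_report(errors: list[str]) -> list[str]:
    """Pretend to register a workflow; return the problems found, if any."""
    try:
        if errors:
            raise ValidationErrorSketch(errors)
    except WorkflowsErrorSketch as exc:
        # Catching the base class also catches every subclass.
        return getattr(exc, "errors", [str(exc)])
    return []


problems = register_or_report(["cycle detected", "unreachable step: archive"])
```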

Core Module

The core module provides fundamental building blocks for workflow automation.

Core domain module for litestar-workflows.

This module exports the fundamental building blocks for workflow definitions, including types, protocols, context, definitions, and events.

class litestar_workflows.core.Edge[source]

Bases: object

Defines a transition between workflow steps.

An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.

source

Name of the source step or the Step class itself.

target

Name of the target step or the Step class itself.

condition

Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.

Example

>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
__init__(source, target, condition=None)
condition: str | Callable[[WorkflowContext], bool] | None = None
evaluate_condition(context)[source]

Evaluate the edge condition against the workflow context.

Parameters:

context (WorkflowContext) – The current workflow execution context.

Return type:

bool

Returns:

True if the condition is met or if no condition exists, False otherwise.

Example

>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
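The three condition forms (absent, callable, string expression) can be handled as in the sketch below. This is a hypothetical stand-in: `ContextSketch` is a reduced context, and evaluating the string form with `eval()` and no builtins is purely illustrative, since this reference does not say how the library evaluates string expressions.

```python
from typing import Any, Callable, Optional, Union


class ContextSketch:
    """Hypothetical reduced context exposing only get()."""

    def __init__(self, data: dict[str, Any]) -> None:
        self.data = dict(data)

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)


def evaluate_condition(
    condition: Optional[Union[str, Callable[[ContextSketch], bool]]],
    context: ContextSketch,
) -> bool:
    if condition is None:
        return True  # an unconditional edge is always traversable
    if callable(condition):
        return bool(condition(context))
    # Assumption: string conditions refer to the context as `context`.
    # eval() is used for illustration only and is unsafe on untrusted input.
    return bool(eval(condition, {"__builtins__": {}}, {"context": context}))


ctx = ContextSketch({"value": 15, "approved": True})
always = evaluate_condition(None, ctx)
by_callable = evaluate_condition(lambda c: c.get("value") > 10, ctx)
by_string = evaluate_condition("context.get('approved') == True", ctx)
```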
get_source_name()[source]

Get the name of the source step.

Return type:

str

Returns:

The source step name as a string.

get_target_name()[source]

Get the name of the target step.

Return type:

str

Returns:

The target step name as a string.

source: str | type[Step]
target: str | type[Step]
class litestar_workflows.core.ExecutionEngine[source]

Bases: Protocol

Protocol for workflow execution engines.

The ExecutionEngine is responsible for orchestrating workflow execution, including starting workflows, executing steps, scheduling, and handling human tasks.

Example

>>> engine = LocalExecutionEngine()
>>> instance = await engine.start_workflow(MyWorkflow, initial_data={"input": "value"})
__init__(*args, **kwargs)
async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow instance.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • reason (str) – Explanation for the cancellation.

Return type:

None

Example

>>> await engine.cancel_workflow(
...     instance_id=instance.id, reason="Request withdrawn by submitter"
... )
async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • step_name (str) – Name of the human step to complete.

  • user_id (str) – Identifier of the user completing the task.

  • data (dict[str, Any]) – User-provided form data or input.

Return type:

None

Example

>>> await engine.complete_human_task(
...     instance_id=instance.id,
...     step_name="approval",
...     user_id="user_123",
...     data={"approved": True, "comments": "Looks good"},
... )
async execute_step(step, context, previous_result=None)[source]

Execute a single step within a workflow.

Parameters:
  • step (Step[Any]) – The step to execute.

  • context (WorkflowContext) – The current workflow context.

  • previous_result (Any) – Optional result from a previous step.

Return type:

Any

Returns:

The result of the step execution.

Raises:

Exception – If step execution fails and error handling doesn’t compensate.

async schedule_step(instance_id, step_name, delay=None)[source]

Schedule a step for later execution.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • step_name (str) – Name of the step to schedule.

  • delay (timedelta | None) – Optional delay before execution. If None, executes immediately.

Return type:

None

Example

>>> from datetime import timedelta
>>> await engine.schedule_step(
...     instance_id=instance.id, step_name="reminder", delay=timedelta(hours=24)
... )
async start_workflow(workflow, initial_data=None)[source]

Start a new workflow instance.

Parameters:
  • workflow (type[Workflow]) – The workflow class to instantiate and execute.

  • initial_data (dict[str, Any] | None) – Optional initial data to populate the workflow context.

Return type:

WorkflowInstance

Returns:

A WorkflowInstance representing the started workflow.

Example

>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
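Since the engine is described as a Protocol, a class can satisfy it structurally, without inheriting from it. The sketch below uses a reduced, hypothetical protocol with two of the methods; `EngineProtocolSketch`, `InMemoryEngine` and `DemoWorkflow` are illustrative names, and returning a bare UUID instead of a WorkflowInstance is a simplification.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable
from uuid import UUID, uuid4


@runtime_checkable
class EngineProtocolSketch(Protocol):
    """Reduced stand-in for an engine protocol (two methods only)."""

    async def start_workflow(
        self, workflow: type, initial_data: Optional[dict[str, Any]] = None
    ) -> Any: ...

    async def cancel_workflow(self, instance_id: UUID, reason: str) -> None: ...


class InMemoryEngine:
    """Satisfies the protocol by shape alone; no base class is needed."""

    def __init__(self) -> None:
        self.instances: dict[UUID, dict[str, Any]] = {}

    async def start_workflow(self, workflow, initial_data=None):
        instance_id = uuid4()
        self.instances[instance_id] = {
            "workflow": workflow.__name__,
            "data": dict(initial_data or {}),
            "status": "running",
        }
        return instance_id

    async def cancel_workflow(self, instance_id, reason):
        self.instances[instance_id].update(status="canceled", reason=reason)


class DemoWorkflow:
    """Placeholder workflow class used only for this sketch."""


async def demo():
    engine = InMemoryEngine()
    instance_id = await engine.start_workflow(DemoWorkflow, {"document_id": "doc_123"})
    await engine.cancel_workflow(instance_id, "Request withdrawn by submitter")
    return engine, instance_id


engine, instance_id = asyncio.run(demo())
```

The `runtime_checkable` decorator only lets `isinstance` verify that the methods exist; it does not check their signatures.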
class litestar_workflows.core.HumanTaskCompleted[source]

Bases: WorkflowEvent

Event emitted when a human task is completed.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was completed.

step_name

Name of the human step.

task_id

Unique identifier for this task.

completed_by

User who completed the task.

form_data

Data submitted by the user.

comment

Optional comment provided by the user.

Example

>>> event = HumanTaskCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     completed_by="manager_123",
...     form_data={"approved": True, "amount": 1500.00},
...     comment="Approved for payment",
... )
__init__(instance_id, timestamp, step_name, task_id, completed_by, form_data=None, comment=None)
comment: str | None = None
form_data: dict[str, Any] | None = None
step_name: str
task_id: UUID
completed_by: str
class litestar_workflows.core.HumanTaskCreated[source]

Bases: WorkflowEvent

Event emitted when a human task is created.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was created.

step_name

Name of the human step.

task_id

Unique identifier for this task.

assignee

User or group assigned to the task.

title

Display title for the task.

description

Detailed description of what needs to be done.

due_at

Optional deadline for task completion.

Example

>>> event = HumanTaskCreated(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     assignee="manager_group",
...     title="Approve expense report",
...     description="Review and approve expense report for project X",
... )
__init__(instance_id, timestamp, step_name, task_id, assignee=None, title=None, description=None, due_at=None)
assignee: str | None = None
description: str | None = None
due_at: datetime | None = None
title: str | None = None
step_name: str
task_id: UUID
class litestar_workflows.core.HumanTaskReassigned[source]

Bases: WorkflowEvent

Event emitted when a human task is reassigned.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was reassigned.

step_name

Name of the human step.

task_id

Unique identifier for this task.

from_assignee

Previous assignee.

to_assignee

New assignee.

reassigned_by

User who performed the reassignment.

reason

Explanation for the reassignment.

Example

>>> event = HumanTaskReassigned(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="review",
...     task_id=uuid4(),
...     from_assignee="user_123",
...     to_assignee="user_456",
...     reassigned_by="admin_789",
...     reason="Original assignee on vacation",
... )
__init__(instance_id, timestamp, step_name, task_id, from_assignee=None, to_assignee=None, reassigned_by=None, reason=None)
from_assignee: str | None = None
reason: str | None = None
reassigned_by: str | None = None
to_assignee: str | None = None
step_name: str
task_id: UUID
class litestar_workflows.core.Step[source]

Bases: Protocol[T]

Protocol defining the interface for workflow steps.

Steps are the atomic units of work within a workflow. They can be machine-automated, human-interactive, or event-driven. Each step must implement the core execution interface along with lifecycle hooks.

name

Unique identifier for this step within its workflow.

description

Human-readable description of the step’s purpose.

step_type

Classification of this step (MACHINE, HUMAN, WEBHOOK, etc.).

Example

>>> class ApprovalStep:
...     name = "approval"
...     description = "Manager approval required"
...     step_type = StepType.HUMAN
...
...     async def execute(self, context: WorkflowContext) -> bool:
...         return context.get("approved", False)
...
...     async def can_execute(self, context: WorkflowContext) -> bool:
...         return context.get("request_submitted", False)
...
...     async def on_success(self, context: WorkflowContext, result: bool) -> None:
...         context.set("approval_result", result)
...
...     async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
...         context.set("approval_error", str(error))
__init__(*args, **kwargs)
async can_execute(context)[source]

Determine if the step is eligible to execute.

This method acts as a guard or validator, checking preconditions before execution. If it returns False, the step will be marked as SKIPPED.

Parameters:

context (WorkflowContext) – The current workflow execution context.

Return type:

bool

Returns:

True if the step should execute, False to skip it.

Example

>>> async def can_execute(self, context: WorkflowContext) -> bool:
...     return context.get("prerequisites_met", False)
async execute(context)[source]

Execute the step’s primary logic.

This method contains the core functionality of the step. It receives the workflow context and can read/write data, perform operations, and return a result that will be stored in the execution history.

Parameters:

context (WorkflowContext) – The current workflow execution context.

Return type:

T

Returns:

The result of the step execution, which will be stored in step history.

Raises:

Exception – Any exception raised will trigger the on_failure hook and potentially fail the workflow.

async on_failure(context, error)[source]

Hook called after failed step execution.

This method is invoked when execute() raises an exception. Use it for error logging, compensation logic, or triggering alerts.

Parameters:
  • context (WorkflowContext) – The current workflow execution context.

  • error (Exception) – The exception that was raised during execution.

Return type:

None

Example

>>> async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
...     context.set("error", str(error))
...     await log_error(f"Step {self.name} failed: {error}")
async on_success(context, result)[source]

Hook called after successful step execution.

This method is invoked after execute() completes without raising an exception. Use it to update context, trigger side effects, or perform cleanup.

Parameters:
  • context (WorkflowContext) – The current workflow execution context.

  • result (T) – The return value from the execute() method.

Return type:

None

Example

>>> async def on_success(self, context: WorkflowContext, result: dict) -> None:
...     context.set("last_result", result)
...     await send_notification(f"Step {self.name} completed")
name: str
description: str
step_type: StepType
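The way the four step methods fit together (guard, execute, then one of the two hooks) can be sketched with a small runner. This is an illustration of the lifecycle described above, not the engine's code: `ContextSketch`, `DoubleStep` and `run_step` are hypothetical, and the returned strings are placeholders for real status values.

```python
import asyncio
from typing import Any


class ContextSketch:
    """Hypothetical reduced context with get() and set()."""

    def __init__(self, data: dict[str, Any]) -> None:
        self.data = dict(data)

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value


class DoubleStep:
    name = "double"

    async def can_execute(self, context: ContextSketch) -> bool:
        return context.get("value") is not None

    async def execute(self, context: ContextSketch) -> Any:
        return context.get("value") * 2

    async def on_success(self, context: ContextSketch, result: Any) -> None:
        context.set("doubled", result)

    async def on_failure(self, context: ContextSketch, error: Exception) -> None:
        context.set("error", str(error))


async def run_step(step: Any, context: ContextSketch) -> str:
    if not await step.can_execute(context):
        return "skipped"  # guard failed, execute() is never called
    try:
        result = await step.execute(context)
    except Exception as error:
        await step.on_failure(context, error)
        return "failed"
    await step.on_success(context, result)
    return "succeeded"


ready = ContextSketch({"value": 21})
empty = ContextSketch({})
broken = ContextSketch({"value": object()})  # multiplying it raises TypeError
outcomes = [asyncio.run(run_step(DoubleStep(), c)) for c in (ready, empty, broken)]
```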
class litestar_workflows.core.StepCompleted[source]

Bases: WorkflowEvent

Event emitted when a step completes successfully.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step completed.

step_name

Name of the step that completed.

status

Final status of the step execution.

result

Return value from the step execution.

output_data

Output data produced by the step.

duration_seconds

Execution time in seconds.

Example

>>> event = StepCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     status="SUCCEEDED",
...     result={"valid": True},
...     duration_seconds=1.5,
... )
__init__(instance_id, timestamp, step_name, status, result=None, output_data=None, duration_seconds=None)
duration_seconds: float | None = None
output_data: dict[str, Any] | None = None
result: Any = None
step_name: str
status: str
class litestar_workflows.core.StepExecution[source]

Bases: object

Record of a single step execution within a workflow.

step_name

Name of the executed step.

status

Final status of the step execution.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished (if completed).

result

Return value from successful execution.

error

Error message if execution failed.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

__init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)
completed_at: datetime | None = None
error: str | None = None
input_data: dict[str, Any] | None = None
output_data: dict[str, Any] | None = None
result: Any = None
step_name: str
status: str
started_at: datetime
class litestar_workflows.core.StepFailed[source]

Bases: WorkflowEvent

Event emitted when a step fails execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step failed.

step_name

Name of the step that failed.

error

Error message describing the failure.

error_type

Type/class of the error.

retry_count

Number of retry attempts made.

Example

>>> event = StepFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="api_call",
...     error="Connection timeout",
...     error_type="TimeoutError",
...     retry_count=3,
... )
__init__(instance_id, timestamp, step_name, error, error_type=None, retry_count=0)
error_type: str | None = None
retry_count: int = 0
step_name: str
error: str
class litestar_workflows.core.StepSkipped[source]

Bases: WorkflowEvent

Event emitted when a step is skipped due to conditions.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step was skipped.

step_name

Name of the step that was skipped.

reason

Explanation for why the step was skipped.

Example

>>> event = StepSkipped(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="optional_notification",
...     reason="Notification disabled in settings",
... )
__init__(instance_id, timestamp, step_name, reason=None)
reason: str | None = None
step_name: str
class litestar_workflows.core.StepStarted[source]

Bases: WorkflowEvent

Event emitted when a step begins execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step started.

step_name

Name of the step that started.

step_type

Type of the step (MACHINE, HUMAN, etc.).

input_data

Input data provided to the step.

Example

>>> event = StepStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     step_type="MACHINE",
...     input_data={"data": "to_validate"},
... )
__init__(instance_id, timestamp, step_name, step_type, input_data=None)
input_data: dict[str, Any] | None = None
step_name: str
step_type: str
class litestar_workflows.core.StepStatus[source]

Bases: StrEnum

Execution status of a workflow step.

PENDING

Step has not yet been scheduled for execution.

SCHEDULED

Step is queued and awaiting execution.

RUNNING

Step is currently executing.

WAITING

Step is paused, waiting for external input or event.

SUCCEEDED

Step completed successfully.

FAILED

Step execution encountered an error.

CANCELED

Step was manually canceled before completion.

SKIPPED

Step was skipped due to conditional logic.

__new__(value)
PENDING = 'pending'
SCHEDULED = 'scheduled'
RUNNING = 'running'
WAITING = 'waiting'
SUCCEEDED = 'succeeded'
FAILED = 'failed'
CANCELED = 'canceled'
SKIPPED = 'skipped'
class litestar_workflows.core.StepType[source]

Bases: StrEnum

Classification of step types within a workflow.

MACHINE

Automated execution without human intervention.

HUMAN

Requires user interaction and input.

WEBHOOK

Waits for external callback or event.

TIMER

Waits for a time-based condition to be met.

GATEWAY

Decision or branching point in the workflow.

__new__(value)
MACHINE = 'machine'
HUMAN = 'human'
WEBHOOK = 'webhook'
TIMER = 'timer'
GATEWAY = 'gateway'
class litestar_workflows.core.Workflow[source]

Bases: Protocol

Protocol defining the interface for workflow definitions.

Workflows orchestrate a collection of steps connected by edges (transitions). They define the structure and flow logic but not the runtime state.

name

Unique identifier for this workflow.

version

Version string for workflow definition versioning.

description

Human-readable description of the workflow’s purpose.

Example

>>> class DocumentApproval:
...     name = "document_approval"
...     version = "1.0.0"
...     description = "Multi-level document approval workflow"
...
...     def get_definition(self) -> WorkflowDefinition:
...         return WorkflowDefinition(
...             name=self.name,
...             version=self.version,
...             description=self.description,
...             steps={...},
...             edges=[...],
...             initial_step="submit",
...             terminal_steps={"approved", "rejected"},
...         )
__init__(*args, **kwargs)
get_definition()[source]

Extract the workflow definition from the class.

Return type:

Any

Returns:

A WorkflowDefinition instance containing steps, edges, and metadata.

name: str
version: str
description: str
class litestar_workflows.core.WorkflowCanceled[source]

Bases: WorkflowEvent

Event emitted when a workflow instance is manually canceled.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow was canceled.

reason

Explanation for the cancellation.

canceled_by

User who canceled the workflow.

current_step

Step that was executing when canceled.

Example

>>> event = WorkflowCanceled(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Request withdrawn",
...     canceled_by="user_123",
...     current_step="review",
... )
__init__(instance_id, timestamp, reason, canceled_by=None, current_step=None)
canceled_by: str | None = None
current_step: str | None = None
reason: str
class litestar_workflows.core.WorkflowCompleted[source]

Bases: WorkflowEvent

Event emitted when a workflow instance completes successfully.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow completed.

status

Final status of the workflow.

final_step

Name of the final step that was executed.

duration_seconds

Total execution time in seconds.

Example

>>> event = WorkflowCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     status="COMPLETED",
...     final_step="approve",
...     duration_seconds=3600.5,
... )
__init__(instance_id, timestamp, status, final_step=None, duration_seconds=None)
duration_seconds: float | None = None
final_step: str | None = None
status: str
class litestar_workflows.core.WorkflowContext[source]

Bases: object

Execution context passed between workflow steps.

The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.

workflow_id

Unique identifier for the workflow definition.

instance_id

Unique identifier for this workflow instance.

data

Mutable state dictionary for inter-step communication.

metadata

Immutable metadata about the workflow execution.

current_step

Name of the currently executing step.

step_history

Chronological record of all step executions.

started_at

Timestamp when the workflow instance was created.

user_id

Optional user identifier for human task contexts.

tenant_id

Optional tenant identifier for multi-tenancy support.

Example

>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
__init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)
get(key, default=None)[source]

Retrieve a value from the workflow data dictionary.

Parameters:
  • key (str) – The key to look up in the data dictionary.

  • default (Any) – Default value to return if key is not found.

Return type:

Any

Returns:

The value associated with the key, or the default if not present.

Example

>>> context.get("some_key", "default_value")
'default_value'
get_last_execution(step_name=None)[source]

Get the most recent execution record for a step.

Parameters:

step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.

Return type:

StepExecution | None

Returns:

The most recent StepExecution matching the criteria, or None if not found.

Example

>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "SUCCEEDED":
...     print("Step succeeded")
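A lookup of this kind amounts to scanning the chronological history from the end. The sketch below is a hypothetical stand-in: `ExecutionSketch` carries only two of the StepExecution fields, and the status strings are placeholders.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionSketch:
    """Reduced stand-in for a step execution record."""

    step_name: str
    status: str


def get_last_execution(
    history: list[ExecutionSketch], step_name: Optional[str] = None
) -> Optional[ExecutionSketch]:
    # The history is chronological, so the first match from the end is the latest.
    for execution in reversed(history):
        if step_name is None or execution.step_name == step_name:
            return execution
    return None


history = [
    ExecutionSketch("validation", "failed"),
    ExecutionSketch("validation", "succeeded"),
    ExecutionSketch("approval_step", "waiting"),
]
last_overall = get_last_execution(history)
last_validation = get_last_execution(history, "validation")
never_ran = get_last_execution(history, "archive")
```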
has_step_executed(step_name)[source]

Check if a step has been executed in this workflow instance.

Parameters:

step_name (str) – Name of the step to check.

Return type:

bool

Returns:

True if the step appears in the execution history, False otherwise.

Example

>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
set(key, value)[source]

Set a value in the workflow data dictionary.

Parameters:
  • key (str) – The key to set in the data dictionary.

  • value (Any) – The value to associate with the key.

Return type:

None

Example

>>> context.set("status", "approved")
>>> context.get("status")
'approved'
tenant_id: str | None = None
user_id: str | None = None
with_step(step_name)[source]

Create a new context for the given step execution.

This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.

Parameters:

step_name (str) – Name of the step to execute next.

Return type:

WorkflowContext

Returns:

A new WorkflowContext instance with updated current_step.

Example

>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
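The practical consequence of the shallow copy is that writes made through the new context are visible through the original. A minimal sketch, assuming a dataclass-style context: `ContextSketch` is a hypothetical stand-in with two fields, and `dataclasses.replace` is one way to obtain this behavior, not necessarily what the library uses.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass
class ContextSketch:
    """Reduced stand-in for a workflow context."""

    current_step: str
    data: dict[str, Any] = field(default_factory=dict)

    def with_step(self, step_name: str) -> "ContextSketch":
        # replace() copies field references, so `data` is the same dict object.
        return replace(self, current_step=step_name)


original = ContextSketch(current_step="submit", data={"order_id": 7})
following = original.with_step("review")

# The step name differs per context, but the data dictionary is shared.
following.data["approved"] = True
```

If a step needs an isolated snapshot of the data, it would have to copy the dictionary explicitly.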
workflow_id: UUID
instance_id: UUID
data: dict[str, Any]
metadata: dict[str, Any]
current_step: str
step_history: list[StepExecution]
started_at: datetime
class litestar_workflows.core.WorkflowDefinition[source]

Bases: object

Declarative workflow structure.

The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.

name

Unique identifier for the workflow.

version

Version string for workflow versioning.

description

Human-readable description of the workflow’s purpose.

steps

Dictionary mapping step names to Step instances.

edges

List of Edge instances defining the workflow graph.

initial_step

Name of the step to execute first.

terminal_steps

Set of step names that mark workflow completion.

Example

>>> from litestar_workflows.core import Edge, WorkflowDefinition
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
__init__(name, version, description, steps, edges, initial_step, terminal_steps=<factory>)
get_next_steps(current_step, context)[source]

Get the list of next steps from the current step based on edge conditions.

Parameters:
  • current_step (str) – Name of the current step.

  • context (WorkflowContext) – The workflow execution context for condition evaluation.

Return type:

list[str]

Returns:

List of step names that should be executed next.

Example

>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
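Selecting the next steps amounts to filtering the outgoing edges of the current step by their conditions. The sketch below is hypothetical: `EdgeSketch` is a reduced edge, the context is a plain dict, and only callable conditions are covered.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class EdgeSketch:
    """Reduced stand-in for an edge with an optional callable condition."""

    source: str
    target: str
    condition: Optional[Callable[[dict[str, Any]], bool]] = None


def get_next_steps(
    edges: list[EdgeSketch], current_step: str, context: dict[str, Any]
) -> list[str]:
    return [
        edge.target
        for edge in edges
        if edge.source == current_step
        and (edge.condition is None or edge.condition(context))
    ]


edges = [
    EdgeSketch("submit", "review"),
    EdgeSketch("review", "approve", lambda ctx: ctx.get("approved") is True),
    EdgeSketch("review", "reject", lambda ctx: ctx.get("approved") is False),
]
after_submit = get_next_steps(edges, "submit", {})
after_review = get_next_steps(edges, "review", {"approved": True})
undecided = get_next_steps(edges, "review", {})
```

When no outgoing edge matches, the result is an empty list, which a caller would need to treat as either a terminal step or a stalled workflow.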
to_mermaid()[source]

Generate a MermaidJS graph representation of the workflow.

Return type:

str

Returns:

MermaidJS graph definition as a string.

Example

>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
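Output of this shape can be produced by emitting one node line per step followed by one line per edge. The sketch below is a simplified, hypothetical generator: it draws every step as a rectangle, whereas the real method evidently varies the node shape (the `review{Review}` diamond above), and deriving the label from the step name is an assumption.

```python
def to_mermaid(steps: list[str], edges: list[tuple[str, str]]) -> str:
    """Render steps and (source, target) pairs as a top-down Mermaid graph."""
    lines = ["graph TD"]
    for name in steps:
        # Assumption: the label is the step name in title case.
        label = name.replace("_", " ").title()
        lines.append(f"    {name}[{label}]")
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


diagram = to_mermaid(
    ["submit", "manager_review"],
    [("submit", "manager_review")],
)
```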
to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]

Generate a MermaidJS graph with execution state highlighting.

Parameters:
  • current_step (str | None) – Name of the currently executing step.

  • completed_steps (list[str] | None) – List of successfully completed step names.

  • failed_steps (list[str] | None) – List of failed step names.

Return type:

str

Returns:

MermaidJS graph definition with state styling.

Example

>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
validate()[source]

Validate the workflow definition for common issues.

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> errors = definition.validate()
>>> if errors:
...     print("Validation errors:", errors)
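The kinds of checks such a validator might run can be sketched with plain sets and a breadth-first search. This is an illustration only: the function below is hypothetical, the exact checks and message wording used by the library are not documented here, and cycle detection is omitted.

```python
from collections import deque


def validate(
    steps: set[str],
    edges: list[tuple[str, str]],
    initial_step: str,
    terminal_steps: set[str],
) -> list[str]:
    errors: list[str] = []
    if initial_step not in steps:
        errors.append(f"Initial step '{initial_step}' is not defined")
    for terminal in sorted(terminal_steps - steps):
        errors.append(f"Terminal step '{terminal}' is not defined")
    for source, target in edges:
        for endpoint in (source, target):
            if endpoint not in steps:
                errors.append(f"Edge references unknown step '{endpoint}'")

    # Breadth-first search from the initial step to find unreachable steps.
    reachable: set[str] = set()
    queue = deque([initial_step] if initial_step in steps else [])
    while queue:
        current = queue.popleft()
        if current in reachable:
            continue
        reachable.add(current)
        queue.extend(t for s, t in edges if s == current and t in steps)
    for name in sorted(steps - reachable):
        errors.append(f"Step '{name}' is unreachable from the initial step")
    return errors


flow_edges = [("submit", "review"), ("review", "approve")]
valid = validate({"submit", "review", "approve"}, flow_edges, "submit", {"approve"})
orphaned = validate(
    {"submit", "review", "approve", "archive"}, flow_edges, "submit", {"approve"}
)
```

Returning a list rather than raising on the first problem lets a caller report every issue at once, which is consistent with WorkflowValidationError carrying a list of messages.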
name: str
version: str
description: str
steps: dict[str, Step[Any]]
edges: list[Edge]
initial_step: str
terminal_steps: set[str]
class litestar_workflows.core.WorkflowEvent[source]

Bases: object

Base class for all workflow events.

All workflow events include the instance_id and timestamp for tracking and correlation.

instance_id

Unique identifier of the workflow instance.

timestamp

When the event occurred.

__init__(instance_id, timestamp)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.WorkflowFailed[source]

Bases: WorkflowEvent

Event emitted when a workflow instance fails.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow failed.

error

Error message describing the failure.

failed_step

Name of the step that caused the failure.

error_type

Type/class of the error that occurred.

stack_trace

Optional stack trace for debugging.

Example

>>> event = WorkflowFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     error="Database connection failed",
...     failed_step="data_validation",
...     error_type="ConnectionError",
... )
__init__(instance_id, timestamp, error, failed_step=None, error_type=None, stack_trace=None)
error_type: str | None = None
failed_step: str | None = None
stack_trace: str | None = None
error: str
class litestar_workflows.core.WorkflowInstance[source]

Bases: Protocol

Protocol for workflow runtime instances.

A WorkflowInstance represents a single execution of a workflow definition. It tracks runtime state, progress, and results.

id

Unique identifier for this workflow instance.

workflow_id

Identifier of the workflow definition being executed.

workflow_name

Name of the workflow definition.

workflow_version

Version of the workflow definition.

status

Current execution status.

context

Runtime execution context.

current_step

Name of the currently executing or next step.

started_at

Timestamp when the instance was created.

completed_at

Timestamp when the instance finished (if applicable).

error

Error message if the workflow failed.

__init__(*args, **kwargs)
id: UUID
workflow_id: UUID
workflow_name: str
workflow_version: str
status: WorkflowStatus
context: WorkflowContext
current_step: str | None
started_at: Any
completed_at: Any | None
error: str | None
class litestar_workflows.core.WorkflowPaused[source]

Bases: WorkflowEvent

Event emitted when a workflow instance is paused.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow was paused.

reason

Explanation for pausing.

paused_at_step

Step where execution was paused.

Example

>>> event = WorkflowPaused(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Awaiting external data",
...     paused_at_step="data_ingestion",
... )
__init__(instance_id, timestamp, reason=None, paused_at_step=None)
paused_at_step: str | None = None
reason: str | None = None
class litestar_workflows.core.WorkflowResumed[source]

Bases: WorkflowEvent

Event emitted when a paused workflow instance resumes.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow resumed.

resumed_by

User who resumed the workflow.

resuming_at_step

Step where execution will resume.

Example

>>> event = WorkflowResumed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     resumed_by="user_456",
...     resuming_at_step="data_processing",
... )
__init__(instance_id, timestamp, resumed_by=None, resuming_at_step=None)
resumed_by: str | None = None
resuming_at_step: str | None = None
class litestar_workflows.core.WorkflowStarted[source]

Bases: WorkflowEvent

Event emitted when a workflow instance starts execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow started.

workflow_name

Name of the workflow definition.

workflow_version

Version of the workflow definition.

initial_data

Initial data provided when starting the workflow.

user_id

Optional user who initiated the workflow.

tenant_id

Optional tenant identifier for multi-tenancy.

Example

>>> event = WorkflowStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     workflow_name="approval_flow",
...     workflow_version="1.0.0",
...     initial_data={"document_id": "doc_123"},
... )
__init__(instance_id, timestamp, workflow_name, workflow_version, initial_data=None, user_id=None, tenant_id=None)
initial_data: dict[str, Any] | None = None
tenant_id: str | None = None
user_id: str | None = None
workflow_name: str
workflow_version: str
class litestar_workflows.core.WorkflowStatus[source]

Bases: StrEnum

Overall status of a workflow instance.

PENDING

Workflow has been created but not yet started.

RUNNING

Workflow is actively executing steps.

PAUSED

Workflow execution is temporarily paused.

WAITING

Workflow is waiting for human input or external event.

COMPLETED

Workflow finished successfully.

FAILED

Workflow terminated due to an error.

CANCELED

Workflow was manually canceled.

__new__(value)
PENDING = 'pending'
RUNNING = 'running'
PAUSED = 'paused'
WAITING = 'waiting'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELED = 'canceled'
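
Because WorkflowStatus derives from StrEnum, its members compare equal to the lowercase string values listed above. The sketch below illustrates that behaviour with a local stand-in, `SketchStatus`, built on `str, Enum` so that it also runs on Python versions older than 3.11. The stand-in, the `FINAL_STATUSES` set, and `is_finished` are assumptions made for illustration and are not part of litestar-workflows.

```python
from enum import Enum


class SketchStatus(str, Enum):
    """Local stand-in mirroring the documented WorkflowStatus values."""

    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


# Assumption: these three statuses are treated as final for this sketch.
FINAL_STATUSES = {SketchStatus.COMPLETED, SketchStatus.FAILED, SketchStatus.CANCELED}


def is_finished(status: str) -> bool:
    """Accept either a member or its string value and report whether it is final."""
    return SketchStatus(status) in FINAL_STATUSES


print(SketchStatus.PAUSED == "paused")
print(is_finished("completed"))
print(is_finished("running"))
```

Note that the lookup is by value, so an uppercase string such as "COMPLETED" would raise ValueError in this sketch.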

Core type definitions for litestar-workflows.

This module defines the fundamental types, enums, and type aliases used throughout the workflow system.

litestar_workflows.core.types.Context

Type alias for workflow context data dictionary.

alias of dict[str, Any]

class litestar_workflows.core.types.StepExecution[source]

Bases: object

Record of a single step execution within a workflow.

step_name

Name of the executed step.

status

Final status of the step execution.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished (if completed).

result

Return value from successful execution.

error

Error message if execution failed.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

__init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)
completed_at: datetime | None = None
error: str | None = None
input_data: dict[str, Any] | None = None
output_data: dict[str, Any] | None = None
result: Any = None
step_name: str
status: str
started_at: datetime
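
The started_at and completed_at fields are enough to derive how long a step ran. The following is a minimal sketch using a local stand-in dataclass, `SketchStepExecution`, that copies only four of the documented fields; the stand-in and the `duration_seconds` helper are hypothetical and not provided by the library.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class SketchStepExecution:
    """Stand-in with a subset of the documented StepExecution fields."""

    step_name: str
    status: str
    started_at: datetime
    completed_at: Optional[datetime] = None


def duration_seconds(record: SketchStepExecution) -> Optional[float]:
    """Return elapsed seconds, or None while the step has not completed."""
    if record.completed_at is None:
        return None
    return (record.completed_at - record.started_at).total_seconds()


start = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
finished = SketchStepExecution(
    "validation", "succeeded", start, start + timedelta(seconds=1.5)
)
running = SketchStepExecution("review", "running", start)

print(duration_seconds(finished))
print(duration_seconds(running))
```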
class litestar_workflows.core.types.StepStatus[source]

Bases: StrEnum

Execution status of a workflow step.

PENDING

Step has not yet been scheduled for execution.

SCHEDULED

Step is queued and awaiting execution.

RUNNING

Step is currently executing.

WAITING

Step is paused, waiting for external input or event.

SUCCEEDED

Step completed successfully.

FAILED

Step execution encountered an error.

CANCELED

Step was manually canceled before completion.

SKIPPED

Step was skipped due to conditional logic.

PENDING = 'pending'
SCHEDULED = 'scheduled'
RUNNING = 'running'
WAITING = 'waiting'
SUCCEEDED = 'succeeded'
FAILED = 'failed'
CANCELED = 'canceled'
SKIPPED = 'skipped'
__new__(value)
class litestar_workflows.core.types.StepT

Type variable bound to the Step protocol.

alias of TypeVar('StepT', bound=Step)

class litestar_workflows.core.types.StepType[source]

Bases: StrEnum

Classification of step types within a workflow.

MACHINE

Automated execution without human intervention.

HUMAN

Requires user interaction and input.

WEBHOOK

Waits for external callback or event.

TIMER

Waits for a time-based condition to be met.

GATEWAY

Decision or branching point in the workflow.

MACHINE = 'machine'
HUMAN = 'human'
WEBHOOK = 'webhook'
TIMER = 'timer'
GATEWAY = 'gateway'
__new__(value)
class litestar_workflows.core.types.T

Generic type variable for return values.

alias of TypeVar('T')

class litestar_workflows.core.types.WorkflowStatus[source]

Bases: StrEnum

Overall status of a workflow instance.

PENDING

Workflow has been created but not yet started.

RUNNING

Workflow is actively executing steps.

PAUSED

Workflow execution is temporarily paused.

WAITING

Workflow is waiting for human input or external event.

COMPLETED

Workflow finished successfully.

FAILED

Workflow terminated due to an error.

CANCELED

Workflow was manually canceled.

PENDING = 'pending'
RUNNING = 'running'
PAUSED = 'paused'
WAITING = 'waiting'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELED = 'canceled'
__new__(value)
class litestar_workflows.core.types.WorkflowT

Type variable bound to the Workflow protocol.

alias of TypeVar('WorkflowT', bound=Workflow)

Workflow execution context.

This module provides the WorkflowContext dataclass which carries state and metadata throughout workflow execution.

class litestar_workflows.core.context.StepExecution[source]

Bases: object

Record of a single step execution within a workflow.

step_name

Name of the executed step.

status

Final status of the step execution.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished (if completed).

result

Return value from successful execution.

error

Error message if execution failed.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

step_name: str
status: str
started_at: datetime
completed_at: datetime | None = None
result: Any = None
error: str | None = None
input_data: dict[str, Any] | None = None
output_data: dict[str, Any] | None = None
__init__(step_name, status, started_at, completed_at=None, result=None, error=None, input_data=None, output_data=None)
class litestar_workflows.core.context.WorkflowContext[source]

Bases: object

Execution context passed between workflow steps.

The WorkflowContext carries all state, metadata, and execution history throughout a workflow’s lifecycle. It provides a mutable data dictionary for step communication and immutable metadata for audit and tracking purposes.

workflow_id

Unique identifier for the workflow definition.

instance_id

Unique identifier for this workflow instance.

data

Mutable state dictionary for inter-step communication.

metadata

Immutable metadata about the workflow execution.

current_step

Name of the currently executing step.

step_history

Chronological record of all step executions.

started_at

Timestamp when the workflow instance was created.

user_id

Optional user identifier for human task contexts.

tenant_id

Optional tenant identifier for multi-tenancy support.

Example

>>> from uuid import uuid4
>>> from datetime import datetime
>>> context = WorkflowContext(
...     workflow_id=uuid4(),
...     instance_id=uuid4(),
...     data={"input": "value"},
...     metadata={"creator": "user123"},
...     current_step="initial_step",
...     step_history=[],
...     started_at=datetime.utcnow(),
... )
>>> context.set("result", 42)
>>> context.get("result")
42
workflow_id: UUID
instance_id: UUID
data: dict[str, Any]
metadata: dict[str, Any]
current_step: str
step_history: list[StepExecution]
started_at: datetime
user_id: str | None = None
tenant_id: str | None = None
get(key, default=None)[source]

Retrieve a value from the workflow data dictionary.

Parameters:
  • key (str) – The key to look up in the data dictionary.

  • default (Any) – Default value to return if key is not found.

Return type:

Any

Returns:

The value associated with the key, or the default if not present.

Example

>>> context.get("some_key", "default_value")
'default_value'
set(key, value)[source]

Set a value in the workflow data dictionary.

Parameters:
  • key (str) – The key to set in the data dictionary.

  • value (Any) – The value to associate with the key.

Return type:

None

Example

>>> context.set("status", "approved")
>>> context.get("status")
'approved'
with_step(step_name)[source]

Create a new context for the given step execution.

This method returns a shallow copy of the context with the current_step updated to the provided step name. The data and metadata dictionaries are shared with the original context.

Parameters:

step_name (str) – Name of the step to execute next.

Return type:

WorkflowContext

Returns:

A new WorkflowContext instance with updated current_step.

Example

>>> new_context = context.with_step("next_step")
>>> new_context.current_step
'next_step'
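
The shallow-copy behaviour described for with_step has a practical consequence: a write to data through the new context is visible through the original. The sketch below reproduces that effect with a local stand-in, `SketchContext`, built on `dataclasses.replace`. It is an illustration of shallow copying in general and is not the library's implementation.

```python
from dataclasses import dataclass, field, replace
from typing import Any


@dataclass
class SketchContext:
    """Stand-in holding only the two fields needed for the illustration."""

    data: dict[str, Any] = field(default_factory=dict)
    current_step: str = ""

    def with_step(self, step_name: str) -> "SketchContext":
        # replace() builds a new object but reuses the same `data` dict.
        return replace(self, current_step=step_name)


original = SketchContext(data={"order_id": 1}, current_step="submit")
moved = original.with_step("review")
moved.data["approved"] = True  # mutates the dict shared by both contexts

print(original.current_step, moved.current_step)
print(original.data)
```

If isolated state is needed for a branch, copying the dictionary explicitly (for example with `copy.deepcopy`) before mutating it avoids the shared write.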
get_last_execution(step_name=None)[source]

Get the most recent execution record for a step.

Parameters:

step_name (str | None) – Optional step name to filter by. If None, returns the last execution regardless of step.

Return type:

StepExecution | None

Returns:

The most recent StepExecution matching the criteria, or None if not found.

Example

>>> last_exec = context.get_last_execution("approval_step")
>>> if last_exec and last_exec.status == "succeeded":
...     print("Step succeeded")
has_step_executed(step_name)[source]

Check if a step has been executed in this workflow instance.

Parameters:

step_name (str) – Name of the step to check.

Return type:

bool

Returns:

True if the step appears in the execution history, False otherwise.

Example

>>> if context.has_step_executed("validation"):
...     print("Validation already completed")
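
Both history lookups can be pictured as simple scans over step_history. The sketch below assumes the history is a chronological list and uses a hypothetical stand-in record, `SketchExecution`; the helper names `last_execution` and `has_executed` are illustrative and do not belong to the library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchExecution:
    """Stand-in record with the two fields the lookups need."""

    step_name: str
    status: str


def last_execution(
    history: list[SketchExecution], step_name: Optional[str] = None
) -> Optional[SketchExecution]:
    """Scan from newest to oldest and return the first matching record."""
    for record in reversed(history):
        if step_name is None or record.step_name == step_name:
            return record
    return None


def has_executed(history: list[SketchExecution], step_name: str) -> bool:
    """Report whether any record in the history belongs to the step."""
    return any(record.step_name == step_name for record in history)


history = [
    SketchExecution("validation", "failed"),
    SketchExecution("validation", "succeeded"),  # a retry of the same step
    SketchExecution("review", "running"),
]

print(last_execution(history, "validation"))
print(has_executed(history, "approve"))
```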
__init__(workflow_id, instance_id, data, metadata, current_step, step_history, started_at, user_id=None, tenant_id=None)

Workflow definition and edge structures.

This module provides the data structures for defining workflow graphs, including edges (transitions) and the complete workflow definition.

class litestar_workflows.core.definition.Edge[source]

Bases: object

Defines a transition between workflow steps.

An edge represents a directed connection from one step to another, optionally conditioned on a predicate function or expression.

source

Name of the source step or the Step class itself.

target

Name of the target step or the Step class itself.

condition

Optional condition for edge traversal. Can be a callable that takes WorkflowContext and returns bool, or a string expression.

Example

>>> edge = Edge(
...     source="submit",
...     target="review",
...     condition=lambda ctx: ctx.get("auto_approve") is False,
... )
>>> conditional_edge = Edge(
...     source="review", target="approve", condition="context.get('approved') == True"
... )
source: str | type[Step]
target: str | type[Step]
condition: str | Callable[[WorkflowContext], bool] | None = None
evaluate_condition(context)[source]

Evaluate the edge condition against the workflow context.

Parameters:

context (WorkflowContext) – The current workflow execution context.

Return type:

bool

Returns:

True if the condition is met or if no condition exists, False otherwise.

Example

>>> edge = Edge(source="a", target="b", condition=lambda ctx: ctx.get("value") > 10)
>>> context.set("value", 15)
>>> edge.evaluate_condition(context)
True
get_source_name()[source]

Get the name of the source step.

Return type:

str

Returns:

The source step name as a string.

get_target_name()[source]

Get the name of the target step.

Return type:

str

Returns:

The target step name as a string.
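
Since source and target accept either a string or a Step class, the name accessors have to normalize both forms. How the library derives a name from a class is not stated here, so the sketch below makes an explicit assumption: a step class exposes its identifier as a `name` class attribute (as in the package-level example), with the class name as a fallback. `resolve_step_name`, `ReviewStep`, and `UnnamedStep` are hypothetical.

```python
from typing import Union


class ReviewStep:
    """Hypothetical step class carrying a `name` attribute."""

    name = "review"


class UnnamedStep:
    """Hypothetical step class without a `name` attribute."""


def resolve_step_name(ref: Union[str, type]) -> str:
    """Return a step name for either a string or a class reference."""
    if isinstance(ref, str):
        return ref
    # Assumption: prefer the class-level `name`, else fall back to the class name.
    return getattr(ref, "name", ref.__name__)


print(resolve_step_name("submit"))
print(resolve_step_name(ReviewStep))
print(resolve_step_name(UnnamedStep))
```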

__init__(source, target, condition=None)
class litestar_workflows.core.definition.WorkflowDefinition[source]

Bases: object

Declarative workflow structure.

The WorkflowDefinition captures the complete structure of a workflow including all steps, edges, and metadata. It serves as the blueprint for workflow execution.

name

Unique identifier for the workflow.

version

Version string for workflow versioning.

description

Human-readable description of the workflow’s purpose.

steps

Dictionary mapping step names to Step instances.

edges

List of Edge instances defining the workflow graph.

initial_step

Name of the step to execute first.

terminal_steps

Set of step names that mark workflow completion.

Example

>>> from litestar_workflows import Edge, WorkflowDefinition
>>> definition = WorkflowDefinition(
...     name="approval_flow",
...     version="1.0.0",
...     description="Document approval workflow",
...     steps={
...         "submit": SubmitStep(),
...         "review": ReviewStep(),
...         "approve": ApproveStep(),
...     },
...     edges=[
...         Edge(source="submit", target="review"),
...         Edge(source="review", target="approve"),
...     ],
...     initial_step="submit",
...     terminal_steps={"approve"},
... )
name: str
version: str
description: str
steps: dict[str, Step[Any]]
edges: list[Edge]
initial_step: str
terminal_steps: set[str]
validate()[source]

Validate the workflow definition for common issues.

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> errors = definition.validate()
>>> if errors:
...     print("Validation errors:", errors)
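
The exact checks performed by validate() are not listed in this reference. As an illustration of the kind of structural issues such a method can report, the sketch below checks three things on plain data: the initial step exists, every terminal step exists, and every edge endpoint is a known step. The function `validate_sketch` and its message wording are assumptions, not the library's behaviour.

```python
def validate_sketch(
    steps: set[str],
    edges: list[tuple[str, str]],
    initial_step: str,
    terminal_steps: set[str],
) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: list[str] = []
    if initial_step not in steps:
        errors.append(f"Initial step '{initial_step}' is not defined")
    for name in sorted(terminal_steps - steps):
        errors.append(f"Terminal step '{name}' is not defined")
    for source, target in edges:
        for endpoint in (source, target):
            if endpoint not in steps:
                errors.append(
                    f"Edge {source} -> {target} references unknown step '{endpoint}'"
                )
    return errors


steps = {"submit", "review", "approve"}
good_edges = [("submit", "review"), ("review", "approve")]
bad_edges = [("submit", "review"), ("review", "publish")]

print(validate_sketch(steps, good_edges, "submit", {"approve"}))
print(validate_sketch(steps, bad_edges, "start", {"approve"}))
```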
get_next_steps(current_step, context)[source]

Get the list of next steps from the current step based on edge conditions.

Parameters:
  • current_step (str) – Name of the current step.

  • context (WorkflowContext) – The workflow execution context for condition evaluation.

Return type:

list[str]

Returns:

List of step names that should be executed next.

Example

>>> next_steps = definition.get_next_steps("review", context)
>>> if "approve" in next_steps:
...     print("Moving to approval")
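
The selection rule behind get_next_steps (follow every outgoing edge whose condition passes) can be sketched without the library. The code below is a minimal stand-in and not the actual implementation: `SketchEdge` and `next_steps` are hypothetical names, the context is reduced to a plain dict, and only callable conditions are modelled (string expressions are left out).

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

Condition = Callable[[dict[str, Any]], bool]


@dataclass
class SketchEdge:
    """Hypothetical stand-in for Edge; only callable conditions are modelled."""

    source: str
    target: str
    condition: Optional[Condition] = None


def next_steps(
    edges: list[SketchEdge], current_step: str, data: dict[str, Any]
) -> list[str]:
    """Return targets of edges leaving current_step whose condition passes."""
    return [
        edge.target
        for edge in edges
        if edge.source == current_step
        and (edge.condition is None or edge.condition(data))
    ]


edges = [
    SketchEdge("submit", "review"),
    SketchEdge("review", "approve", lambda d: d.get("approved") is True),
    SketchEdge("review", "reject", lambda d: d.get("approved") is False),
]

print(next_steps(edges, "review", {"approved": True}))
print(next_steps(edges, "submit", {}))
```

An unconditional edge is always followed, and a step with no passing edges yields an empty list, which a caller would need to treat as either completion or a dead end.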
to_mermaid()[source]

Generate a MermaidJS graph representation of the workflow.

Return type:

str

Returns:

MermaidJS graph definition as a string.

Example

>>> mermaid = definition.to_mermaid()
>>> print(mermaid)
graph TD
    submit[Submit]
    review{Review}
    approve[Approve]
    submit --> review
    review --> approve
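
To show how a graph definition like the one above can be assembled, here is a minimal sketch that emits Mermaid text from plain step names and edge pairs. It is not the library's renderer: `to_mermaid_sketch` is a hypothetical helper, it draws every node as a rectangle, and it does not reproduce whatever rule makes review render with braces in the documented output.

```python
def to_mermaid_sketch(step_names: list[str], edges: list[tuple[str, str]]) -> str:
    """Build a top-down Mermaid flowchart from names and (source, target) pairs."""
    lines = ["graph TD"]
    for name in step_names:
        # Derive a display label such as "Manager Approval" from "manager_approval".
        label = name.replace("_", " ").title()
        lines.append(f"    {name}[{label}]")
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


graph = to_mermaid_sketch(["submit", "review"], [("submit", "review")])
print(graph)
```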
to_mermaid_with_state(current_step=None, completed_steps=None, failed_steps=None)[source]

Generate a MermaidJS graph with execution state highlighting.

Parameters:
  • current_step (str | None) – Name of the currently executing step.

  • completed_steps (list[str] | None) – List of successfully completed step names.

  • failed_steps (list[str] | None) – List of failed step names.

Return type:

str

Returns:

MermaidJS graph definition with state styling.

Example

>>> mermaid = definition.to_mermaid_with_state(
...     current_step="review", completed_steps=["submit"], failed_steps=[]
... )
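
State highlighting in Mermaid is typically done with `classDef` style definitions plus `class` assignments. The sketch below appends such lines to an existing graph string. The class names, colours, and the helper `add_state_styling` are assumptions chosen for illustration; the styling the library actually emits may differ.

```python
from typing import Optional


def add_state_styling(
    graph: str,
    current_step: Optional[str] = None,
    completed_steps: Optional[list[str]] = None,
    failed_steps: Optional[list[str]] = None,
) -> str:
    """Append Mermaid classDef and class lines marking each step's state."""
    lines = [
        graph,
        "    classDef completed fill:#d4edda,stroke:#28a745",
        "    classDef current fill:#fff3cd,stroke:#ffc107",
        "    classDef failed fill:#f8d7da,stroke:#dc3545",
    ]
    for name in completed_steps or []:
        lines.append(f"    class {name} completed")
    if current_step is not None:
        lines.append(f"    class {current_step} current")
    for name in failed_steps or []:
        lines.append(f"    class {name} failed")
    return "\n".join(lines)


base = "graph TD\n    submit[Submit]\n    review[Review]\n    submit --> review"
styled = add_state_styling(
    base, current_step="review", completed_steps=["submit"], failed_steps=[]
)
print(styled)
```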
__init__(name, version, description, steps, edges, initial_step, terminal_steps=<factory>)

Domain events for workflow lifecycle.

This module defines the event types that are emitted during workflow execution. These events can be used for logging, monitoring, triggering side effects, or integrating with external systems.
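
How events are delivered to consumers is not covered in this section. As one possible way to use them for logging and alerting, the sketch below routes events to handlers by type, so that a handler registered for the base class also receives subclass events. `SketchEvent`, `SketchStepFailed`, and `SketchDispatcher` are hypothetical stand-ins; the library may expose a different mechanism.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable
from uuid import UUID, uuid4


@dataclass
class SketchEvent:
    """Stand-in base event with the two documented common fields."""

    instance_id: UUID
    timestamp: datetime


@dataclass
class SketchStepFailed(SketchEvent):
    step_name: str
    error: str


class SketchDispatcher:
    """Hypothetical in-process dispatcher keyed by event class."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[SketchEvent], None]) -> None:
        self._handlers[event_type].append(handler)

    def emit(self, event: SketchEvent) -> None:
        # isinstance() lets base-class subscribers see subclass events too.
        for event_type, handlers in list(self._handlers.items()):
            if isinstance(event, event_type):
                for handler in handlers:
                    handler(event)


audit_log: list[str] = []
alerts: list[str] = []

dispatcher = SketchDispatcher()
dispatcher.subscribe(SketchEvent, lambda e: audit_log.append(type(e).__name__))
dispatcher.subscribe(
    SketchStepFailed, lambda e: alerts.append(f"{e.step_name}: {e.error}")
)

dispatcher.emit(
    SketchStepFailed(uuid4(), datetime.now(timezone.utc), "api_call", "Connection timeout")
)
print(audit_log, alerts)
```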

class litestar_workflows.core.events.HumanTaskCompleted[source]

Bases: WorkflowEvent

Event emitted when a human task is completed.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was completed.

step_name

Name of the human step.

task_id

Unique identifier for this task.

completed_by

User who completed the task.

form_data

Data submitted by the user.

comment

Optional comment provided by the user.

Example

>>> event = HumanTaskCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     completed_by="manager_123",
...     form_data={"approved": True, "amount": 1500.00},
...     comment="Approved for payment",
... )
step_name: str
task_id: UUID
completed_by: str
form_data: dict[str, Any] | None = None
comment: str | None = None
__init__(instance_id, timestamp, step_name, task_id, completed_by, form_data=None, comment=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.HumanTaskCreated[source]

Bases: WorkflowEvent

Event emitted when a human task is created.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was created.

step_name

Name of the human step.

task_id

Unique identifier for this task.

assignee

User or group assigned to the task.

title

Display title for the task.

description

Detailed description of what needs to be done.

due_at

Optional deadline for task completion.

Example

>>> event = HumanTaskCreated(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="manager_approval",
...     task_id=uuid4(),
...     assignee="manager_group",
...     title="Approve expense report",
...     description="Review and approve expense report for project X",
... )
step_name: str
task_id: UUID
assignee: str | None = None
title: str | None = None
description: str | None = None
due_at: datetime | None = None
__init__(instance_id, timestamp, step_name, task_id, assignee=None, title=None, description=None, due_at=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.HumanTaskReassigned[source]

Bases: WorkflowEvent

Event emitted when a human task is reassigned.

instance_id

Unique identifier of the workflow instance.

timestamp

When the task was reassigned.

step_name

Name of the human step.

task_id

Unique identifier for this task.

from_assignee

Previous assignee.

to_assignee

New assignee.

reassigned_by

User who performed the reassignment.

reason

Explanation for the reassignment.

Example

>>> event = HumanTaskReassigned(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="review",
...     task_id=uuid4(),
...     from_assignee="user_123",
...     to_assignee="user_456",
...     reassigned_by="admin_789",
...     reason="Original assignee on vacation",
... )
step_name: str
task_id: UUID
from_assignee: str | None = None
to_assignee: str | None = None
reassigned_by: str | None = None
reason: str | None = None
__init__(instance_id, timestamp, step_name, task_id, from_assignee=None, to_assignee=None, reassigned_by=None, reason=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.StepCompleted[source]

Bases: WorkflowEvent

Event emitted when a step completes successfully.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step completed.

step_name

Name of the step that completed.

status

Final status of the step execution.

result

Return value from the step execution.

output_data

Output data produced by the step.

duration_seconds

Execution time in seconds.

Example

>>> event = StepCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     status="succeeded",
...     result={"valid": True},
...     duration_seconds=1.5,
... )
step_name: str
status: str
result: Any = None
output_data: dict[str, Any] | None = None
duration_seconds: float | None = None
__init__(instance_id, timestamp, step_name, status, result=None, output_data=None, duration_seconds=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.StepFailed[source]

Bases: WorkflowEvent

Event emitted when a step fails execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step failed.

step_name

Name of the step that failed.

error

Error message describing the failure.

error_type

Type/class of the error.

retry_count

Number of retry attempts made.

Example

>>> event = StepFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="api_call",
...     error="Connection timeout",
...     error_type="TimeoutError",
...     retry_count=3,
... )
step_name: str
error: str
error_type: str | None = None
retry_count: int = 0
__init__(instance_id, timestamp, step_name, error, error_type=None, retry_count=0)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.StepSkipped[source]

Bases: WorkflowEvent

Event emitted when a step is skipped due to conditions.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step was skipped.

step_name

Name of the step that was skipped.

reason

Explanation for why the step was skipped.

Example

>>> event = StepSkipped(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="optional_notification",
...     reason="Notification disabled in settings",
... )
step_name: str
reason: str | None = None
__init__(instance_id, timestamp, step_name, reason=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.StepStarted[source]

Bases: WorkflowEvent

Event emitted when a step begins execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the step started.

step_name

Name of the step that started.

step_type

Type of the step (MACHINE, HUMAN, etc.).

input_data

Input data provided to the step.

Example

>>> event = StepStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     step_name="validation",
...     step_type="MACHINE",
...     input_data={"data": "to_validate"},
... )
step_name: str
step_type: str
input_data: dict[str, Any] | None = None
__init__(instance_id, timestamp, step_name, step_type, input_data=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.WorkflowCanceled[source]

Bases: WorkflowEvent

Event emitted when a workflow instance is manually canceled.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow was canceled.

reason

Explanation for the cancellation.

canceled_by

User who canceled the workflow.

current_step

Step that was executing when canceled.

Example

>>> event = WorkflowCanceled(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Request withdrawn",
...     canceled_by="user_123",
...     current_step="review",
... )
reason: str
canceled_by: str | None = None
current_step: str | None = None
__init__(instance_id, timestamp, reason, canceled_by=None, current_step=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.WorkflowCompleted[source]

Bases: WorkflowEvent

Event emitted when a workflow instance completes successfully.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow completed.

status

Final status of the workflow.

final_step

Name of the final step that was executed.

duration_seconds

Total execution time in seconds.

Example

>>> event = WorkflowCompleted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     status="completed",
...     final_step="approve",
...     duration_seconds=3600.5,
... )
status: str
final_step: str | None = None
duration_seconds: float | None = None
__init__(instance_id, timestamp, status, final_step=None, duration_seconds=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.WorkflowEvent[source]

Bases: object

Base class for all workflow events.

All workflow events include the instance_id and timestamp for tracking and correlation.

instance_id

Unique identifier of the workflow instance.

timestamp

When the event occurred.

instance_id: UUID
timestamp: datetime
__init__(instance_id, timestamp)
class litestar_workflows.core.events.WorkflowFailed[source]

Bases: WorkflowEvent

Event emitted when a workflow instance fails.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow failed.

error

Error message describing the failure.

failed_step

Name of the step that caused the failure.

error_type

Type/class of the error that occurred.

stack_trace

Optional stack trace for debugging.

Example

>>> event = WorkflowFailed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     error="Database connection failed",
...     failed_step="data_validation",
...     error_type="ConnectionError",
... )
error: str
failed_step: str | None = None
error_type: str | None = None
stack_trace: str | None = None
__init__(instance_id, timestamp, error, failed_step=None, error_type=None, stack_trace=None)
instance_id: UUID
timestamp: datetime
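
The error, error_type, and stack_trace fields of WorkflowFailed map naturally onto a caught exception. The sketch below derives those values with the standard library only and returns them as a plain dict, which could then be passed as keyword arguments when constructing the event. The helper name `failure_fields` is hypothetical.

```python
import traceback
from typing import Optional


def failure_fields(exc: BaseException, failed_step: Optional[str] = None) -> dict:
    """Collect the failure-related field values from a caught exception."""
    return {
        "error": str(exc),
        "failed_step": failed_step,
        "error_type": type(exc).__name__,
        "stack_trace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


try:
    raise ConnectionError("Database connection failed")
except ConnectionError as exc:
    fields = failure_fields(exc, failed_step="data_validation")

print(fields["error_type"], "-", fields["error"])
```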
class litestar_workflows.core.events.WorkflowPaused[source]

Bases: WorkflowEvent

Event emitted when a workflow instance is paused.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow was paused.

reason

Explanation for pausing.

paused_at_step

Step where execution was paused.

Example

>>> event = WorkflowPaused(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     reason="Awaiting external data",
...     paused_at_step="data_ingestion",
... )
reason: str | None = None
paused_at_step: str | None = None
__init__(instance_id, timestamp, reason=None, paused_at_step=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.WorkflowResumed[source]

Bases: WorkflowEvent

Event emitted when a paused workflow instance resumes.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow resumed.

resumed_by

User who resumed the workflow.

resuming_at_step

Step where execution will resume.

Example

>>> event = WorkflowResumed(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     resumed_by="user_456",
...     resuming_at_step="data_processing",
... )
resumed_by: str | None = None
resuming_at_step: str | None = None
__init__(instance_id, timestamp, resumed_by=None, resuming_at_step=None)
instance_id: UUID
timestamp: datetime
class litestar_workflows.core.events.WorkflowStarted[source]

Bases: WorkflowEvent

Event emitted when a workflow instance starts execution.

instance_id

Unique identifier of the workflow instance.

timestamp

When the workflow started.

workflow_name

Name of the workflow definition.

workflow_version

Version of the workflow definition.

initial_data

Initial data provided when starting the workflow.

user_id

Optional user who initiated the workflow.

tenant_id

Optional tenant identifier for multi-tenancy.

Example

>>> event = WorkflowStarted(
...     instance_id=uuid4(),
...     timestamp=datetime.utcnow(),
...     workflow_name="approval_flow",
...     workflow_version="1.0.0",
...     initial_data={"document_id": "doc_123"},
... )
workflow_name: str
workflow_version: str
initial_data: dict[str, Any] | None = None
user_id: str | None = None
tenant_id: str | None = None
__init__(instance_id, timestamp, workflow_name, workflow_version, initial_data=None, user_id=None, tenant_id=None)
instance_id: UUID
timestamp: datetime

Public API

The following names are available from the top-level package:

from litestar_workflows import (
    # Protocols
    Step,
    Workflow,
    ExecutionEngine,

    # Types
    StepType,
    StepStatus,
    WorkflowStatus,
    WorkflowContext,
    WorkflowDefinition,
    Edge,

    # Base implementations
    BaseMachineStep,
    BaseHumanStep,
    BaseGateway,

    # Groups
    SequentialGroup,
    ParallelGroup,
    ConditionalGroup,

    # Engine
    LocalExecutionEngine,
    WorkflowRegistry,

    # Exceptions
    WorkflowsError,
    WorkflowNotFoundError,
    StepExecutionError,
    InvalidTransitionError,
)

Engine Module

Execution engines and workflow registry.

Workflow execution engine implementations.

This module provides execution engines for orchestrating workflow instances, managing state transitions, and coordinating step execution.

class litestar_workflows.engine.ExecutionEngine[source]

Bases: Protocol

Protocol for workflow execution engines.

The ExecutionEngine is responsible for orchestrating workflow execution, including starting workflows, executing steps, scheduling, and handling human tasks.

Example

>>> engine = LocalExecutionEngine()
>>> instance = await engine.start_workflow(MyWorkflow, initial_data={"input": "value"})
__init__(*args, **kwargs)
async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow instance.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • reason (str) – Explanation for the cancellation.

Return type:

None

Example

>>> await engine.cancel_workflow(
...     instance_id=instance.id, reason="Request withdrawn by submitter"
... )
async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • step_name (str) – Name of the human step to complete.

  • user_id (str) – Identifier of the user completing the task.

  • data (dict[str, Any]) – User-provided form data or input.

Return type:

None

Example

>>> await engine.complete_human_task(
...     instance_id=instance.id,
...     step_name="approval",
...     user_id="user_123",
...     data={"approved": True, "comments": "Looks good"},
... )
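
The pause-and-resume behaviour described above can be sketched with one future per pending task. This is a minimal, self-contained illustration and does not use litestar_workflows: `pending`, `wait_for_human`, and the `completed_by` key are hypothetical names, instance IDs are plain strings rather than UUIDs, and the real engine may track pending tasks differently.

```python
import asyncio
from typing import Any

# Hypothetical in-memory state; a real engine would keep this per instance.
context: dict[str, Any] = {"document_id": "doc_123"}
pending: dict[tuple[str, str], asyncio.Future] = {}


async def wait_for_human(instance_id: str, step_name: str) -> dict[str, Any]:
    """Park the workflow until someone completes the task."""
    future = asyncio.get_running_loop().create_future()
    pending[(instance_id, step_name)] = future
    data = await future      # execution pauses here
    context.update(data)     # merge the submitted form data into the context
    return data


async def complete_human_task(
    instance_id: str, step_name: str, user_id: str, data: dict[str, Any]
) -> None:
    """Resume the parked workflow with the user's input."""
    future = pending.pop((instance_id, step_name))
    future.set_result({**data, "completed_by": user_id})


async def main() -> dict[str, Any]:
    waiter = asyncio.create_task(wait_for_human("inst-1", "approval"))
    await asyncio.sleep(0)  # give the waiter a chance to register itself
    await complete_human_task(
        "inst-1", "approval", "user_123", {"approved": True, "comments": "Looks good"}
    )
    return await waiter


submitted = asyncio.run(main())
```

In this sketch the waiting coroutine owns the merge into the context, so the completing side only needs the instance ID and step name to find the right future.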
async execute_step(step, context, previous_result=None)[source]

Execute a single step within a workflow.

Parameters:
  • step (Step[Any]) – The step to execute.

  • context (WorkflowContext) – The current workflow context.

  • previous_result (Any) – Optional result from a previous step.

Return type:

Any

Returns:

The result of the step execution.

Raises:

Exception – If step execution fails and error handling doesn’t compensate.

async schedule_step(instance_id, step_name, delay=None)[source]

Schedule a step for later execution.

Parameters:
  • instance_id (UUID) – The workflow instance identifier.

  • step_name (str) – Name of the step to schedule.

  • delay (timedelta | None) – Optional delay before execution. If None, executes immediately.

Return type:

None

Example

>>> from datetime import timedelta
>>> await engine.schedule_step(
...     instance_id=instance.id, step_name="reminder", delay=timedelta(hours=24)
... )
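
One plausible way to honour the optional `delay` in an in-process engine is to sleep inside a background task before running the step. The sketch below assumes that approach and is not the library's implementation: `run_step` and `executed` are hypothetical, and `schedule_step` here returns the task (unlike the documented method, which returns None) so the example can wait for it.

```python
import asyncio
from datetime import timedelta
from typing import Optional

executed: list[str] = []  # records the order in which steps actually ran


async def run_step(step_name: str) -> None:
    executed.append(step_name)


def schedule_step(step_name: str, delay: Optional[timedelta] = None) -> asyncio.Task:
    """Run a step now, or after `delay`, as a background task."""

    async def runner() -> None:
        if delay is not None:
            await asyncio.sleep(delay.total_seconds())
        await run_step(step_name)

    return asyncio.create_task(runner())


async def main() -> None:
    later = schedule_step("reminder", delay=timedelta(milliseconds=50))
    now = schedule_step("notify")  # no delay: runs as soon as the loop is free
    await asyncio.gather(later, now)


asyncio.run(main())
print(executed)
```

A delay held only in memory is lost if the process restarts, which is one reason a distributed engine would likely persist scheduled steps instead.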
async start_workflow(workflow, initial_data=None)[source]

Start a new workflow instance.

Parameters:
  • workflow (type[Workflow]) – The workflow class to instantiate and execute.

  • initial_data (dict[str, Any] | None) – Optional initial data to populate the workflow context.

Return type:

WorkflowInstance

Returns:

A WorkflowInstance representing the started workflow.

Example

>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
class litestar_workflows.engine.LocalExecutionEngine[source]

Bases: object

In-memory async execution engine for workflows.

This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.

registry

The workflow registry for looking up definitions.

persistence

Optional persistence layer for saving state.

event_bus

Optional event bus for emitting workflow events.

_instances

In-memory storage of workflow instances.

_running

Map of instance IDs to their running asyncio tasks.

Parameters:
  • registry (WorkflowRegistry)

  • persistence (Any | None)

  • event_bus (Any | None)

__init__(registry, persistence=None, event_bus=None)[source]

Initialize the local execution engine.

Parameters:
  • registry (WorkflowRegistry) – The workflow registry.

  • persistence (Any | None) – Optional persistence layer implementing save/load methods.

  • event_bus (Any | None) – Optional event bus implementing emit method.

async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • reason (str) – Reason for cancellation.

Return type:

None
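
Because the engine keeps a map of instance IDs to asyncio tasks, cancellation can be pictured as cancelling the task and recording the reason. The following is a sketch under that assumption only: `running`, `status`, and `run_workflow` are hypothetical stand-ins, and the real engine's bookkeeping is not shown in this reference.

```python
import asyncio
from uuid import UUID, uuid4

running: dict[UUID, asyncio.Task] = {}  # instance ID -> running task
status: dict[UUID, str] = {}            # instance ID -> last known status


async def run_workflow(instance_id: UUID) -> None:
    status[instance_id] = "running"
    try:
        await asyncio.sleep(60)  # stands in for long-running step execution
        status[instance_id] = "completed"
    finally:
        running.pop(instance_id, None)  # always drop the task from the map


async def cancel_workflow(instance_id: UUID, reason: str) -> None:
    task = running.get(instance_id)
    if task is not None and not task.done():
        task.cancel()
        try:
            await task  # wait until the cancellation has been processed
        except asyncio.CancelledError:
            pass
    status[instance_id] = f"cancelled: {reason}"


async def main() -> str:
    instance_id = uuid4()
    running[instance_id] = asyncio.create_task(run_workflow(instance_id))
    await asyncio.sleep(0)  # let the workflow task start
    await cancel_workflow(instance_id, "Request withdrawn by submitter")
    return status[instance_id]


final_status = asyncio.run(main())
```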

async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the human task step.

  • user_id (str) – ID of the user completing the task.

  • data (dict[str, Any]) – User-provided data to merge into context.

Return type:

None

async execute_step(step, context, previous_result=None)[source]

Execute a single step with the given context.

Parameters:
  • step (Step[Any]) – The step to execute.

  • context (WorkflowContext) – The workflow context.

  • previous_result (Any) – Optional result from previous step.

Return type:

Any

Returns:

The result of the step execution.

get_all_instances()[source]

Get all workflow instances (running and completed).

Return type:

list[WorkflowInstanceData]

Returns:

List of all WorkflowInstanceData objects.

async get_instance(instance_id)[source]

Retrieve a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

WorkflowInstanceData

Returns:

The WorkflowInstanceData.

Raises:

KeyError – If the instance is not found.

get_running_instances()[source]

Get all currently running workflow instances.

Return type:

list[WorkflowInstanceData]

Returns:

List of running WorkflowInstanceData objects.

async schedule_step(instance_id, step_name, delay=None)[source]

Schedule a step for execution.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the step to schedule.

  • delay (timedelta | None) – Optional delay before execution.

Return type:

None

async start_workflow(workflow, initial_data=None)[source]

Start a new workflow instance.

Creates a new workflow instance and begins execution from the initial step.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute.

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow context.

Return type:

WorkflowInstanceData

Returns:

The created WorkflowInstanceData.

Example

>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
class litestar_workflows.engine.WorkflowGraph[source]

Bases: object

Graph representation of a workflow for navigation and validation.

The graph provides methods to navigate between steps, validate structure, and determine execution paths based on workflow state.

definition

The workflow definition this graph represents.

_adjacency

Adjacency list mapping step names to outgoing edges.

_reverse_adjacency

Reverse adjacency list for finding predecessors.

Parameters:

definition (WorkflowDefinition)

__init__(definition)[source]

Initialize a workflow graph from a definition.

Parameters:

definition (WorkflowDefinition) – The workflow definition to represent as a graph.

classmethod from_definition(definition)[source]

Create a workflow graph from a definition.

Parameters:

definition (WorkflowDefinition) – The workflow definition.

Return type:

WorkflowGraph

Returns:

A WorkflowGraph instance.

Example

>>> graph = WorkflowGraph.from_definition(my_definition)
get_all_paths(start, end, max_paths=100)[source]

Find all paths from start step to end step.

Parameters:
  • start (str) – Starting step name.

  • end (str) – Ending step name.

  • max_paths (int) – Maximum number of paths to find (prevents infinite loops).

Return type:

list[list[str]]

Returns:

List of paths, where each path is a list of step names.

Example

>>> paths = graph.get_all_paths("start", "end")
>>> for path in paths:
...     print(" -> ".join(path))
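
A bounded path search of this kind is commonly written as a depth-first walk that stops once `max_paths` results have been collected. The sketch below is a standalone illustration with a hypothetical `adjacency` mapping; it is not taken from the library source, and the cycle guard (skipping steps already on the current path) is an assumption.

```python
# Hypothetical adjacency list: step name -> successor step names.
adjacency: dict[str, list[str]] = {
    "start": ["review", "auto_check"],
    "review": ["end"],
    "auto_check": ["end"],
    "end": [],
}


def all_paths(start: str, end: str, max_paths: int = 100) -> list[list[str]]:
    """Collect simple paths from start to end, stopping at max_paths."""
    paths: list[list[str]] = []

    def walk(node: str, path: list[str]) -> None:
        if len(paths) >= max_paths:
            return  # cap reached: stop exploring
        if node == end:
            paths.append(path)
            return
        for nxt in adjacency.get(node, []):
            if nxt not in path:  # skip steps already on this path (cycle guard)
                walk(nxt, path + [nxt])

    walk(start, [start])
    return paths


paths = all_paths("start", "end")
for path in paths:
    print(" -> ".join(path))
```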
get_next_steps(current, context)[source]

Get the next steps from the current step.

Evaluates edge conditions to determine which steps should execute next. If multiple unconditional edges exist, all of their targets are returned so they can run in parallel.

Parameters:
  • current (str) – Name of the current step.

  • context (WorkflowContext) – The workflow context for condition evaluation.

Return type:

list[str]

Returns:

List of next step names. Empty list if current step is terminal.

Example

>>> next_steps = graph.get_next_steps("approval", context)
>>> if len(next_steps) > 1:
...     print("Parallel execution required")
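
The selection rule described above can be illustrated without the library. In this sketch `SketchEdge` and its `condition` field are hypothetical (the real `Edge` type may carry conditions differently), and the context is a plain dict.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SketchEdge:
    source: str
    target: str
    # None means the edge is unconditional and is always followed.
    condition: Optional[Callable[[dict[str, Any]], bool]] = None


edges = [
    SketchEdge("approval", "publish", lambda ctx: ctx.get("approved") is True),
    SketchEdge("approval", "revise", lambda ctx: ctx.get("approved") is False),
    SketchEdge("publish", "notify_team"),
    SketchEdge("publish", "update_index"),
]


def next_steps(current: str, context: dict[str, Any]) -> list[str]:
    """Return targets of outgoing edges whose condition is absent or true."""
    return [
        edge.target
        for edge in edges
        if edge.source == current
        and (edge.condition is None or edge.condition(context))
    ]


after_approval = next_steps("approval", {"approved": True})
after_publish = next_steps("publish", {})
after_notify = next_steps("notify_team", {})
```

Here `after_publish` contains two targets because both edges are unconditional, which is the case the caller would treat as a parallel fan-out.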
get_previous_steps(current)[source]

Get the steps that lead to the current step.

Parameters:

current (str) – Name of the current step.

Return type:

list[str]

Returns:

List of predecessor step names.

Example

>>> predecessors = graph.get_previous_steps("final_step")
get_step_depth(step_name)[source]

Get the minimum depth from initial step to the given step.

Parameters:

step_name (str) – Name of the step.

Return type:

int

Returns:

Minimum number of steps from initial to this step. Returns -1 if unreachable.

Example

>>> depth = graph.get_step_depth("final_approval")
>>> print(f"Step is at depth {depth}")
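
Minimum depth in an unweighted graph is a breadth-first search problem. The sketch below shows that technique on a hypothetical `adjacency` mapping, including the -1 result for unreachable steps; it is an illustration and not the library's code.

```python
from collections import deque

# Hypothetical adjacency list: step name -> successor step names.
adjacency: dict[str, list[str]] = {
    "start": ["review", "auto_check"],
    "auto_check": ["review"],
    "review": ["final_approval"],
    "final_approval": [],
    "orphan": [],
}


def step_depth(initial: str, target: str) -> int:
    """Fewest edges from initial to target, or -1 if target is unreachable."""
    if initial == target:
        return 0
    seen = {initial}
    queue = deque([(initial, 0)])
    while queue:
        node, depth = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt == target:
                return depth + 1
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, depth + 1))
    return -1


depth = step_depth("start", "final_approval")
print(f"Step is at depth {depth}")
```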
is_terminal(step_name)[source]

Check if a step is a terminal (end) step.

Parameters:

step_name (str) – Name of the step to check.

Return type:

bool

Returns:

True if the step has no outgoing edges or is in terminal_steps.

Example

>>> if graph.is_terminal("approval"):
...     print("Workflow ends here")
validate()[source]

Validate the workflow graph structure.

Checks for common graph issues:
  • Unreachable steps (not connected to initial step)

  • Disconnected components

  • Steps with no outgoing edges that aren’t marked terminal

  • Invalid edge references

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> errors = graph.validate()
>>> if errors:
...     for error in errors:
...         print(f"Validation error: {error}")
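
The checks listed for `validate()` could be approximated as follows. This is a sketch under stated assumptions: the function takes plain sets and tuples instead of a `WorkflowDefinition`, the error strings are invented for illustration, and disconnected components are reported through the reachability check rather than separately.

```python
def validate_graph(
    steps: set[str],
    edges: list[tuple[str, str]],
    initial: str,
    terminal: set[str],
) -> list[str]:
    """Return a list of problems; an empty list means the graph looks valid."""
    errors: list[str] = []

    # 1. Invalid edge references.
    for source, target in edges:
        for name in (source, target):
            if name not in steps:
                errors.append(f"Edge references unknown step: {name}")

    # Build outgoing edges from the valid edges only.
    outgoing: dict[str, list[str]] = {name: [] for name in steps}
    for source, target in edges:
        if source in steps and target in steps:
            outgoing[source].append(target)

    # 2. Unreachable steps (also catches disconnected components).
    reachable: set[str] = set()
    stack = [initial] if initial in steps else []
    while stack:
        node = stack.pop()
        if node not in reachable:
            reachable.add(node)
            stack.extend(outgoing[node])
    for name in sorted(steps - reachable):
        errors.append(f"Unreachable step: {name}")

    # 3. Dead ends that are not marked terminal.
    for name in sorted(steps):
        if not outgoing[name] and name not in terminal:
            errors.append(f"Step has no outgoing edges but is not terminal: {name}")

    return errors


ok = validate_graph(
    {"draft", "review", "done"},
    [("draft", "review"), ("review", "done")],
    "draft",
    {"done"},
)
broken = validate_graph(
    {"draft", "review", "orphan"},
    [("draft", "review"), ("review", "archive")],
    "draft",
    {"review"},
)
```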
litestar_workflows.engine.WorkflowInstance

alias of WorkflowInstanceData

class litestar_workflows.engine.WorkflowRegistry[source]

Bases: object

Registry for storing and retrieving workflow definitions.

The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.

_definitions

Nested dict mapping name -> version -> WorkflowDefinition.

_workflow_classes

Map of workflow names to their class definitions.

__init__()[source]

Initialize an empty workflow registry.

get_definition(name, version=None)[source]

Retrieve a workflow definition by name and optional version.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The workflow version. If None, returns the latest version.

Return type:

WorkflowDefinition

Returns:

The WorkflowDefinition for the requested workflow.

Raises:

KeyError – If the workflow name or version is not found.

Example

>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
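
The name -> version -> definition layout and the "latest version" fallback can be sketched as below. `SketchRegistry` and `version_key` are hypothetical, and the ordering rule is an assumption: this reference does not say how the latest version is determined, so the sketch compares dotted numeric versions, which would not handle pre-release tags.

```python
from typing import Any, Optional


def version_key(version: str) -> tuple[int, ...]:
    """Sort key so that '1.10.0' ranks above '1.2.0'."""
    return tuple(int(part) for part in version.split("."))


class SketchRegistry:
    def __init__(self) -> None:
        # Nested dict: workflow name -> version -> definition.
        self._definitions: dict[str, dict[str, Any]] = {}

    def register(self, name: str, version: str, definition: Any) -> None:
        self._definitions.setdefault(name, {})[version] = definition

    def get_versions(self, name: str) -> list[str]:
        return sorted(self._definitions[name], key=version_key)

    def get_definition(self, name: str, version: Optional[str] = None) -> Any:
        versions = self._definitions[name]  # KeyError for an unknown name
        if version is None:
            version = max(versions, key=version_key)
        return versions[version]            # KeyError for an unknown version


registry = SketchRegistry()
for v in ("1.0.0", "1.2.0", "1.10.0"):
    registry.register("approval_workflow", v, {"version": v})

latest = registry.get_definition("approval_workflow")
pinned = registry.get_definition("approval_workflow", "1.0.0")
```

Comparing versions as integer tuples avoids the string-sorting trap where "1.2.0" would rank above "1.10.0".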
get_versions(name)[source]

Get all versions for a workflow.

Parameters:

name (str) – The workflow name.

Return type:

list[str]

Returns:

List of version strings.

Raises:

KeyError – If the workflow name is not found.

Example

>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
get_workflow_class(name)[source]

Retrieve the workflow class by name.

Parameters:

name (str) – The workflow name.

Return type:

type[Workflow]

Returns:

The Workflow class.

Raises:

KeyError – If the workflow name is not found.

Example

>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
has_workflow(name, version=None)[source]

Check if a workflow exists in the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – Optional specific version to check.

Return type:

bool

Returns:

True if the workflow exists, False otherwise.

Example

>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
list_definitions(active_only=True)[source]

List all registered workflow definitions.

Parameters:

active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.

Return type:

list[WorkflowDefinition]

Returns:

List of WorkflowDefinition objects.

Example

>>> all_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
register(workflow_class)[source]

Register a workflow class with the registry.

Extracts the workflow definition from the class and stores it indexed by name and version.

Parameters:

workflow_class (type[Workflow]) – The workflow class to register.

Return type:

None

Example

>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
unregister(name, version=None)[source]

Remove a workflow from the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The specific version to remove. If None, removes all versions.

Return type:

None

Example

>>> registry.unregister("old_workflow")
>>> registry.unregister("approval_workflow", "1.0.0")

Local in-memory async execution engine.

This module provides a local, in-process execution engine suitable for development, testing, and single-instance deployments.

class litestar_workflows.engine.local.LocalExecutionEngine[source]

Bases: object

In-memory async execution engine for workflows.

This engine executes workflows in the same process using asyncio tasks. It’s suitable for development, testing, and single-instance production deployments where distributed execution is not required.

registry

The workflow registry for looking up definitions.

persistence

Optional persistence layer for saving state.

event_bus

Optional event bus for emitting workflow events.

_instances

In-memory storage of workflow instances.

_running

Map of instance IDs to their running asyncio tasks.

Parameters:
  • registry (WorkflowRegistry)

  • persistence (Any | None)

  • event_bus (Any | None)

__init__(registry, persistence=None, event_bus=None)[source]

Initialize the local execution engine.

Parameters:
  • registry (WorkflowRegistry) – The workflow registry.

  • persistence (Any | None) – Optional persistence layer implementing save/load methods.

  • event_bus (Any | None) – Optional event bus implementing emit method.

async start_workflow(workflow, initial_data=None)[source]

Start a new workflow instance.

Creates a new workflow instance and begins execution from the initial step.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute.

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow context.

Return type:

WorkflowInstanceData

Returns:

The created WorkflowInstanceData.

Example

>>> engine = LocalExecutionEngine(registry)
>>> instance = await engine.start_workflow(
...     ApprovalWorkflow, initial_data={"document_id": "doc_123"}
... )
async execute_step(step, context, previous_result=None)[source]

Execute a single step with the given context.

Parameters:
  • step (Step[Any]) – The step to execute.

  • context (WorkflowContext) – The workflow context.

  • previous_result (Any) – Optional result from previous step.

Return type:

Any

Returns:

The result of the step execution.

async schedule_step(instance_id, step_name, delay=None)[source]

Schedule a step for execution.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the step to schedule.

  • delay (timedelta | None) – Optional delay before execution.

Return type:

None

async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the human task step.

  • user_id (str) – ID of the user completing the task.

  • data (dict[str, Any]) – User-provided data to merge into context.

Return type:

None

async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • reason (str) – Reason for cancellation.

Return type:

None

async get_instance(instance_id)[source]

Retrieve a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

WorkflowInstanceData

Returns:

The WorkflowInstanceData.

Raises:

KeyError – If the instance is not found.

get_running_instances()[source]

Get all currently running workflow instances.

Return type:

list[WorkflowInstanceData]

Returns:

List of running WorkflowInstanceData objects.

get_all_instances()[source]

Get all workflow instances (running and completed).

Return type:

list[WorkflowInstanceData]

Returns:

List of all WorkflowInstanceData objects.

Workflow registry for managing workflow definitions.

This module provides a registry for storing, retrieving, and managing workflow definitions with support for versioning.

class litestar_workflows.engine.registry.WorkflowRegistry[source]

Bases: object

Registry for storing and retrieving workflow definitions.

The registry maintains a mapping of workflow names to versions and their definitions, enabling workflow lookup and version management.

_definitions

Nested dict mapping name -> version -> WorkflowDefinition.

_workflow_classes

Map of workflow names to their class definitions.

__init__()[source]

Initialize an empty workflow registry.

register(workflow_class)[source]

Register a workflow class with the registry.

Extracts the workflow definition from the class and stores it indexed by name and version.

Parameters:

workflow_class (type[Workflow]) – The workflow class to register.

Return type:

None

Example

>>> registry = WorkflowRegistry()
>>> registry.register(MyWorkflow)
get_definition(name, version=None)[source]

Retrieve a workflow definition by name and optional version.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The workflow version. If None, returns the latest version.

Return type:

WorkflowDefinition

Returns:

The WorkflowDefinition for the requested workflow.

Raises:

KeyError – If the workflow name or version is not found.

Example

>>> definition = registry.get_definition("approval_workflow")
>>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
get_workflow_class(name)[source]

Retrieve the workflow class by name.

Parameters:

name (str) – The workflow name.

Return type:

type[Workflow]

Returns:

The Workflow class.

Raises:

KeyError – If the workflow name is not found.

Example

>>> WorkflowClass = registry.get_workflow_class("approval_workflow")
>>> instance = await engine.start_workflow(WorkflowClass)
list_definitions(active_only=True)[source]

List all registered workflow definitions.

Parameters:

active_only (bool) – If True, only return the latest version of each workflow. If False, return all versions.

Return type:

list[WorkflowDefinition]

Returns:

List of WorkflowDefinition objects.

Example

>>> all_workflows = registry.list_definitions()
>>> all_versions = registry.list_definitions(active_only=False)
unregister(name, version=None)[source]

Remove a workflow from the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – The specific version to remove. If None, removes all versions.

Return type:

None

Example

>>> registry.unregister("old_workflow")
>>> registry.unregister("approval_workflow", "1.0.0")
has_workflow(name, version=None)[source]

Check if a workflow exists in the registry.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – Optional specific version to check.

Return type:

bool

Returns:

True if the workflow exists, False otherwise.

Example

>>> if registry.has_workflow("approval_workflow"):
...     definition = registry.get_definition("approval_workflow")
get_versions(name)[source]

Get all versions for a workflow.

Parameters:

name (str) – The workflow name.

Return type:

list[str]

Returns:

List of version strings.

Raises:

KeyError – If the workflow name is not found.

Example

>>> versions = registry.get_versions("approval_workflow")
>>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']

Workflow graph operations and navigation.

This module provides graph-based operations for workflow definitions, including step navigation, validation, and path finding.

class litestar_workflows.engine.graph.WorkflowGraph[source]

Bases: object

Graph representation of a workflow for navigation and validation.

The graph provides methods to navigate between steps, validate structure, and determine execution paths based on workflow state.

definition

The workflow definition this graph represents.

_adjacency

Adjacency list mapping step names to outgoing edges.

_reverse_adjacency

Reverse adjacency list for finding predecessors.

Parameters:

definition (WorkflowDefinition)

__init__(definition)[source]

Initialize a workflow graph from a definition.

Parameters:

definition (WorkflowDefinition) – The workflow definition to represent as a graph.

classmethod from_definition(definition)[source]

Create a workflow graph from a definition.

Parameters:

definition (WorkflowDefinition) – The workflow definition.

Return type:

WorkflowGraph

Returns:

A WorkflowGraph instance.

Example

>>> graph = WorkflowGraph.from_definition(my_definition)
get_next_steps(current, context)[source]

Get the next steps from the current step.

Evaluates edge conditions to determine which steps should execute next. If multiple unconditional edges exist, all of their targets are returned so they can run in parallel.

Parameters:
  • current (str) – Name of the current step.

  • context (WorkflowContext) – The workflow context for condition evaluation.

Return type:

list[str]

Returns:

List of next step names. Empty list if current step is terminal.

Example

>>> next_steps = graph.get_next_steps("approval", context)
>>> if len(next_steps) > 1:
...     print("Parallel execution required")
get_previous_steps(current)[source]

Get the steps that lead to the current step.

Parameters:

current (str) – Name of the current step.

Return type:

list[str]

Returns:

List of predecessor step names.

Example

>>> predecessors = graph.get_previous_steps("final_step")
is_terminal(step_name)[source]

Check if a step is a terminal (end) step.

Parameters:

step_name (str) – Name of the step to check.

Return type:

bool

Returns:

True if the step has no outgoing edges or is in terminal_steps.

Example

>>> if graph.is_terminal("approval"):
...     print("Workflow ends here")
validate()[source]

Validate the workflow graph structure.

Checks for common graph issues:
  • Unreachable steps (not connected to initial step)

  • Disconnected components

  • Steps with no outgoing edges that aren’t marked terminal

  • Invalid edge references

Return type:

list[str]

Returns:

List of validation error messages. Empty list if valid.

Example

>>> errors = graph.validate()
>>> if errors:
...     for error in errors:
...         print(f"Validation error: {error}")
get_all_paths(start, end, max_paths=100)[source]

Find all paths from start step to end step.

Parameters:
  • start (str) – Starting step name.

  • end (str) – Ending step name.

  • max_paths (int) – Maximum number of paths to find (prevents infinite loops).

Return type:

list[list[str]]

Returns:

List of paths, where each path is a list of step names.

Example

>>> paths = graph.get_all_paths("start", "end")
>>> for path in paths:
...     print(" -> ".join(path))
get_step_depth(step_name)[source]

Get the minimum depth from initial step to the given step.

Parameters:

step_name (str) – Name of the step.

Return type:

int

Returns:

Minimum number of steps from initial to this step. Returns -1 if unreachable.

Example

>>> depth = graph.get_step_depth("final_approval")
>>> print(f"Step is at depth {depth}")

Steps Module

Built-in step implementations.

class litestar_workflows.steps.BaseHumanStep[source]

Bases: BaseStep

Base for human approval/interaction steps.

Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.

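Parameters:
  • name (str)

  • title (str)

  • description (str)

  • form_schema (dict[str, Any] | None)

  • assignee_key (str | None)
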
__init__(name, title, description='', form_schema=None, assignee_key=None)[source]

Initialize the human step.

Parameters:
  • name (str) – Unique identifier for the step.

  • title (str) – Display title for the human task.

  • description (str) – Human-readable description.

  • form_schema (dict[str, Any] | None) – Optional JSON Schema for the task form.

  • assignee_key (str | None) – Optional context key to get assignee dynamically.

assignee_key: str | None = None

Context key for dynamic assignment of tasks.

async execute(context)[source]

Execute the human step.

For human steps, execution typically means waiting for user input. Override this if you need custom behavior.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The form data submitted by the user.

form_schema: dict[str, Any] | None = None

JSON Schema defining the form structure for user input.

async get_assignee(context)[source]

Get the assignee for this task from context.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str | None

Returns:

User ID to assign the task to, or None for unassigned.
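
Dynamic assignment through `assignee_key` amounts to a keyed lookup in the workflow context. The sketch below assumes exactly that and uses a plain dict as the context; the standalone `get_assignee` function and the `manager_id` key are hypothetical.

```python
from typing import Any, Optional


def get_assignee(context: dict[str, Any], assignee_key: Optional[str]) -> Optional[str]:
    """Look up the assignee in the context, or return None when unassigned."""
    if assignee_key is None:
        return None  # no key configured: the task stays unassigned
    return context.get(assignee_key)


context = {"manager_id": "user_42", "document_id": "doc_123"}

assigned = get_assignee(context, "manager_id")
unassigned = get_assignee(context, None)
missing = get_assignee(context, "reviewer_id")
```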

step_type: StepType = 'human'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

title: str

Display title for the human task.

name: str

Unique identifier for the step.

class litestar_workflows.steps.BaseMachineStep[source]

Bases: BaseStep

Base for automated machine steps.

Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.

Parameters:
  • name (str)

  • description (str)

__init__(name, description='')[source]

Initialize the machine step.

Parameters:
  • name (str) – Unique identifier for the step.

  • description (str) – Human-readable description.

step_type: StepType = 'machine'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.BaseStep[source]

Bases: object

Base implementation with common functionality for all steps.

This class provides default implementations of the Step protocol methods and common attributes. Subclass this to create custom step types.

Parameters:
  • name (str)

  • description (str)

__init__(name, description='')[source]

Initialize the base step.

Parameters:
  • name (str) – Unique identifier for the step.

  • description (str) – Human-readable description.

async can_execute(context)[source]

Check if step can execute given the current context.

Override this method to implement guard logic.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

bool

Returns:

True if the step can execute, False to skip.

description: str = ''

Human-readable description of what the step does.

async execute(context)[source]

Execute the step with the given context.

Override this method to implement step logic.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The result of the step execution.

Raises:

NotImplementedError – Must be implemented by subclasses.

async on_failure(context, error)[source]

Hook called after failed step execution.

Override this method to implement error handling logic.

Parameters:
  • context (WorkflowContext) – The workflow execution context.

  • error (Exception) – The exception that caused the failure.

Return type:

None

async on_success(context, result)[source]

Hook called after successful step execution.

Override this method to implement post-success logic.

Parameters:
  • context (WorkflowContext) – The workflow execution context.

  • result (Any) – The result returned by execute().

Return type:

None
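
How the guard and the two hooks fit around `execute()` can be sketched as a small driver. The call order and the re-raise after `on_failure` are assumptions about a typical engine, not documented behaviour; `SketchStep` and `run_step` are hypothetical and use a plain dict as the context.

```python
import asyncio
from typing import Any


class SketchStep:
    name = "charge_card"

    def __init__(self) -> None:
        self.events: list[tuple[str, Any]] = []

    async def can_execute(self, context: dict[str, Any]) -> bool:
        return context.get("amount", 0) > 0  # guard: nothing to charge

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        if context["amount"] > 1000:
            raise ValueError("amount over limit")
        return {"charged": context["amount"]}

    async def on_success(self, context: dict[str, Any], result: Any) -> None:
        self.events.append(("success", result))

    async def on_failure(self, context: dict[str, Any], error: Exception) -> None:
        self.events.append(("failure", str(error)))


async def run_step(step: SketchStep, context: dict[str, Any]) -> Any:
    if not await step.can_execute(context):
        return None  # guard returned False: skip the step
    try:
        result = await step.execute(context)
    except Exception as error:
        await step.on_failure(context, error)
        raise  # let the caller decide how to handle the failure
    await step.on_success(context, result)
    return result


async def main() -> tuple[Any, Any, list[tuple[str, Any]]]:
    step = SketchStep()
    skipped = await run_step(step, {"amount": 0})
    ok = await run_step(step, {"amount": 25})
    try:
        await run_step(step, {"amount": 5000})
    except ValueError:
        pass
    return skipped, ok, step.events


skipped, ok, events = asyncio.run(main())
```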

step_type: StepType = 'machine'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.ConditionalGroup[source]

Bases: StepGroup

Execute one of multiple branches based on condition.

This implements the Gateway pattern where a condition function determines which branch to execute. Similar to if/else or switch statements.

Example

>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
Parameters:
__init__(condition, branches)[source]

Initialize a conditional group.

Parameters:
async execute(context, engine)[source]

Execute the branch selected by the condition.

Parameters:
Return type:

Any

Returns:

The result of the selected branch, or None if no match.

Raises:

Exception – Any exception from step execution.
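
The branch selection can be illustrated with plain coroutines in place of steps. This sketch assumes the condition returns a key into the branches mapping and that an unmatched key yields None, as the return description states; `run_conditional` and the step functions are hypothetical, and no engine is involved.

```python
import asyncio
from typing import Any, Awaitable, Callable

Context = dict[str, Any]
Branch = Callable[[Context], Awaitable[Any]]


async def approve_step(context: Context) -> str:
    return f"approved {context['document_id']}"


async def reject_step(context: Context) -> str:
    return f"rejected {context['document_id']}"


def check_status(context: Context) -> str:
    return "approved" if context.get("approved") else "rejected"


async def run_conditional(
    condition: Callable[[Context], str], branches: dict[str, Branch], context: Context
) -> Any:
    """Run the branch named by the condition, or return None if none matches."""
    branch = branches.get(condition(context))
    if branch is None:
        return None
    return await branch(context)


branches = {"approved": approve_step, "rejected": reject_step}

approved = asyncio.run(
    run_conditional(check_status, branches, {"document_id": "doc_123", "approved": True})
)
no_match = asyncio.run(run_conditional(lambda ctx: "escalate", branches, {}))
```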

class litestar_workflows.steps.ExclusiveGateway[source]

Bases: BaseStep

XOR gateway - exactly one path based on condition.

This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.

Example

>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
Parameters:
  • name (str)

  • condition (Callable[[WorkflowContext], str])

  • description (str)

__init__(name, condition, description='')[source]

Initialize an exclusive gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.

  • description (str) – Human-readable description.

async execute(context)[source]

Evaluate condition and return the name of the next step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str

Returns:

The name of the next step to execute.

Raises:

Exception – If condition evaluation fails.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.ParallelGateway[source]

Bases: BaseStep

AND gateway - all paths execute in parallel.

This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.

Example

>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
Parameters:
  • name (str)

  • branches (list[str])

  • description (str)

__init__(name, branches, description='')[source]

Initialize a parallel gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • branches (list[str]) – List of step names to execute in parallel.

  • description (str) – Human-readable description.

async execute(context)[source]

Return the list of branches to execute in parallel.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

list[str]

Returns:

List of step names to execute concurrently.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.ParallelGroup[source]

Bases: StepGroup

Execute steps in parallel.

This implements the Group pattern where multiple steps execute concurrently. Optionally supports a callback step (Chord pattern) that receives all results.

Example

>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
Parameters:
  • steps (Union[Step[Any], StepGroup])

  • callback (Optional[Step[Any]])

__init__(*steps, callback=None)[source]

Initialize a parallel group.

Parameters:
  • *steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in parallel.

  • callback (Optional[Step[Any]]) – Optional callback step to process results (Chord pattern).

async execute(context, engine)[source]

Execute steps in parallel using asyncio.gather.

Parameters:
Return type:

list[Any] | Any

Returns:

List of results if no callback, otherwise callback result.

Raises:

Exception – Any exception from step execution.
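
The Group and Chord patterns map directly onto `asyncio.gather`, which returns results in the order the awaitables were passed. The sketch below shows both variants with plain coroutines standing in for steps; `run_parallel` and the step functions are hypothetical, and the way the callback receives the results is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Context = dict[str, Any]


async def fetch_inventory(context: Context) -> int:
    await asyncio.sleep(0.02)  # finishes last, but keeps its position
    return 3


async def fetch_price(context: Context) -> int:
    await asyncio.sleep(0.01)
    return 5


async def aggregate(context: Context, results: list[int]) -> int:
    return sum(results)


async def run_parallel(
    steps: list[Callable[[Context], Awaitable[Any]]],
    context: Context,
    callback: Optional[Callable[[Context, list[Any]], Awaitable[Any]]] = None,
) -> Any:
    """Run all steps concurrently; optionally feed the results to a callback."""
    results = await asyncio.gather(*(step(context) for step in steps))
    if callback is None:
        return list(results)                 # Group: list of results
    return await callback(context, results)  # Chord: callback result


plain = asyncio.run(run_parallel([fetch_inventory, fetch_price], {}))
chord = asyncio.run(run_parallel([fetch_inventory, fetch_price], {}, callback=aggregate))
```

With the default `return_exceptions=False`, the first exception raised by any step propagates out of `gather`, which matches the "any exception from step execution" wording.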

class litestar_workflows.steps.SequentialGroup[source]

Bases: StepGroup

Execute steps in sequence, passing results.

This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.

Example

>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
# step1 -> step2(result1) -> step3(result2) -> result3
Parameters:

steps (Union[Step[Any], StepGroup])

__init__(*steps)[source]

Initialize a sequential group.

Parameters:

*steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in sequence.

async execute(context, engine)[source]

Execute steps sequentially, passing results forward.

Return type:

Any

Returns:

The result of the final step.

Raises:

Exception – Any exception from step execution.
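The Chain pattern can be sketched as a simple loop that feeds each result into the next step. This is an illustration only: run_chain and the three example steps are hypothetical, and passing the previous result as a second argument is an assumption about how results are handed forward (the library may use the context instead).

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in: each "step" receives the context and the previous result.
ChainStep = Callable[[dict, Any], Awaitable[Any]]


async def run_chain(context: dict, *steps: ChainStep) -> Any:
    result: Any = None  # the first step has no predecessor
    for step in steps:
        # Each step is awaited before the next one starts, so ordering is strict.
        result = await step(context, result)
    return result  # only the final step's result is returned


async def load(context: dict, previous: Any) -> int:
    return context["start"]


async def double(context: dict, previous: int) -> int:
    return previous * 2


async def describe(context: dict, previous: int) -> str:
    return f"value={previous}"


chain_result = asyncio.run(run_chain({"start": 5}, load, double, describe))
```

Because an exception in any step leaves the loop immediately, later steps never run, which matches the Raises entry above.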

class litestar_workflows.steps.StepGroup[source]

Bases: ABC

Base for composable step patterns.

Step groups allow you to compose multiple steps into reusable patterns like sequences, parallel execution, and conditional branching.

abstractmethod async execute(context, engine)[source]

Execute the step group.

Return type:

Any

Returns:

The result of the group execution.

Raises:

Exception – Any exception during group execution.

class litestar_workflows.steps.TimerStep[source]

Bases: BaseStep

Step that waits for a duration before continuing.

Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.

Example

>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
__init__(name, duration, description='')[source]

Initialize a timer step.

async execute(context)[source]

Wait for the specified duration.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

None

Returns:

None after the delay completes.

get_duration(context)[source]

Get the delay duration for this step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

timedelta

Returns:

The duration to wait.
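Resolving a static or context-dependent duration, as described for TimerStep, might look like the sketch below. The names resolve_duration and by_priority are hypothetical, the context is modelled as a plain dict, and the callable check is an assumption about how the two cases are told apart.

```python
from datetime import timedelta
from typing import Callable, Union

# Hypothetical alias: a duration is either fixed or computed from a context mapping.
Duration = Union[timedelta, Callable[[dict], timedelta]]


def resolve_duration(duration: Duration, context: dict) -> timedelta:
    # A callable is evaluated against the context; a timedelta is returned as-is.
    if callable(duration):
        return duration(context)
    return duration


def by_priority(context: dict) -> timedelta:
    priority = context.get("priority", "normal")
    return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)


static_delay = resolve_duration(timedelta(minutes=5), {})
low_delay = resolve_duration(by_priority, {"priority": "low"})
default_delay = resolve_duration(by_priority, {})
```

Keeping the resolution in one place lets execute() treat both forms identically once the timedelta is known.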

step_type: StepType = 'timer'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.WebhookStep[source]

Bases: BaseStep

Step that waits for an external webhook callback.

Webhook steps pause workflow execution until an external system sends a callback with data. This is useful for integrating with third-party services or async external processes.

The execution engine is responsible for managing the actual waiting and resuming the workflow when the webhook is received.

Example

>>> # Wait for payment confirmation
>>> step = WebhookStep(
...     "wait_payment",
...     callback_key="payment_data",
...     description="Wait for payment gateway callback",
... )
>>> # Workflow pauses here until webhook received
>>> payment_data = await step.execute(context)
>>> # Continue with payment_data from webhook
Parameters:
  • name (str)

  • callback_key (str)

  • description (str)

__init__(name, callback_key='webhook_data', description='')[source]

Initialize a webhook step.

Parameters:
  • name (str) – Unique identifier for the step.

  • callback_key (str) – Context key where webhook data will be stored.

  • description (str) – Human-readable description.

async execute(context)[source]

Retrieve webhook data from context.

This step pauses execution until a webhook is received. The execution engine handles the actual waiting mechanism.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The data received from the webhook callback.

Note

Before resuming execution, the engine stores the received webhook data in the context under callback_key.
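The hand-off between engine and step can be sketched as follows. FakeContext and WebhookStepSketch are hypothetical stand-ins, and the set() method in particular is an assumption made for this illustration. Only the idea that the step reads whatever sits under callback_key comes from the documentation above.

```python
import asyncio
from typing import Any


class FakeContext:
    """Hypothetical stand-in for WorkflowContext: a thin wrapper over a dict."""

    def __init__(self) -> None:
        self._data: dict = {}

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:  # assumed helper, not a documented API
        self._data[key] = value


class WebhookStepSketch:
    def __init__(self, name: str, callback_key: str = "webhook_data") -> None:
        self.name = name
        self.callback_key = callback_key

    async def execute(self, context: FakeContext) -> Any:
        # By the time this runs, the engine is expected to have stored the payload.
        return context.get(self.callback_key)


async def demo() -> Any:
    context = FakeContext()
    step = WebhookStepSketch("wait_payment", callback_key="payment_data")
    # Simulates the engine receiving the webhook and populating the context
    # before it resumes the workflow.
    context.set("payment_data", {"status": "paid", "amount": 42})
    return await step.execute(context)


payment = asyncio.run(demo())
```

The step itself never blocks in this sketch; the waiting is assumed to happen in the engine, as the class description states.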

step_type: StepType = 'webhook'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

Base step implementations for litestar-workflows.

class litestar_workflows.steps.base.BaseStep[source]

Bases: object

Base implementation with common functionality for all steps.

This class provides default implementations of the Step protocol methods and common attributes. Subclass this to create custom step types.

Parameters:
  • name (str)

  • description (str)

step_type: StepType = 'machine'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

__init__(name, description='')[source]

Initialize the base step.

Parameters:
  • name (str) – Unique identifier for the step.

  • description (str) – Human-readable description.

name: str

Unique identifier for the step.

description: str = ''

Human-readable description of what the step does.

async execute(context)[source]

Execute the step with the given context.

Override this method to implement step logic.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The result of the step execution.

Raises:

NotImplementedError – Must be implemented by subclasses.

async can_execute(context)[source]

Check if step can execute given the current context.

Override this method to implement guard logic.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

bool

Returns:

True if the step can execute, False to skip.

async on_success(context, result)[source]

Hook called after successful step execution.

Override this method to implement post-success logic.

Parameters:
  • context (WorkflowContext) – The workflow execution context.

  • result (Any) – The result returned by execute().

Return type:

None

async on_failure(context, error)[source]

Hook called after failed step execution.

Override this method to implement error handling logic.

Parameters:
  • context (WorkflowContext) – The workflow execution context.

  • error (Exception) – The exception that caused the failure.

Return type:

None
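How the four BaseStep methods could fit together at run time is sketched below. The call order (guard, execute, then exactly one hook) is an assumption about the engine, not something this reference states, and StepSketch and run_step are hypothetical names using a plain dict as the context.

```python
import asyncio
from typing import Any


class StepSketch:
    """Hypothetical stand-in mirroring the four BaseStep methods documented above."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.log: list = []

    async def can_execute(self, context: dict) -> bool:
        return not context.get("skip", False)

    async def execute(self, context: dict) -> Any:
        if context.get("fail"):
            raise ValueError("boom")
        return "done"

    async def on_success(self, context: dict, result: Any) -> None:
        self.log.append(f"success:{result}")

    async def on_failure(self, context: dict, error: Exception) -> None:
        self.log.append(f"failure:{error}")


async def run_step(step: StepSketch, context: dict) -> Any:
    # Assumed call order: guard first, then execute, then exactly one hook.
    if not await step.can_execute(context):
        step.log.append("skipped")
        return None
    try:
        result = await step.execute(context)
    except Exception as error:
        await step.on_failure(context, error)
        raise  # the failure hook observes the error; it does not swallow it
    await step.on_success(context, result)
    return result


async def demo() -> list:
    step = StepSketch("example")
    await run_step(step, {})
    await run_step(step, {"skip": True})
    try:
        await run_step(step, {"fail": True})
    except ValueError:
        pass
    return step.log


lifecycle_log = asyncio.run(demo())
```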

class litestar_workflows.steps.base.BaseMachineStep[source]

Bases: BaseStep

Base for automated machine steps.

Machine steps execute automatically without requiring human interaction. They are the building blocks for automated workflow processes.

Parameters:
  • name (str)

  • description (str)

__init__(name, description='')[source]

Initialize the machine step.

Parameters:
  • name (str) – Unique identifier for the step.

  • description (str) – Human-readable description.

step_type: StepType = 'machine'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

class litestar_workflows.steps.base.BaseHumanStep[source]

Bases: BaseStep

Base for human approval/interaction steps.

Human steps pause workflow execution and wait for user input. They support forms, assignments, and deadline tracking.

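Parameters:
  • name (str)

  • title (str)

  • description (str)

  • form_schema (dict[str, Any] | None)

  • assignee_key (str | None)
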
name: str

Unique identifier for the step.

__init__(name, title, description='', form_schema=None, assignee_key=None)[source]

Initialize the human step.

Parameters:
  • name (str) – Unique identifier for the step.

  • title (str) – Display title for the human task.

  • description (str) – Human-readable description.

  • form_schema (dict[str, Any] | None) – Optional JSON Schema for the task form.

  • assignee_key (str | None) – Optional context key to get assignee dynamically.

step_type: StepType = 'human'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

title: str

Display title for the human task.

form_schema: dict[str, Any] | None = None

JSON Schema defining the form structure for user input.

assignee_key: str | None = None

Context key for dynamic assignment of tasks.

async get_assignee(context)[source]

Get the assignee for this task from context.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str | None

Returns:

User ID to assign the task to, or None for unassigned.
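Dynamic assignment through assignee_key can be illustrated with a small stand-in. HumanStepSketch is hypothetical and models only the lookup. Returning None when no key is configured follows the Returns entry above, while reading the value with a dict-style get() is an assumption.

```python
import asyncio
from typing import Optional


class HumanStepSketch:
    """Hypothetical stand-in; only the assignee lookup is modelled."""

    def __init__(self, name: str, title: str, assignee_key: Optional[str] = None) -> None:
        self.name = name
        self.title = title
        self.assignee_key = assignee_key

    async def get_assignee(self, context: dict) -> Optional[str]:
        # Without an assignee_key there is nothing to look up: the task is unassigned.
        if self.assignee_key is None:
            return None
        return context.get(self.assignee_key)


context = {"manager_id": "user-17"}
assigned = asyncio.run(
    HumanStepSketch("approve", "Approve request", assignee_key="manager_id").get_assignee(context)
)
unassigned = asyncio.run(HumanStepSketch("approve", "Approve request").get_assignee(context))
```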

async execute(context)[source]

Execute the human step.

For human steps, execution typically means waiting for user input. Override this if you need custom behavior.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

Any

Returns:

The form data submitted by the user.

Composable step groups for litestar-workflows.

class litestar_workflows.steps.groups.StepGroup[source]

Bases: ABC

Base for composable step patterns.

Step groups allow you to compose multiple steps into reusable patterns like sequences, parallel execution, and conditional branching.

abstractmethod async execute(context, engine)[source]

Execute the step group.

Return type:

Any

Returns:

The result of the group execution.

Raises:

Exception – Any exception during group execution.

class litestar_workflows.steps.groups.SequentialGroup[source]

Bases: StepGroup

Execute steps in sequence, passing results.

This implements the Chain pattern where each step receives the result of the previous step as input. The final step’s result is returned.

Example

>>> group = SequentialGroup(step1, step2, step3)
>>> result = await group.execute(context, engine)
# step1 -> step2(result1) -> step3(result2) -> result3
Parameters:

steps (Union[Step[Any], StepGroup])

__init__(*steps)[source]

Initialize a sequential group.

Parameters:

*steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in sequence.

async execute(context, engine)[source]

Execute steps sequentially, passing results forward.

Return type:

Any

Returns:

The result of the final step.

Raises:

Exception – Any exception from step execution.

class litestar_workflows.steps.groups.ParallelGroup[source]

Bases: StepGroup

Execute steps in parallel.

This implements the Group pattern where multiple steps execute concurrently. Optionally supports a callback step (Chord pattern) that receives all results.

Example

>>> # Simple parallel execution
>>> group = ParallelGroup(step1, step2, step3)
>>> results = await group.execute(context, engine)  # [result1, result2, result3]
>>> # Chord pattern with callback
>>> group = ParallelGroup(step1, step2, step3, callback=aggregate_step)
>>> result = await group.execute(context, engine)  # aggregate_step([r1, r2, r3])
Parameters:
  • steps (Union[Step[Any], StepGroup])

  • callback (Optional[Step[Any]])

__init__(*steps, callback=None)[source]

Initialize a parallel group.

Parameters:
  • *steps (Union[Step[Any], StepGroup]) – Steps or groups to execute in parallel.

  • callback (Optional[Step[Any]]) – Optional callback step to process results (Chord pattern).

async execute(context, engine)[source]

Execute steps in parallel using asyncio.gather.

Return type:

list[Any] | Any

Returns:

List of results if no callback, otherwise callback result.

Raises:

Exception – Any exception from step execution.

class litestar_workflows.steps.groups.ConditionalGroup[source]

Bases: StepGroup

Execute one of multiple branches based on condition.

This implements the Gateway pattern where a condition function determines which branch to execute. Similar to if/else or switch statements.

Example

>>> def check_status(ctx: WorkflowContext) -> str:
...     return "approved" if ctx.get("approved") else "rejected"
>>> group = ConditionalGroup(
...     condition=check_status,
...     branches={
...         "approved": approve_step,
...         "rejected": reject_step,
...     },
... )
>>> result = await group.execute(context, engine)
__init__(condition, branches)[source]

Initialize a conditional group.

async execute(context, engine)[source]

Execute the branch selected by the condition.

Return type:

Any

Returns:

The result of the selected branch, or None if no match.

Raises:

Exception – Any exception from step execution.
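The branch selection, including the "None if no match" case, can be sketched without the library. Here run_conditional is a hypothetical helper and each branch is a bare coroutine function taking a dict, which simplifies the real Step and WorkflowContext types.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Branch = Callable[[dict], Awaitable[Any]]  # hypothetical stand-in for a step


async def run_conditional(
    context: dict,
    condition: Callable[[dict], str],
    branches: Dict[str, Branch],
) -> Any:
    key = condition(context)
    branch = branches.get(key)
    if branch is None:
        return None  # no branch registered for this key
    return await branch(context)


def check_status(context: dict) -> str:
    return "approved" if context.get("approved") else "rejected"


async def approve(context: dict) -> str:
    return "order approved"


async def reject(context: dict) -> str:
    return "order rejected"


branches = {"approved": approve, "rejected": reject}
approved_result = asyncio.run(run_conditional({"approved": True}, check_status, branches))
rejected_result = asyncio.run(run_conditional({}, check_status, branches))
missing_result = asyncio.run(run_conditional({}, lambda ctx: "unknown", branches))
```

Only the selected branch is awaited, so the other branches have no side effects.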

Decision gateway steps for workflow branching.

class litestar_workflows.steps.gateway.ExclusiveGateway[source]

Bases: BaseStep

XOR gateway - exactly one path based on condition.

This gateway evaluates a condition function and returns the name of the next step to execute. Only one path will be followed.

Example

>>> def check_approval(ctx: WorkflowContext) -> str:
...     return "approved_step" if ctx.get("approved") else "rejected_step"
>>> gateway = ExclusiveGateway("approval_gate", condition=check_approval)
>>> next_step = await gateway.execute(context)  # Returns step name
Parameters:
  • name (str)

  • condition (Callable[[WorkflowContext], str])

  • description (str)

__init__(name, condition, description='')[source]

Initialize an exclusive gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • condition (Callable[[WorkflowContext], str]) – Function that evaluates context and returns next step name.

  • description (str) – Human-readable description.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

async execute(context)[source]

Evaluate condition and return the name of the next step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

str

Returns:

The name of the next step to execute.

Raises:

Exception – If condition evaluation fails.

name: str

Unique identifier for the step.

class litestar_workflows.steps.gateway.ParallelGateway[source]

Bases: BaseStep

AND gateway - all paths execute in parallel.

This gateway splits execution into multiple parallel branches. All branches will be executed concurrently.

Example

>>> gateway = ParallelGateway(
...     "fork_point", branches=["notify_team", "update_db", "send_email"]
... )
>>> branch_names = await gateway.execute(context)
Parameters:
  • name (str)

  • branches (list[str])

  • description (str)

__init__(name, branches, description='')[source]

Initialize a parallel gateway.

Parameters:
  • name (str) – Unique identifier for the gateway.

  • branches (list[str]) – List of step names to execute in parallel.

  • description (str) – Human-readable description.

step_type: StepType = 'gateway'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

name: str

Unique identifier for the step.

async execute(context)[source]

Return the list of branches to execute in parallel.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

list[str]

Returns:

List of step names to execute concurrently.
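The two gateway types in this module differ only in what they return: one step name for the XOR gateway, a list of step names for the AND gateway. The sketch below shows how a caller might normalise both outcomes. All three functions are hypothetical, and next_steps in particular is an assumed engine-side helper, not part of the documented API.

```python
from typing import Callable, List, Union


def exclusive_gateway(condition: Callable[[dict], str], context: dict) -> str:
    # XOR: the condition picks exactly one next step name.
    return condition(context)


def parallel_gateway(branches: List[str]) -> List[str]:
    # AND: every configured branch is returned; the copy protects the original list.
    return list(branches)


def next_steps(outcome: Union[str, List[str]]) -> List[str]:
    # Hypothetical engine-side helper: normalise both gateway outcomes to a list.
    return [outcome] if isinstance(outcome, str) else list(outcome)


def check_approval(context: dict) -> str:
    return "approved_step" if context.get("approved") else "rejected_step"


xor_next = next_steps(exclusive_gateway(check_approval, {"approved": True}))
and_next = next_steps(parallel_gateway(["notify_team", "update_db", "send_email"]))
```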

Timer and delay steps for workflow scheduling.

class litestar_workflows.steps.timer.TimerStep[source]

Bases: BaseStep

Step that waits for a duration before continuing.

Timer steps introduce delays in workflow execution. The duration can be static or dynamically calculated based on the workflow context.

Example

>>> # Static delay
>>> step = TimerStep("wait_5min", duration=timedelta(minutes=5))
>>> await step.execute(context)
>>> # Dynamic delay based on context
>>> def get_delay(ctx: WorkflowContext) -> timedelta:
...     priority = ctx.get("priority", "normal")
...     return timedelta(hours=1) if priority == "low" else timedelta(minutes=5)
>>> step = TimerStep("dynamic_wait", duration=get_delay)
>>> await step.execute(context)
__init__(name, duration, description='')[source]

Initialize a timer step.

step_type: StepType = 'timer'

Type of step (MACHINE, HUMAN, WEBHOOK, TIMER, GATEWAY).

get_duration(context)[source]

Get the delay duration for this step.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

timedelta

Returns:

The duration to wait.

async execute(context)[source]

Wait for the specified duration.

Parameters:

context (WorkflowContext) – The workflow execution context.

Return type:

None

Returns:

None after the delay completes.

name: str

Unique identifier for the step.

Database Module

Available with the [db] extra: pip install litestar-workflows[db]

Database persistence layer for litestar-workflows.

This module provides SQLAlchemy models and repositories for persisting workflow definitions, instances, and execution history.

Requires the [db] extra:

pip install litestar-workflows[db]

class litestar_workflows.db.HumanTaskModel[source]

Bases: UUIDAuditBase

Pending human task for quick querying and assignment.

Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.

instance_id

Foreign key to the workflow instance.

step_execution_id

Foreign key to the step execution.

step_name

Name of the human task step.

title

Display title for the task.

description

Detailed description of the task.

form_schema

JSON Schema defining the task form.

assignee_id

User ID assigned to complete the task.

assignee_group

Group/role that can complete the task.

due_at

Deadline for task completion.

reminder_at

When to send a reminder.

status

Current task status (PENDING, COMPLETED, CANCELED).

completed_at

When the task was completed.

completed_by

User who completed the task.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

assignee_group: Mapped[str | None]
assignee_id: Mapped[str | None]
completed_at: Mapped[datetime | None]
completed_by: Mapped[str | None]
created_at: Mapped[datetime.datetime]

Date/time of instance creation.

description: Mapped[str | None]
due_at: Mapped[datetime | None]
form_schema: Mapped[dict[str, Any] | None]
id: Mapped[UUID]

UUID Primary key column.

instance: Mapped[WorkflowInstanceModel]
instance_id: Mapped[UUID]
reminder_at: Mapped[datetime | None]
status: Mapped[str]
step_execution_id: Mapped[UUID]
step_name: Mapped[str]
title: Mapped[str]
updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

class litestar_workflows.db.HumanTaskRepository[source]

Bases: SQLAlchemyAsyncRepository[HumanTaskModel]

Repository for human task CRUD operations.

Provides methods for querying pending human tasks by assignee, group, and due date.

async cancel_task(task_id)[source]

Cancel a pending human task.

Parameters:

task_id (UUID) – The task ID.

Return type:

HumanTaskModel | None

Returns:

The updated task or None if not found.

async complete_task(task_id, completed_by)[source]

Mark a human task as completed.

Parameters:
  • task_id (UUID) – The task ID.

  • completed_by (str) – User ID who completed the task.

Return type:

HumanTaskModel | None

Returns:

The updated task or None if not found.

async find_by_instance(instance_id)[source]

Find all human tasks for an instance.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

Sequence[HumanTaskModel]

Returns:

List of human tasks.

async find_overdue()[source]

Find overdue pending human tasks.

Return type:

Sequence[HumanTaskModel]

Returns:

List of overdue human tasks.

async find_pending(assignee_id=None, assignee_group=None)[source]

Find pending human tasks.

Parameters:
  • assignee_id (str | None) – Optional assignee ID filter.

  • assignee_group (str | None) – Optional group filter.

Return type:

Sequence[HumanTaskModel]

Returns:

List of pending human tasks.
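The optional filters of find_pending can be illustrated with an in-memory stand-in. Task and this find_pending function are hypothetical and do not touch SQLAlchemy. The lowercase status strings and the choice to combine both filters with AND are assumptions made for the sketch.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Task:
    """Hypothetical in-memory stand-in for a HumanTaskModel row."""

    title: str
    status: str = "pending"  # assumed spelling of the stored status value
    assignee_id: Optional[str] = None
    assignee_group: Optional[str] = None


def find_pending(
    tasks: List[Task],
    assignee_id: Optional[str] = None,
    assignee_group: Optional[str] = None,
) -> List[Task]:
    matches = [task for task in tasks if task.status == "pending"]
    # Each filter is applied only when provided (assumed to combine with AND).
    if assignee_id is not None:
        matches = [task for task in matches if task.assignee_id == assignee_id]
    if assignee_group is not None:
        matches = [task for task in matches if task.assignee_group == assignee_group]
    return matches


tasks = [
    Task("Approve invoice", assignee_id="alice", assignee_group="finance"),
    Task("Review contract", assignee_id="bob", assignee_group="legal"),
    Task("Sign off budget", status="completed", assignee_id="alice", assignee_group="finance"),
]
all_pending = [task.title for task in find_pending(tasks)]
alice_pending = [task.title for task in find_pending(tasks, assignee_id="alice")]
legal_pending = [task.title for task in find_pending(tasks, assignee_group="legal")]
```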

model_type

alias of HumanTaskModel

class litestar_workflows.db.PersistentExecutionEngine[source]

Bases: object

Execution engine with database persistence.

This engine persists all workflow state to a database, enabling durability, recovery, and querying of workflow history.

registry

The workflow registry for looking up definitions.

session

SQLAlchemy async session for database operations.

event_bus

Optional event bus for emitting workflow events.

__init__(registry, session, event_bus=None)[source]

Initialize the persistent execution engine.

async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • reason (str) – Reason for cancellation.

Return type:

None

async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the human task step.

  • user_id (str) – ID of the user completing the task.

  • data (dict[str, Any]) – User-provided data to merge into context.

Return type:

None

async get_instance(instance_id)[source]

Get a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

WorkflowInstanceData

Returns:

The WorkflowInstanceData for the given instance ID.

Raises:

KeyError – If instance not found.

get_running_instances()[source]

Get IDs of currently running instances.

Return type:

list[UUID]

Returns:

List of running instance IDs.

async start_workflow(workflow, initial_data=None, *, tenant_id=None, created_by=None)[source]

Start a new workflow instance with persistence.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute.

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow.

  • tenant_id (str | None) – Optional tenant ID for multi-tenancy.

  • created_by (str | None) – Optional user ID who started the workflow.

Return type:

WorkflowInstanceData

Returns:

The created WorkflowInstanceData.

class litestar_workflows.db.StepExecutionModel[source]

Bases: UUIDAuditBase

Record of a single step execution within a workflow instance.

Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.

instance_id

Foreign key to the workflow instance.

step_name

Name of the executed step.

step_type

Type of step (MACHINE, HUMAN, etc.).

status

Execution status of the step.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

error

Error message if step failed.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished.

assigned_to

User ID assigned to human tasks.

completed_by

User ID who completed human tasks.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

assigned_to: Mapped[str | None]
completed_at: Mapped[datetime | None]
completed_by: Mapped[str | None]
created_at: Mapped[datetime.datetime]

Date/time of instance creation.

error: Mapped[str | None]
id: Mapped[UUID]

UUID Primary key column.

input_data: Mapped[dict[str, Any] | None]
instance: Mapped[WorkflowInstanceModel]
instance_id: Mapped[UUID]
output_data: Mapped[dict[str, Any] | None]
started_at: Mapped[datetime]
status: Mapped[StepStatus]
step_name: Mapped[str]
step_type: Mapped[StepType]
updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

class litestar_workflows.db.StepExecutionRepository[source]

Bases: SQLAlchemyAsyncRepository[StepExecutionModel]

Repository for step execution record CRUD operations.

async find_by_instance(instance_id)[source]

Find all step executions for an instance.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

Sequence[StepExecutionModel]

Returns:

List of step executions ordered by start time.

async find_by_step_name(instance_id, step_name)[source]

Find the execution record for a specific step.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – The step name.

Return type:

StepExecutionModel | None

Returns:

The step execution or None.

async find_failed(instance_id=None)[source]

Find failed step executions.

Parameters:

instance_id (UUID | None) – Optional instance ID filter.

Return type:

Sequence[StepExecutionModel]

Returns:

List of failed step executions.

model_type

alias of StepExecutionModel

class litestar_workflows.db.WorkflowDefinitionModel[source]

Bases: UUIDAuditBase

Persisted workflow definition for versioning and storage.

Stores the serialized workflow definition along with metadata for querying and managing workflow versions.

name

Unique name identifier for the workflow.

version

Semantic version string (e.g., “1.0.0”).

description

Human-readable description of the workflow.

definition_json

Serialized WorkflowDefinition as JSON.

is_active

Whether this version is active for new instances.

instances

Related workflow instances.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

created_at: Mapped[datetime.datetime]

Date/time of instance creation.

definition_json: Mapped[dict[str, Any]]
description: Mapped[str | None]
id: Mapped[UUID]

UUID Primary key column.

instances: Mapped[list[WorkflowInstanceModel]]
is_active: Mapped[bool]
name: Mapped[str]
updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

version: Mapped[str]
class litestar_workflows.db.WorkflowDefinitionRepository[source]

Bases: SQLAlchemyAsyncRepository[WorkflowDefinitionModel]

Repository for workflow definition CRUD operations.

Provides methods for managing workflow definitions including version management and activation status.

async deactivate_version(name, version)[source]

Deactivate a specific workflow version.

Parameters:
  • name (str) – The workflow name.

  • version (str) – The version to deactivate.

Return type:

bool

Returns:

True if a definition was deactivated.

async get_by_name(name, version=None, *, active_only=True)[source]

Get a workflow definition by name and optional version.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – Optional specific version. If None, returns the latest active version.

  • active_only (bool) – If True, only return active definitions.

Return type:

WorkflowDefinitionModel | None

Returns:

The workflow definition or None if not found.

async get_latest_version(name)[source]

Get the latest active version of a workflow definition.

Parameters:

name (str) – The workflow name.

Return type:

WorkflowDefinitionModel | None

Returns:

The latest active workflow definition or None.

async list_active()[source]

List all active workflow definitions.

Return type:

Sequence[WorkflowDefinitionModel]

Returns:

List of active workflow definitions.

model_type

alias of WorkflowDefinitionModel

class litestar_workflows.db.WorkflowInstanceModel[source]

Bases: UUIDAuditBase

Persisted workflow instance representing a running or completed execution.

Stores the current state of a workflow execution including context data, current step, and execution history.

definition_id

Foreign key to the workflow definition.

workflow_name

Denormalized workflow name for quick queries.

workflow_version

Denormalized workflow version.

status

Current execution status.

current_step

Name of the currently executing step (None if complete).

context_data

Mutable workflow context data as JSON.

metadata_

Immutable metadata about the execution.

error

Error message if workflow failed.

started_at

Timestamp when execution began.

completed_at

Timestamp when execution finished.

tenant_id

Optional tenant identifier for multi-tenancy.

created_by

Optional user who started the workflow.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

completed_at: Mapped[datetime | None]
context_data: Mapped[dict[str, Any]]
created_at: Mapped[datetime.datetime]

Date/time of instance creation.

created_by: Mapped[str | None]
current_step: Mapped[str | None]
definition: Mapped[WorkflowDefinitionModel]
definition_id: Mapped[UUID]
error: Mapped[str | None]
human_tasks: Mapped[list[HumanTaskModel]]
id: Mapped[UUID]

UUID Primary key column.

metadata_: Mapped[dict[str, Any]]
started_at: Mapped[datetime]
status: Mapped[WorkflowStatus]
step_executions: Mapped[list[StepExecutionModel]]
tenant_id: Mapped[str | None]
updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

workflow_name: Mapped[str]
workflow_version: Mapped[str]
class litestar_workflows.db.WorkflowInstanceRepository[source]

Bases: SQLAlchemyAsyncRepository[WorkflowInstanceModel]

Repository for workflow instance CRUD operations.

Provides methods for querying and managing workflow instances including filtering by status, user, and workflow name.

async find_by_tenant(tenant_id, status=None, limit=100, offset=0)[source]

Find instances by tenant ID.

Parameters:
  • tenant_id (str) – The tenant ID to filter by.

  • status (WorkflowStatus | None) – Optional status filter.

  • limit (int) – Maximum number of results.

  • offset (int) – Number of results to skip.

Return type:

tuple[Sequence[WorkflowInstanceModel], int]

Returns:

Tuple of (instances, total_count).

async find_by_user(user_id, status=None)[source]

Find instances created by a specific user.

Return type:

Sequence[WorkflowInstanceModel]

Returns:

List of workflow instances.

async find_by_workflow(workflow_name, status=None, limit=100, offset=0)[source]

Find instances by workflow name with optional status filter.

Parameters:
  • workflow_name (str) – The workflow name to filter by.

  • status (WorkflowStatus | None) – Optional status filter.

  • limit (int) – Maximum number of results.

  • offset (int) – Number of results to skip.

Return type:

tuple[Sequence[WorkflowInstanceModel], int]

Returns:

Tuple of (instances, total_count).
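A caller can page through results using limit, offset, and the returned total. The sketch below imitates the (instances, total_count) return shape with a plain list. Both paginate and fetch_all are hypothetical, and treating total_count as the number of all matching rows (not just the current page) is an assumption.

```python
from typing import List, Sequence, Tuple


def paginate(rows: Sequence[str], limit: int = 100, offset: int = 0) -> Tuple[List[str], int]:
    # total_count reflects all matching rows, not just the returned page.
    return list(rows[offset : offset + limit]), len(rows)


def fetch_all(rows: Sequence[str], page_size: int) -> List[str]:
    collected: List[str] = []
    offset = 0
    while True:
        page, total = paginate(rows, limit=page_size, offset=offset)
        collected.extend(page)
        offset += len(page)
        # Stop once everything is collected or a page comes back empty.
        if offset >= total or not page:
            return collected


rows = [f"instance-{i}" for i in range(7)]
first_page, total_count = paginate(rows, limit=3, offset=0)
everything = fetch_all(rows, page_size=3)
```

The empty-page check guards against looping forever if rows disappear between calls.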

async find_running()[source]

Find all running or waiting workflow instances.

Return type:

Sequence[WorkflowInstanceModel]

Returns:

List of active workflow instances.

model_type

alias of WorkflowInstanceModel

async update_status(instance_id, status, *, current_step=None, error=None)[source]

Update the status of a workflow instance.

Parameters:
  • instance_id (UUID) – The instance ID.

  • status (WorkflowStatus) – The new status.

  • current_step (str | None) – Optional current step name.

  • error (str | None) – Optional error message.

Return type:

WorkflowInstanceModel | None

Returns:

The updated instance or None if not found.

SQLAlchemy models for workflow persistence.

This module defines the database models for persisting workflow state:
  • WorkflowDefinitionModel: Stores workflow definition metadata and schema

  • WorkflowInstanceModel: Stores running/completed workflow instances

  • StepExecutionModel: Records individual step executions

  • HumanTaskModel: Tracks pending human approval tasks

class litestar_workflows.db.models.HumanTaskModel[source]

Bases: UUIDAuditBase

Pending human task for quick querying and assignment.

Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.

instance_id

Foreign key to the workflow instance.

step_execution_id

Foreign key to the step execution.

step_name

Name of the human task step.

title

Display title for the task.

description

Detailed description of the task.

form_schema

JSON Schema defining the task form.

assignee_id

User ID assigned to complete the task.

assignee_group

Group/role that can complete the task.

due_at

Deadline for task completion.

reminder_at

When to send a reminder.

status

Current task status (PENDING, COMPLETED, CANCELED).

completed_at

When the task was completed.

completed_by

User who completed the task.

instance_id: Mapped[UUID]
step_execution_id: Mapped[UUID]
step_name: Mapped[str]
title: Mapped[str]
description: Mapped[str | None]
form_schema: Mapped[dict[str, Any] | None]
assignee_id: Mapped[str | None]
assignee_group: Mapped[str | None]
due_at: Mapped[datetime | None]
reminder_at: Mapped[datetime | None]
status: Mapped[str]
completed_at: Mapped[datetime | None]
completed_by: Mapped[str | None]
instance: Mapped[WorkflowInstanceModel]
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

created_at: Mapped[datetime.datetime]

Date/time of instance creation.

id: Mapped[UUID]

UUID Primary key column.

updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

class litestar_workflows.db.models.StepExecutionModel[source]

Bases: UUIDAuditBase

Record of a single step execution within a workflow instance.

Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.

instance_id

Foreign key to the workflow instance.

step_name

Name of the executed step.

step_type

Type of step (MACHINE, HUMAN, etc.).

status

Execution status of the step.

input_data

Input data passed to the step.

output_data

Output data produced by the step.

error

Error message if step failed.

started_at

Timestamp when step execution began.

completed_at

Timestamp when step execution finished.

assigned_to

User ID assigned to human tasks.

completed_by

User ID who completed human tasks.

instance_id: Mapped[UUID]
step_name: Mapped[str]
step_type: Mapped[StepType]
status: Mapped[StepStatus]
input_data: Mapped[dict[str, Any] | None]
output_data: Mapped[dict[str, Any] | None]
error: Mapped[str | None]
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
assigned_to: Mapped[str | None]
completed_by: Mapped[str | None]
instance: Mapped[WorkflowInstanceModel]
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

created_at: Mapped[datetime.datetime]

Date/time of instance creation.

id: Mapped[UUID]

UUID Primary key column.

updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

class litestar_workflows.db.models.WorkflowDefinitionModel[source]

Bases: UUIDAuditBase

Persisted workflow definition for versioning and storage.

Stores the serialized workflow definition along with metadata for querying and managing workflow versions.

name

Unique name identifier for the workflow.

version

Semantic version string (e.g., “1.0.0”).

description

Human-readable description of the workflow.

definition_json

Serialized WorkflowDefinition as JSON.

is_active

Whether this version is active for new instances.

instances

Related workflow instances.

name: Mapped[str]
version: Mapped[str]
description: Mapped[str | None]
definition_json: Mapped[dict[str, Any]]
is_active: Mapped[bool]
instances: Mapped[list[WorkflowInstanceModel]]
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

created_at: Mapped[datetime.datetime]

Date/time of instance creation.

id: Mapped[UUID]

UUID Primary key column.

updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

class litestar_workflows.db.models.WorkflowInstanceModel[source]

Bases: UUIDAuditBase

Persisted workflow instance representing a running or completed execution.

Stores the current state of a workflow execution including context data, current step, and execution history.

definition_id

Foreign key to the workflow definition.

workflow_name

Denormalized workflow name for quick queries.

workflow_version

Denormalized workflow version.

status

Current execution status.

current_step

Name of the currently executing step (None if complete).

context_data

Mutable workflow context data as JSON.

metadata_

Immutable metadata about the execution.

error

Error message if workflow failed.

started_at

Timestamp when execution began.

completed_at

Timestamp when execution finished.

tenant_id

Optional tenant identifier for multi-tenancy.

created_by

Optional user who started the workflow.

definition_id: Mapped[UUID]
workflow_name: Mapped[str]
workflow_version: Mapped[str]
status: Mapped[WorkflowStatus]
current_step: Mapped[str | None]
context_data: Mapped[dict[str, Any]]
metadata_: Mapped[dict[str, Any]]
error: Mapped[str | None]
started_at: Mapped[datetime]
completed_at: Mapped[datetime | None]
tenant_id: Mapped[str | None]
created_by: Mapped[str | None]
definition: Mapped[WorkflowDefinitionModel]
step_executions: Mapped[list[StepExecutionModel]]
human_tasks: Mapped[list[HumanTaskModel]]
__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

created_at: Mapped[datetime.datetime]

Date/time of instance creation.

id: Mapped[UUID]

UUID Primary key column.

updated_at: Mapped[datetime.datetime]

Date/time of instance last update.

Repository implementations for workflow persistence.

This module provides async repositories for CRUD operations on workflow models using advanced-alchemy’s repository pattern.

class litestar_workflows.db.repositories.HumanTaskRepository[source]

Bases: SQLAlchemyAsyncRepository[HumanTaskModel]

Repository for human task CRUD operations.

Provides methods for querying pending human tasks by assignee, group, and due date.

model_type

alias of HumanTaskModel

async find_pending(assignee_id=None, assignee_group=None)[source]

Find pending human tasks.

Parameters:
  • assignee_id (str | None) – Optional assignee ID filter.

  • assignee_group (str | None) – Optional group filter.

Return type:

Sequence[HumanTaskModel]

Returns:

List of pending human tasks.

async find_by_instance(instance_id)[source]

Find all human tasks for an instance.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

Sequence[HumanTaskModel]

Returns:

List of human tasks.

async find_overdue()[source]

Find overdue pending human tasks.

Return type:

Sequence[HumanTaskModel]

Returns:

List of overdue human tasks.
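
One plausible reading of "overdue" is a task that is still pending and whose due_at lies in the past. The sketch below applies that rule to plain in-memory records. The Task dataclass and the overdue helper are hypothetical stand-ins for illustration, not the repository's actual query.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Task:
    """Hypothetical stand-in carrying a few HumanTaskModel fields."""

    title: str
    status: str
    due_at: Optional[datetime]


def overdue(tasks: list, now: datetime) -> list:
    """Return pending tasks whose deadline is strictly before `now`."""
    return [
        t
        for t in tasks
        if t.status == "PENDING" and t.due_at is not None and t.due_at < now
    ]


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
tasks = [
    Task("approve invoice", "PENDING", now - timedelta(days=1)),  # past deadline
    Task("review contract", "PENDING", now + timedelta(days=1)),  # still on time
    Task("sign offer", "COMPLETED", now - timedelta(days=3)),  # already done
    Task("no deadline", "PENDING", None),  # no due_at, so never overdue
]
overdue_titles = [t.title for t in overdue(tasks, now)]
```

Tasks without a due_at are treated as never overdue here; that is an assumption of the sketch.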

async complete_task(task_id, completed_by)[source]

Mark a human task as completed.

Parameters:
  • task_id (UUID) – The task ID.

  • completed_by (str) – User ID who completed the task.

Return type:

HumanTaskModel | None

Returns:

The updated task or None if not found.

async cancel_task(task_id)[source]

Cancel a pending human task.

Parameters:

task_id (UUID) – The task ID.

Return type:

HumanTaskModel | None

Returns:

The updated task or None if not found.

class litestar_workflows.db.repositories.StepExecutionRepository[source]

Bases: SQLAlchemyAsyncRepository[StepExecutionModel]

Repository for step execution record CRUD operations.

model_type

alias of StepExecutionModel

async find_by_instance(instance_id)[source]

Find all step executions for an instance.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

Sequence[StepExecutionModel]

Returns:

List of step executions ordered by start time.
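
Because the records come back ordered by start time, they can be read as an audit timeline. The sketch below reproduces that ordering on plain dictionaries and derives a duration per step. The dictionary layout and the timeline helper are assumptions made for illustration; only the field names started_at and completed_at come from StepExecutionModel.

```python
from datetime import datetime, timedelta

start = datetime(2024, 1, 10, 9, 0)
executions = [
    {"step_name": "process", "started_at": start + timedelta(minutes=5), "completed_at": None},
    {"step_name": "submit", "started_at": start, "completed_at": start + timedelta(minutes=2)},
    {
        "step_name": "approve",
        "started_at": start + timedelta(minutes=2),
        "completed_at": start + timedelta(minutes=5),
    },
]


def timeline(rows):
    """Order rows by start time and attach a duration in seconds (None while unfinished)."""
    result = []
    for row in sorted(rows, key=lambda r: r["started_at"]):
        done = row["completed_at"]
        seconds = (done - row["started_at"]).total_seconds() if done is not None else None
        result.append((row["step_name"], seconds))
    return result


steps = timeline(executions)
```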

async find_by_step_name(instance_id, step_name)[source]

Find the execution record for a specific step.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – The step name.

Return type:

StepExecutionModel | None

Returns:

The step execution or None.

async find_failed(instance_id=None)[source]

Find failed step executions.

Parameters:

instance_id (UUID | None) – Optional instance ID filter.

Return type:

Sequence[StepExecutionModel]

Returns:

List of failed step executions.

class litestar_workflows.db.repositories.WorkflowDefinitionRepository[source]

Bases: SQLAlchemyAsyncRepository[WorkflowDefinitionModel]

Repository for workflow definition CRUD operations.

Provides methods for managing workflow definitions including version management and activation status.

model_type

alias of WorkflowDefinitionModel

async get_by_name(name, version=None, *, active_only=True)[source]

Get a workflow definition by name and optional version.

Parameters:
  • name (str) – The workflow name.

  • version (str | None) – Optional specific version. If None, returns the latest active version.

  • active_only (bool) – If True, only return active definitions.

Return type:

WorkflowDefinitionModel | None

Returns:

The workflow definition or None if not found.
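
The lookup rules can be illustrated on an in-memory list. This reference does not state how "latest" is determined, so the sketch assumes it means the highest semantic version. The Definition dataclass, version_key, and this get_by_name function are hypothetical stand-ins, not the repository implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Definition:
    """Hypothetical stand-in for WorkflowDefinitionModel."""

    name: str
    version: str
    is_active: bool = True


def version_key(version: str) -> tuple:
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically, not as text."""
    return tuple(int(part) for part in version.split("."))


def get_by_name(definitions, name, version=None, *, active_only=True) -> Optional[Definition]:
    """Return the exact version if one is given, otherwise the highest matching version."""
    matches = [
        d
        for d in definitions
        if d.name == name
        and (version is None or d.version == version)
        and (d.is_active or not active_only)
    ]
    # Assumption: "latest" means the highest semantic version among the matches.
    return max(matches, key=lambda d: version_key(d.version), default=None)


stored = [
    Definition("order_workflow", "1.2.0"),
    Definition("order_workflow", "1.10.0"),
    Definition("order_workflow", "2.0.0", is_active=False),
]
latest = get_by_name(stored, "order_workflow")
pinned = get_by_name(stored, "order_workflow", "2.0.0", active_only=False)
missing = get_by_name(stored, "order_workflow", "2.0.0")
```

Comparing versions as integer tuples avoids the string-ordering trap where "1.10.0" sorts before "1.2.0".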

async get_latest_version(name)[source]

Get the latest active version of a workflow definition.

Parameters:

name (str) – The workflow name.

Return type:

WorkflowDefinitionModel | None

Returns:

The latest active workflow definition or None.

async list_active()[source]

List all active workflow definitions.

Return type:

Sequence[WorkflowDefinitionModel]

Returns:

List of active workflow definitions.

async deactivate_version(name, version)[source]

Deactivate a specific workflow version.

Parameters:
  • name (str) – The workflow name.

  • version (str) – The version to deactivate.

Return type:

bool

Returns:

True if a definition was deactivated.

class litestar_workflows.db.repositories.WorkflowInstanceRepository[source]

Bases: SQLAlchemyAsyncRepository[WorkflowInstanceModel]

Repository for workflow instance CRUD operations.

Provides methods for querying and managing workflow instances including filtering by status, user, and workflow name.

model_type

alias of WorkflowInstanceModel

async find_by_workflow(workflow_name, status=None, limit=100, offset=0)[source]

Find instances by workflow name with optional status filter.

Parameters:
  • workflow_name (str) – The workflow name to filter by.

  • status (WorkflowStatus | None) – Optional status filter.

  • limit (int) – Maximum number of results.

  • offset (int) – Number of results to skip.

Return type:

tuple[Sequence[WorkflowInstanceModel], int]

Returns:

Tuple of (instances, total_count).
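
A (page, total_count) return value is conventionally read as "the requested slice, plus the number of rows that match the filter before limit and offset are applied". The sketch below shows that reading on plain dictionaries. The paginate helper and the status strings are illustrative assumptions, not the repository's query.

```python
from typing import Optional, Sequence


def paginate(
    rows: Sequence[dict],
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple:
    """Filter first, then slice; the count reflects the filtered set, not the page."""
    filtered = [r for r in rows if status is None or r["status"] == status]
    return filtered[offset : offset + limit], len(filtered)


# Ten hypothetical instances: odd ids are running, even ids are completed.
rows = [{"id": i, "status": "RUNNING" if i % 2 else "COMPLETED"} for i in range(10)]
page, total = paginate(rows, status="RUNNING", limit=2, offset=2)
page_ids = [r["id"] for r in page]
```

With the total in hand, a caller can compute the number of pages as ceil(total / limit) without issuing a second query.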

async find_by_user(user_id, status=None)[source]

Find instances created by a specific user.

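Parameters:
  • user_id (str) – The user ID to filter by.

  • status (WorkflowStatus | None) – Optional status filter.
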
Return type:

Sequence[WorkflowInstanceModel]

Returns:

List of workflow instances.

async find_by_tenant(tenant_id, status=None, limit=100, offset=0)[source]

Find instances by tenant ID.

Parameters:
  • tenant_id (str) – The tenant ID to filter by.

  • status (WorkflowStatus | None) – Optional status filter.

  • limit (int) – Maximum number of results.

  • offset (int) – Number of results to skip.

Return type:

tuple[Sequence[WorkflowInstanceModel], int]

Returns:

Tuple of (instances, total_count).

async find_running()[source]

Find all running or waiting workflow instances.

Return type:

Sequence[WorkflowInstanceModel]

Returns:

List of active workflow instances.

async update_status(instance_id, status, *, current_step=None, error=None)[source]

Update the status of a workflow instance.

Parameters:
  • instance_id (UUID) – The instance ID.

  • status (WorkflowStatus) – The new status.

  • current_step (str | None) – Optional current step name.

  • error (str | None) – Optional error message.

Return type:

WorkflowInstanceModel | None

Returns:

The updated instance or None if not found.

Persistent execution engine with database storage.

This module provides a persistence-aware execution engine that stores workflow state in a database using SQLAlchemy.

class litestar_workflows.db.engine.PersistentExecutionEngine[source]

Bases: object

Execution engine with database persistence.

This engine persists all workflow state to a database, enabling durability, recovery, and querying of workflow history.

registry

The workflow registry for looking up definitions.

session

SQLAlchemy async session for database operations.

event_bus

Optional event bus for emitting workflow events.

__init__(registry, session, event_bus=None)[source]

Initialize the persistent execution engine.

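Parameters:
  • registry – The workflow registry for looking up definitions.

  • session – SQLAlchemy async session for database operations.

  • event_bus – Optional event bus for emitting workflow events.
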
async start_workflow(workflow, initial_data=None, *, tenant_id=None, created_by=None)[source]

Start a new workflow instance with persistence.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute.

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow.

  • tenant_id (str | None) – Optional tenant ID for multi-tenancy.

  • created_by (str | None) – Optional user ID who started the workflow.

Return type:

WorkflowInstanceData

Returns:

The created WorkflowInstanceData.

async complete_human_task(instance_id, step_name, user_id, data)[source]

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • step_name (str) – Name of the human task step.

  • user_id (str) – ID of the user completing the task.

  • data (dict[str, Any]) – User-provided data to merge into context.

Return type:

None
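
How the submitted data is merged into the context is not specified here. The sketch below assumes a shallow merge in which user-provided keys overwrite existing context keys; merge_task_data is a hypothetical helper, not part of the engine.

```python
def merge_task_data(context: dict, data: dict) -> dict:
    """Shallow merge: keys submitted by the user overwrite existing context keys."""
    return {**context, **data}


context = {"order_id": 42, "approved": None}
merged = merge_task_data(context, {"approved": True, "comment": "looks good"})
```

The helper returns a new dictionary and leaves the original context untouched, which keeps the before and after states available for auditing.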

async cancel_workflow(instance_id, reason)[source]

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID.

  • reason (str) – Reason for cancellation.

Return type:

None

async get_instance(instance_id)[source]

Get a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID.

Return type:

WorkflowInstanceData

Returns:

The WorkflowInstanceData.

Raises:

KeyError – If instance not found.
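
Unlike the repository methods, which return None for a missing row, get_instance raises KeyError, so callers need a try/except rather than a None check. The sketch below uses a hypothetical in-memory FakeEngine that only mimics that documented behaviour; it is not the real engine.

```python
import asyncio
from uuid import uuid4


class FakeEngine:
    """Hypothetical in-memory stand-in that mimics the documented lookup behaviour."""

    def __init__(self):
        self._instances = {}

    async def get_instance(self, instance_id):
        # A plain dict lookup raises KeyError for unknown IDs.
        return self._instances[instance_id]


async def describe(engine, instance_id) -> str:
    """Translate the KeyError into a caller-friendly result."""
    try:
        instance = await engine.get_instance(instance_id)
    except KeyError:
        return "not found"
    return f"found: {instance}"


engine = FakeEngine()
known = uuid4()
engine._instances[known] = "order_workflow"


async def main():
    return await describe(engine, known), await describe(engine, uuid4())


found, missing = asyncio.run(main())
```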

get_running_instances()[source]

Get IDs of currently running instances.

Return type:

list[UUID]

Returns:

List of running instance IDs.

Database API

from litestar_workflows.db import (
    WorkflowDefinitionModel,
    WorkflowInstanceModel,
    StepExecutionModel,
    HumanTaskModel,
    WorkflowInstanceRepository,
)

Web Module

The web module provides REST API controllers and DTOs. The REST API is built into the main WorkflowPlugin and enabled by default.

Web plugin for litestar-workflows.

This module provides REST API controllers and HTML UI views for managing workflows through HTTP endpoints. The REST API is automatically enabled when using WorkflowPlugin with enable_api=True (the default).

The API includes controllers for workflow definitions, instances, and human tasks, along with graph visualization utilities.

The UI views (optional, requires [ui] extra) provide:

  • Dashboard with workflow statistics

  • Workflow definition list and detail views

  • Instance list and detail views with MermaidJS graphs

  • Human task forms with JSON Schema rendering

Example

Basic usage with WorkflowPlugin (API enabled by default):

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(
                enable_api=True,  # Default
                api_path_prefix="/workflows",
            )
        ),
    ],
)

With UI enabled (requires [ui] extra):

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig
from litestar_workflows.web.views import WorkflowUIController, get_template_config

app = Litestar(
    plugins=[WorkflowPlugin(config=WorkflowPluginConfig())],
    route_handlers=[WorkflowUIController],
    template_config=get_template_config(),
)

With authentication guards:

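from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

# require_auth_guard is a guard callable defined by your application;
# it is not provided by litestar_workflows.
config = WorkflowPluginConfig(
    api_path_prefix="/api/v1/workflows",
    api_guards=[require_auth_guard],
)

app = Litestar(
    plugins=[WorkflowPlugin(config=config)],
)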

Disable API endpoints:

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

app = Litestar(
    plugins=[
        WorkflowPlugin(config=WorkflowPluginConfig(enable_api=False)),
    ],
)

class litestar_workflows.web.CompleteTaskDTO[source]

Bases: object

DTO for completing a human task.

output_data

Data submitted by the user completing the task.

completed_by

User ID who completed the task.

comment

Optional comment about the completion.

__init__(output_data, completed_by, comment=None)

comment: str | None = None
output_data: dict[str, Any]
completed_by: str
exception litestar_workflows.web.DatabaseRequiredError[source]

Bases: Exception

Raised when an endpoint requires database persistence but it’s not available.

This exception is raised when a user tries to access an endpoint that requires the [db] extra, but that extra has not been installed.

Parameters:

message (str | None)

__init__(message=None)[source]

Initialize the exception.

Parameters:

message (str | None) – Optional custom error message.

class litestar_workflows.web.GraphDTO[source]

Bases: object

DTO for workflow graph visualization.

mermaid_source

MermaidJS graph definition.

nodes

List of node definitions.

edges

List of edge definitions.

__init__(mermaid_source, nodes, edges)

mermaid_source: str
nodes: list[dict[str, Any]]
edges: list[dict[str, Any]]
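
To make the three fields concrete, the sketch below builds a MermaidJS flowchart from a list of step names and (source, target) pairs. The Graph dataclass mirrors the field names of GraphDTO but is a hypothetical stand-in, and the dictionary keys "id", "source", and "target" are assumptions; the library's own graph utilities may produce different output.

```python
from dataclasses import dataclass


@dataclass
class Graph:
    """Hypothetical mirror of GraphDTO's three fields."""

    mermaid_source: str
    nodes: list
    edges: list


def build_graph(steps, edges) -> Graph:
    """Render steps and (source, target) pairs as a top-down Mermaid flowchart."""
    lines = ["graph TD"]
    lines += [f'    {name}["{name}"]' for name in steps]
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return Graph(
        mermaid_source="\n".join(lines),
        nodes=[{"id": name} for name in steps],
        edges=[{"source": src, "target": dst} for src, dst in edges],
    )


graph = build_graph(
    ["submit", "approve", "process"],
    [("submit", "approve"), ("approve", "process")],
)
```

The mermaid_source string can be pasted into any Mermaid renderer, while nodes and edges give clients a structured form for custom drawing.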
class litestar_workflows.web.HumanTaskController[source]

Bases: Controller

API controller for human tasks.

Provides endpoints for managing human approval tasks including listing, completing, and reassigning tasks.

Tags: Human Tasks

Parameters:

owner (Router)

after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response instance and must return a response, either the same object or a replacement.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

complete_task – POST route handler.
get_task – GET route handler.
list_tasks – GET route handler.
reassign_task – POST route handler.

class litestar_workflows.web.HumanTaskDTO[source]

Bases: object

DTO for human task summary.

id

Task ID.

instance_id

Workflow instance ID.

step_name

Name of the human task step.

title

Display title for the task.

description

Detailed task description.

assignee

User ID assigned to complete the task.

status

Task status (pending, completed, canceled).

due_date

Optional due date for task completion.

created_at

When the task was created.

form_schema

Optional JSON Schema for task form.

__init__(id, instance_id, step_name, title, description, assignee, status, due_date, created_at, form_schema=None)

form_schema: dict[str, Any] | None = None
id: UUID
instance_id: UUID
step_name: str
title: str
description: str | None
assignee: str | None
status: str
due_date: datetime | None
created_at: datetime
class litestar_workflows.web.ReassignTaskDTO[source]

Bases: object

DTO for reassigning a human task.

new_assignee

User ID to assign the task to.

reason

Optional reason for reassignment.

__init__(new_assignee, reason=None)

reason: str | None = None
new_assignee: str
class litestar_workflows.web.StartWorkflowDTO[source]

Bases: object

DTO for starting a new workflow instance.

definition_name

Name of the workflow definition to instantiate.

input_data

Initial data to pass to the workflow context.

correlation_id

Optional correlation ID for tracking related workflows.

user_id

Optional user ID who started the workflow.

tenant_id

Optional tenant ID for multi-tenancy.

__init__(definition_name, input_data=None, correlation_id=None, user_id=None, tenant_id=None)

correlation_id: str | None = None
input_data: dict[str, Any] | None = None
tenant_id: str | None = None
user_id: str | None = None
definition_name: str
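
As a rough illustration of the request body this DTO describes, the sketch below serializes a local dataclass with the same field names to JSON. StartWorkflowPayload is a hypothetical mirror used only for this example; whether the API expects omitted keys or explicit nulls for unset fields is not stated in this reference.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class StartWorkflowPayload:
    """Hypothetical mirror of StartWorkflowDTO; field names follow the reference."""

    definition_name: str
    input_data: Optional[dict] = None
    correlation_id: Optional[str] = None
    user_id: Optional[str] = None
    tenant_id: Optional[str] = None


payload = StartWorkflowPayload(
    definition_name="order_workflow",
    input_data={"order_id": 42},
    user_id="alice",
)
# Unset optional fields serialize as JSON null in this sketch.
body = json.dumps(asdict(payload), sort_keys=True)
```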
class litestar_workflows.web.StepExecutionDTO[source]

Bases: object

DTO for step execution record.

id

Step execution ID.

step_name

Name of the executed step.

status

Execution status (PENDING, RUNNING, SUCCEEDED, etc.).

started_at

When execution started.

completed_at

When execution completed (if finished).

error

Error message if execution failed.

__init__(id, step_name, status, started_at, completed_at=None, error=None)

completed_at: datetime | None = None
error: str | None = None
id: UUID
step_name: str
status: str
started_at: datetime
class litestar_workflows.web.WorkflowDefinitionController[source]

Bases: Controller

API controller for workflow definitions.

Provides endpoints for listing and retrieving workflow definitions, including their schemas and graph visualizations.

Tags: Workflow Definitions

Parameters:

owner (Router)

after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response instance and must return a response, either the same object or a replacement.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

get_definition – GET route handler.
get_definition_graph – GET route handler.
list_definitions – GET route handler.

class litestar_workflows.web.WorkflowDefinitionDTO[source]

Bases: object

DTO for workflow definition metadata.

name

Workflow name.

version

Workflow version.

description

Human-readable description.

steps

List of step names in the workflow.

edges

List of edge definitions (source, target, condition).

initial_step

Name of the starting step.

terminal_steps

List of terminal step names.

__init__(name, version, description, steps, edges, initial_step, terminal_steps)

name: str
version: str
description: str
steps: list[str]
edges: list[dict[str, Any]]
initial_step: str
terminal_steps: list[str]
class litestar_workflows.web.WorkflowInstanceController[source]

Bases: Controller

API controller for workflow instances.

Provides endpoints for starting, listing, and managing workflow instance executions.

Tags: Workflow Instances

Parameters:

owner (Router)

after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response instance and must return a response, either the same object or a replacement.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

cancel_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>
get_instance(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_instance_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>
list_instances(fn) = <litestar.handlers.http_handlers.decorators.get object>
retry_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>
start_workflow(fn) = <litestar.handlers.http_handlers.decorators.post object>
class litestar_workflows.web.WorkflowInstanceDTO[source]

Bases: object

DTO for workflow instance summary.

id

Instance ID.

definition_name

Name of the workflow definition.

status

Current execution status.

current_step

Currently executing step (if applicable).

started_at

When the workflow started.

completed_at

When the workflow completed (if finished).

created_by

User who started the workflow.

__init__(id, definition_name, status, current_step, started_at, completed_at=None, created_by=None)
Parameters:
completed_at: datetime | None = None
created_by: str | None = None
id: UUID
definition_name: str
status: str
current_step: str | None
started_at: datetime
class litestar_workflows.web.WorkflowInstanceDetailDTO[source]

Bases: object

DTO for detailed workflow instance information.

Extends WorkflowInstanceDTO with full context and execution history.

id

Instance ID.

definition_name

Name of the workflow definition.

status

Current execution status.

current_step

Currently executing step (if applicable).

started_at

When the workflow started.

completed_at

When the workflow completed (if finished).

created_by

User who started the workflow.

context_data

Current workflow context data.

metadata

Workflow metadata.

step_history

List of step executions.

error

Error message if workflow failed.

__init__(id, definition_name, status, current_step, started_at, completed_at, created_by, context_data, metadata, step_history, error=None)
Parameters:
error: str | None = None
id: UUID
definition_name: str
status: str
current_step: str | None
started_at: datetime
completed_at: datetime | None
created_by: str | None
context_data: dict[str, Any]
metadata: dict[str, Any]
step_history: list[StepExecutionDTO]
class litestar_workflows.web.WorkflowWebConfig[source]

Bases: object

Configuration for the workflow web plugin.

This dataclass defines all configurable options for the WorkflowWebPlugin, allowing users to customize the REST API endpoints, authentication, and feature availability.

path_prefix

URL path prefix for all workflow endpoints.

include_in_schema

Whether to include endpoints in OpenAPI schema.

guards

List of Litestar guards to apply to all workflow endpoints.

enable_graph_endpoints

Whether to enable graph visualization endpoints.

tags

OpenAPI tags to apply to workflow endpoints.

Example

>>> from litestar_workflows.web import WorkflowWebConfig
>>> config = WorkflowWebConfig(
...     path_prefix="/api/v1/workflows",
...     guards=[require_auth],
...     enable_graph_endpoints=True,
... )
Parameters:
  • path_prefix (str)

  • include_in_schema (bool)

  • guards (list[Guard])

  • enable_graph_endpoints (bool)

  • tags (list[str])

__init__(path_prefix='/workflows', include_in_schema=True, guards=<factory>, enable_graph_endpoints=True, tags=<factory>)
Parameters:
  • path_prefix (str)

  • include_in_schema (bool)

  • guards (list[Guard])

  • enable_graph_endpoints (bool)

  • tags (list[str])

enable_graph_endpoints: bool = True
include_in_schema: bool = True
path_prefix: str = '/workflows'
guards: list[Guard]
tags: list[str]
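The `<factory>` placeholders in the signature above are how dataclasses display a `default_factory`. The following is a minimal sketch of that behaviour using a hypothetical stand-in class (`WebConfigSketch` is not part of the library, and the empty-list factories are an assumption; the real defaults for `guards` and `tags` are not shown in this reference).

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class WebConfigSketch:
    """Hypothetical mirror of the documented WorkflowWebConfig fields."""

    path_prefix: str = "/workflows"
    include_in_schema: bool = True
    # Assumed factory: a fresh empty list per instance.
    guards: list[Callable[..., Any]] = field(default_factory=list)
    enable_graph_endpoints: bool = True
    # Assumed factory: the library's real default tags are not documented here.
    tags: list[str] = field(default_factory=list)


first = WebConfigSketch()
second = WebConfigSketch(path_prefix="/api/v1/workflows")

# Mutating one instance's list does not leak into another instance.
first.tags.append("Workflows")
```

Because each instance receives its own list, appending a guard or tag to one configuration leaves other configurations untouched.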
litestar_workflows.web.database_required_handler(_request, exc)[source]

Exception handler for DatabaseRequiredError.

Returns a 501 Not Implemented response with installation instructions.

Return type:

Response

Returns:

Response with error details and installation instructions.

litestar_workflows.web.generate_mermaid_graph(definition)[source]

Generate a MermaidJS graph representation of a workflow definition.

This function creates a Mermaid flowchart diagram that visualizes the workflow’s steps and transitions. Different step types are represented with different node shapes.

Parameters:

definition (WorkflowDefinition) – The workflow definition to visualize.

Return type:

str

Returns:

A MermaidJS flowchart definition as a string.

Example

>>> mermaid = generate_mermaid_graph(definition)
>>> print(mermaid)
graph TD
    submit[Submit]
    review{{Review}}
    approve[Approve]
    submit --> review
    review --> approve
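To illustrate how output of this shape can be produced, here is a minimal sketch that does not use the library. `StepNode` and `render_mermaid` are hypothetical names, and the shape mapping (hexagon for human steps, rectangle for everything else) is an assumption inferred from the example output, not a statement about the library's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StepNode:
    """Hypothetical stand-in for a workflow step (not the library's type)."""

    name: str
    kind: str  # "machine" or "human"; an assumed classification


def render_mermaid(steps: list[StepNode], edges: list[tuple[str, str]]) -> str:
    """Render steps and transitions as a top-down Mermaid flowchart."""
    lines = ["graph TD"]
    for step in steps:
        label = step.name.replace("_", " ").title()
        if step.kind == "human":
            # Mermaid draws {{text}} as a hexagon.
            lines.append("    " + step.name + "{{" + label + "}}")
        else:
            # Mermaid draws [text] as a rectangle.
            lines.append("    " + step.name + "[" + label + "]")
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


steps = [
    StepNode("submit", "machine"),
    StepNode("review", "human"),
    StepNode("approve", "machine"),
]
diagram = render_mermaid(steps, [("submit", "review"), ("review", "approve")])
```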
litestar_workflows.web.generate_mermaid_graph_with_state(definition, current_step=None, completed_steps=None, failed_steps=None)[source]

Generate a MermaidJS graph with execution state highlighting.

This function creates a Mermaid flowchart that includes visual styling to highlight the current step, completed steps, and failed steps.

Parameters:
  • definition (WorkflowDefinition) – The workflow definition to visualize.

  • current_step (str | None) – Name of the currently executing step.

  • completed_steps (list[str] | None) – List of successfully completed step names.

  • failed_steps (list[str] | None) – List of failed step names.

Return type:

str

Returns:

A MermaidJS flowchart definition with state styling.

Example

>>> mermaid = generate_mermaid_graph_with_state(
...     definition,
...     current_step="review",
...     completed_steps=["submit"],
...     failed_steps=[],
... )
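Mermaid expresses per-node styling with `classDef` and `class` statements. The sketch below shows one way state highlighting could be layered onto an existing flowchart string. `add_state_styling`, the class names, and the colours are all illustrative assumptions; the styling the library actually emits may differ.

```python
from typing import Optional


def add_state_styling(
    mermaid: str,
    current_step: Optional[str] = None,
    completed_steps: Optional[list[str]] = None,
    failed_steps: Optional[list[str]] = None,
) -> str:
    """Append Mermaid classDef/class statements that colour nodes by state."""
    lines = [
        mermaid,
        # Colours are arbitrary choices for illustration.
        "    classDef completed fill:#c8e6c9,stroke:#2e7d32",
        "    classDef current fill:#fff9c4,stroke:#f9a825",
        "    classDef failed fill:#ffcdd2,stroke:#c62828",
    ]
    for name in completed_steps or []:
        lines.append(f"    class {name} completed")
    for name in failed_steps or []:
        lines.append(f"    class {name} failed")
    if current_step is not None:
        lines.append(f"    class {current_step} current")
    return "\n".join(lines)


base = "graph TD\n    submit[Submit]\n    review{{Review}}\n    submit --> review"
styled = add_state_styling(
    base,
    current_step="review",
    completed_steps=["submit"],
    failed_steps=[],
)
```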
litestar_workflows.web.parse_graph_to_dict(definition)[source]

Parse a workflow definition into a dictionary representation.

This function extracts nodes and edges from a workflow definition into a structured dictionary format suitable for JSON serialization.

Parameters:

definition (WorkflowDefinition) – The workflow definition to parse.

Return type:

dict[str, Any]

Returns:

A dictionary containing nodes and edges lists.

Example

>>> graph_dict = parse_graph_to_dict(definition)
>>> print(graph_dict["nodes"])
[{"id": "submit", "label": "Submit", "type": "machine"}, ...]
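As a rough illustration of the structure described above, the following sketch builds a nodes-and-edges dictionary from plain Python data and serializes it with the standard library. `graph_to_dict` is a hypothetical helper; the node keys follow the example output, while the edge keys (`source`, `target`) are an assumption.

```python
import json
from typing import Any


def graph_to_dict(
    steps: dict[str, str], edges: list[tuple[str, str]]
) -> dict[str, Any]:
    """Build a JSON-serializable graph from step kinds and transitions."""
    nodes = [
        {"id": name, "label": name.replace("_", " ").title(), "type": kind}
        for name, kind in steps.items()
    ]
    # Assumed edge shape: one dict per transition.
    edge_dicts = [{"source": source, "target": target} for source, target in edges]
    return {"nodes": nodes, "edges": edge_dicts}


graph = graph_to_dict(
    {"submit": "machine", "review": "human"},
    [("submit", "review")],
)
payload = json.dumps(graph)
```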
litestar_workflows.web.require_db()[source]

Dependency that checks if database persistence is available.

This function is used as a dependency in route handlers that require database functionality. It will raise DatabaseRequiredError if the required database components are not available.

Raises:

DatabaseRequiredError – If database components are not installed.

Example

```python
from litestar import get
from litestar.di import Provide

from litestar_workflows.web import require_db


@get(
    "/instances",
    dependencies={"_db_check": Provide(require_db)},
)
async def list_instances() -> list[dict]:
    # This endpoint requires the database components to be installed
    ...
```

Return type:

None
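A guard of this kind is commonly built on an import check. The sketch below shows the general pattern with the standard library only. The `DatabaseRequiredError` class defined here is a local stand-in, `require_module` is a hypothetical helper, and which modules the library actually checks is not stated in this reference.

```python
import importlib.util


class DatabaseRequiredError(RuntimeError):
    """Local stand-in for the library's exception of the same name."""


def require_module(module_name: str) -> None:
    """Raise if a top-level module cannot be found, otherwise return None."""
    if importlib.util.find_spec(module_name) is None:
        raise DatabaseRequiredError(
            f"Optional dependency {module_name!r} is not installed"
        )


# A standard-library module is always importable, so this passes silently.
require_module("json")
```

Using `find_spec` avoids importing the optional package just to learn whether it exists.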

REST API controllers for workflow management.

This module provides three controller classes for managing workflows:

  • WorkflowDefinitionController: Manage workflow definitions and schemas

  • WorkflowInstanceController: Start, monitor, and control workflow executions

  • HumanTaskController: Manage human approval tasks

class litestar_workflows.web.controllers.HumanTaskController[source]

Bases: Controller

API controller for human tasks.

Provides endpoints for managing human approval tasks including listing, completing, and reassigning tasks.

Tags: Human Tasks

Parameters:

owner (Router)

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

list_tasks(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_task(fn) = <litestar.handlers.http_handlers.decorators.get object>
complete_task(fn) = <litestar.handlers.http_handlers.decorators.post object>
reassign_task(fn) = <litestar.handlers.http_handlers.decorators.post object>
after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response object and must return a Response.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

class litestar_workflows.web.controllers.WorkflowDefinitionController[source]

Bases: Controller

API controller for workflow definitions.

Provides endpoints for listing and retrieving workflow definitions, including their schemas and graph visualizations.

Tags: Workflow Definitions

Parameters:

owner (Router)

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

list_definitions(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_definition(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_definition_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>
after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response object and must return a Response.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

class litestar_workflows.web.controllers.WorkflowInstanceController[source]

Bases: Controller

API controller for workflow instances.

Provides endpoints for starting, listing, and managing workflow instance executions.

Tags: Workflow Instances

Parameters:

owner (Router)

path: str

A path fragment for the controller.

All route handlers under the controller will have the fragment appended to them. If not set it defaults to /.

tags: ClassVar[list[str]]

A sequence of string tags that will be appended to the schema of all route handlers under the controller.

start_workflow(fn) = <litestar.handlers.http_handlers.decorators.post object>
list_instances(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_instance(fn) = <litestar.handlers.http_handlers.decorators.get object>
get_instance_graph(fn) = <litestar.handlers.http_handlers.decorators.get object>
cancel_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>
retry_instance(fn) = <litestar.handlers.http_handlers.decorators.post object>
after_request: AfterRequestHookHandler | None

A sync or async function executed after the route handler has returned and the response object has been resolved.

It receives the Response object and must return a Response.

after_response: AfterResponseHookHandler | None

A sync or async function called after the response has been awaited.

It receives the Request instance and should not return any values.

before_request: BeforeRequestHookHandler | None

A sync or async function called immediately before calling the route handler.

It receives the Request instance and any non-None return value is used for the response, bypassing the route handler.

cache_control: CacheControlHeader | None

A CacheControlHeader header to add to route handlers of this controller.

Can be overridden by route handlers.

dependencies: Dependencies | None

A string keyed dictionary of dependency Provider instances.

dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for (de)serializing and validation of request data.

etag: ETag | None

An etag header of type ETag to add to route handlers of this controller.

Can be overridden by route handlers.

exception_handlers: ExceptionHandlersMap | None

A map of handler functions to status codes and/or exception types.

guards: Sequence[Guard] | None

A sequence of Guard callables.

include_in_schema: bool | EmptyType

A boolean flag dictating whether the route handler should be documented in the OpenAPI schema.

middleware: Sequence[Middleware] | None

A sequence of Middleware.

opt: Mapping[str, Any] | None

A string key mapping of arbitrary values that can be accessed in Guards or wherever you have access to Request or ASGI Scope.

owner: Router

The Router or Litestar app that owns the controller.

This value is set internally by Litestar and it should not be set when subclassing the controller.

parameters: ParametersMap | None

A mapping of Parameter definitions available to all application paths.

request_class: type[Request] | None

A custom subclass of Request to be used as the default request for all route handlers under the controller.

request_max_body_size: int | None | EmptyType

Maximum allowed size of the request body in bytes. If this size is exceeded, a ‘413 - Request Entity Too Large’ error response is returned.

response_class: type[Response] | None

A custom subclass of Response to be used as the default response for all route handlers under the controller.

response_cookies: ResponseCookies | None

A list of Cookie instances.

response_headers: ResponseHeaders | None

A string keyed dictionary mapping ResponseHeader instances.

return_dto: type[AbstractDTO] | None | EmptyType

AbstractDTO to use for serializing outbound response data.

security: Sequence[SecurityRequirement] | None

A sequence of dictionaries that will be added to the schema of all route handlers under the controller.

signature_namespace: dict[str, Any]

A mapping of names to types for use in forward reference resolution during signature modelling.

signature_types: Sequence[Any]

A sequence of types for use in forward reference resolution during signature modelling.

These types will be added to the signature namespace using their __name__ attribute.

type_decoders: TypeDecodersSequence | None

A sequence of tuples, each composed of a predicate testing for type identity and a msgspec hook for deserialization.

type_encoders: TypeEncodersMap | None

A mapping of types to callables that transform them into types supported for serialization.

websocket_class: type[WebSocket] | None

A custom subclass of WebSocket to be used as the default websocket for all route handlers under the controller.

Data Transfer Objects for the workflow web API.

This module defines DTOs for serializing and deserializing workflow data in REST API requests and responses.

class litestar_workflows.web.dto.CompleteTaskDTO[source]

Bases: object

DTO for completing a human task.

output_data

Data submitted by the user completing the task.

completed_by

User ID who completed the task.

comment

Optional comment about the completion.

Parameters:
output_data: dict[str, Any]
completed_by: str
comment: str | None = None
__init__(output_data, completed_by, comment=None)
class litestar_workflows.web.dto.GraphDTO[source]

Bases: object

DTO for workflow graph visualization.

mermaid_source

MermaidJS graph definition.

nodes

List of node definitions.

edges

List of edge definitions.

Parameters:
mermaid_source: str
nodes: list[dict[str, Any]]
edges: list[dict[str, Any]]
__init__(mermaid_source, nodes, edges)
class litestar_workflows.web.dto.HumanTaskDTO[source]

Bases: object

DTO for human task summary.

id

Task ID.

instance_id

Workflow instance ID.

step_name

Name of the human task step.

title

Display title for the task.

description

Detailed task description.

assignee

User ID assigned to complete the task.

status

Task status (pending, completed, canceled).

due_date

Optional due date for task completion.

created_at

When the task was created.

form_schema

Optional JSON Schema for task form.

Parameters:
id: UUID
instance_id: UUID
step_name: str
title: str
description: str | None
assignee: str | None
status: str
due_date: datetime | None
created_at: datetime
form_schema: dict[str, Any] | None = None
__init__(id, instance_id, step_name, title, description, assignee, status, due_date, created_at, form_schema=None)
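The `status` and `due_date` fields are enough to find overdue work. The sketch below filters a list of tasks with a hypothetical stand-in class (`TaskSketch` carries only three of the documented fields, and `overdue_tasks` is an illustrative helper, not a library function). The status string `"pending"` comes from the field description above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class TaskSketch:
    """Hypothetical stand-in carrying three of the HumanTaskDTO fields."""

    title: str
    status: str
    due_date: Optional[datetime]


def overdue_tasks(tasks: list[TaskSketch], now: datetime) -> list[TaskSketch]:
    """Return pending tasks whose due date is earlier than ``now``."""
    return [
        task
        for task in tasks
        if task.status == "pending"
        and task.due_date is not None
        and task.due_date < now
    ]


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
tasks = [
    TaskSketch("Approve invoice", "pending", now - timedelta(days=1)),
    TaskSketch("Review contract", "pending", now + timedelta(days=3)),
    TaskSketch("Sign off release", "completed", now - timedelta(days=2)),
    TaskSketch("Triage request", "pending", None),
]
late = overdue_tasks(tasks, now)
```

Tasks without a due date are never reported as overdue, which matches the field being optional.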
class litestar_workflows.web.dto.ReassignTaskDTO[source]

Bases: object

DTO for reassigning a human task.

new_assignee

User ID to assign the task to.

reason

Optional reason for reassignment.

Parameters:
new_assignee: str
reason: str | None = None
__init__(new_assignee, reason=None)
class litestar_workflows.web.dto.StartWorkflowDTO[source]

Bases: object

DTO for starting a new workflow instance.

definition_name

Name of the workflow definition to instantiate.

input_data

Initial data to pass to the workflow context.

correlation_id

Optional correlation ID for tracking related workflows.

user_id

Optional user ID who started the workflow.

tenant_id

Optional tenant ID for multi-tenancy.

Parameters:
definition_name: str
input_data: dict[str, Any] | None = None
correlation_id: str | None = None
user_id: str | None = None
tenant_id: str | None = None
__init__(definition_name, input_data=None, correlation_id=None, user_id=None, tenant_id=None)
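For clients that call the API over HTTP, the fields above map naturally onto a JSON request body. This is a minimal sketch assuming the endpoint accepts a JSON object keyed by the DTO field names; `start_workflow_body` is a hypothetical helper, and the endpoint path is deliberately not shown because it is not specified here.

```python
import json
from typing import Any, Optional


def start_workflow_body(
    definition_name: str,
    input_data: Optional[dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
    user_id: Optional[str] = None,
    tenant_id: Optional[str] = None,
) -> str:
    """Serialize the StartWorkflowDTO fields to a JSON request body."""
    fields = {
        "definition_name": definition_name,
        "input_data": input_data,
        "correlation_id": correlation_id,
        "user_id": user_id,
        "tenant_id": tenant_id,
    }
    # Drop unset optional fields so their documented default (None) applies.
    return json.dumps({key: value for key, value in fields.items() if value is not None})


body = start_workflow_body("order_workflow", input_data={"order_id": 42})
```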
class litestar_workflows.web.dto.StepExecutionDTO[source]

Bases: object

DTO for step execution record.

id

Step execution ID.

step_name

Name of the executed step.

status

Execution status (PENDING, RUNNING, SUCCEEDED, etc.).

started_at

When execution started.

completed_at

When execution completed (if finished).

error

Error message if execution failed.

Parameters:
id: UUID
step_name: str
status: str
started_at: datetime
completed_at: datetime | None = None
error: str | None = None
__init__(id, step_name, status, started_at, completed_at=None, error=None)
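Step execution records can be reduced to the `completed_steps` and `failed_steps` lists that `generate_mermaid_graph_with_state` accepts. The sketch below uses a hypothetical stand-in record (`StepRecord`) and helper (`partition_history`). `"SUCCEEDED"` appears in the status description above, while `"FAILED"` is an assumed status name.

```python
from dataclasses import dataclass


@dataclass
class StepRecord:
    """Hypothetical stand-in carrying two of the StepExecutionDTO fields."""

    step_name: str
    status: str


def partition_history(history: list[StepRecord]) -> tuple[list[str], list[str]]:
    """Split a step history into (completed, failed) step-name lists."""
    completed = [record.step_name for record in history if record.status == "SUCCEEDED"]
    # "FAILED" is an assumption; check the library's status enum for the real name.
    failed = [record.step_name for record in history if record.status == "FAILED"]
    return completed, failed


history = [
    StepRecord("submit", "SUCCEEDED"),
    StepRecord("review", "FAILED"),
    StepRecord("approve", "PENDING"),
]
completed_steps, failed_steps = partition_history(history)
```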
class litestar_workflows.web.dto.WorkflowDefinitionDTO[source]

Bases: object

DTO for workflow definition metadata.

name

Workflow name.

version

Workflow version.

description

Human-readable description.

steps

List of step names in the workflow.

edges

List of edge definitions (source, target, condition).

initial_step

Name of the starting step.

terminal_steps

List of terminal step names.

Parameters:
name: str
version: str
description: str
steps: list[str]
edges: list[dict[str, Any]]
initial_step: str
terminal_steps: list[str]
__init__(name, version, description, steps, edges, initial_step, terminal_steps)
class litestar_workflows.web.dto.WorkflowInstanceDTO[source]

Bases: object

DTO for workflow instance summary.

id

Instance ID.

definition_name

Name of the workflow definition.

status

Current execution status.

current_step

Currently executing step (if applicable).

started_at

When the workflow started.

completed_at

When the workflow completed (if finished).

created_by

User who started the workflow.

Parameters:
id: UUID
definition_name: str
status: str
current_step: str | None
started_at: datetime
completed_at: datetime | None = None
created_by: str | None = None
__init__(id, definition_name, status, current_step, started_at, completed_at=None, created_by=None)
class litestar_workflows.web.dto.WorkflowInstanceDetailDTO[source]

Bases: object

DTO for detailed workflow instance information.

Extends WorkflowInstanceDTO with full context and execution history.

id

Instance ID.

definition_name

Name of the workflow definition.

status

Current execution status.

current_step

Currently executing step (if applicable).

started_at

When the workflow started.

completed_at

When the workflow completed (if finished).

created_by

User who started the workflow.

context_data

Current workflow context data.

metadata

Workflow metadata.

step_history

List of step executions.

error

Error message if workflow failed.

Parameters:
id: UUID
definition_name: str
status: str
current_step: str | None
started_at: datetime
completed_at: datetime | None
created_by: str | None
context_data: dict[str, Any]
metadata: dict[str, Any]
step_history: list[StepExecutionDTO]
error: str | None = None
__init__(id, definition_name, status, current_step, started_at, completed_at, created_by, context_data, metadata, step_history, error=None)

Graph visualization utilities for workflows.

This module provides utilities for generating visual representations of workflow graphs, primarily using MermaidJS format.

litestar_workflows.web.graph.generate_mermaid_graph(definition)[source]

Generate a MermaidJS graph representation of a workflow definition.

This function creates a Mermaid flowchart diagram that visualizes the workflow’s steps and transitions. Different step types are represented with different node shapes.

Parameters:

definition (WorkflowDefinition) – The workflow definition to visualize.

Return type:

str

Returns:

A MermaidJS flowchart definition as a string.

Example

>>> mermaid = generate_mermaid_graph(definition)
>>> print(mermaid)
graph TD
    submit[Submit]
    review{{Review}}
    approve[Approve]
    submit --> review
    review --> approve
litestar_workflows.web.graph.parse_graph_to_dict(definition)[source]

Parse a workflow definition into a dictionary representation.

This function extracts nodes and edges from a workflow definition into a structured dictionary format suitable for JSON serialization.

Parameters:

definition (WorkflowDefinition) – The workflow definition to parse.

Return type:

dict[str, Any]

Returns:

A dictionary containing nodes and edges lists.

Example

>>> graph_dict = parse_graph_to_dict(definition)
>>> print(graph_dict["nodes"])
[{"id": "submit", "label": "Submit", "type": "machine"}, ...]

Web API

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

# REST API is enabled by default
app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(
                enable_api=True,  # Default
                api_path_prefix="/workflows",
            )
        ),
    ],
)

# For advanced usage, DTOs and utilities are available:
from litestar_workflows.web import (
    WorkflowWebConfig,
    WorkflowDefinitionController,
    WorkflowInstanceController,
    HumanTaskController,
    generate_mermaid_graph,
)

Contrib Modules

Optional integration modules for distributed execution (stub implementations for Phase 6).

Optional integrations for litestar-workflows.

This module contains optional execution engine implementations for distributed task queue systems. Each integration requires installing the corresponding extra:

  • celery: Celery distributed task queue (pip install litestar-workflows[celery])

  • saq: Simple Async Queue for Redis (pip install litestar-workflows[saq])

  • arq: Async Redis Queue (pip install litestar-workflows[arq])

These engines extend the base ExecutionEngine protocol to enable distributed workflow execution across multiple workers.

Example

# When celery extra is installed
from litestar_workflows.contrib.celery import CeleryExecutionEngine

engine = CeleryExecutionEngine(celery_app=app, registry=registry)
await engine.start_workflow(MyWorkflow, initial_data={"key": "value"})

Note

These integrations are planned for Phase 6 (v0.7.0) and are currently stub implementations. See PLAN.md for the implementation roadmap.

Celery execution engine integration.

This module provides a Celery-based execution engine for distributed workflow execution. Celery is a distributed task queue that supports multiple message brokers (Redis, RabbitMQ, etc.).

Installation:
pip install litestar-workflows[celery]

Note

This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.

Example (planned API):
from celery import Celery
from litestar_workflows.contrib.celery import CeleryExecutionEngine

celery_app = Celery("workflows", broker="redis://localhost:6379/0")
engine = CeleryExecutionEngine(
    celery_app=celery_app,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as Celery tasks
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})
class litestar_workflows.contrib.celery.CeleryExecutionEngine[source]

Bases: object

Celery-based distributed execution engine (stub).

This engine delegates step execution to Celery workers, enabling horizontal scaling of workflow execution across multiple machines.

Note

This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).

Raises:

NotImplementedError – Always raised as this is a stub.

__init__(*args, **kwargs)[source]

Initialize CeleryExecutionEngine.

Raises:

NotImplementedError – This is a stub implementation.
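
Since the constructor accepts any arguments and always raises, calling code can tell a stub from a real implementation by attempting construction. The sketch below uses a hypothetical stand-in class, `StubEngine`, that mimics the documented behavior so that it runs without litestar-workflows installed. The helper `is_implemented` and the error message are also assumptions made for illustration.

```python
class StubEngine:
    """Hypothetical stand-in mimicking the documented stub behavior."""

    def __init__(self, *args, **kwargs):
        # Mirrors the documented contract: construction always fails.
        raise NotImplementedError("stub engine: not implemented yet")


def is_implemented(engine_cls, *args, **kwargs):
    """Return True if engine_cls can be constructed, False if it is still a stub."""
    try:
        engine_cls(*args, **kwargs)
    except NotImplementedError:
        return False
    return True


print(is_implemented(StubEngine, celery_app=None, registry=None))  # prints False
```

Note that probing by construction would create a real engine once an implementation exists, so this kind of check is best kept to startup code or tests.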

SAQ (Simple Async Queue) execution engine integration.

This module provides a SAQ-based execution engine for distributed workflow execution. SAQ is an async-native job queue built on Redis with a simple API.

Installation:
pip install litestar-workflows[saq]

Note

This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.

Example (planned API):
from saq import Queue
from litestar_workflows.contrib.saq import SAQExecutionEngine

queue = Queue.from_url("redis://localhost:6379/0")
engine = SAQExecutionEngine(
    queue=queue,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as SAQ jobs
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})

class litestar_workflows.contrib.saq.SAQExecutionEngine[source]

Bases: object

SAQ-based distributed execution engine (stub).

This engine delegates step execution to SAQ workers, enabling async-native distributed workflow execution with Redis.

Note

This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).

Raises:

NotImplementedError – Always raised as this is a stub.

__init__(*args, **kwargs)[source]

Initialize SAQExecutionEngine.

Raises:

NotImplementedError – This is a stub implementation.

ARQ (Async Redis Queue) execution engine integration.

This module provides an ARQ-based execution engine for distributed workflow execution. ARQ is a fast async job queue built on Redis with type hints.

Installation:
pip install litestar-workflows[arq]

Note

This integration is planned for Phase 6 (v0.7.0) and is currently a stub. See PLAN.md for the implementation roadmap.

Example (planned API):
from arq import create_pool
from arq.connections import RedisSettings
from litestar_workflows.contrib.arq import ARQExecutionEngine

redis = await create_pool(RedisSettings())
engine = ARQExecutionEngine(
    redis_pool=redis,
    registry=workflow_registry,
    persistence=workflow_persistence,
)

# Steps execute as ARQ jobs
instance = await engine.start_workflow(ApprovalWorkflow, initial_data={...})

class litestar_workflows.contrib.arq.ARQExecutionEngine[source]

Bases: object

ARQ-based distributed execution engine (stub).

This engine delegates step execution to ARQ workers, enabling async-native distributed workflow execution with Redis and full type hint support.

Note

This is a stub implementation. The actual implementation is planned for Phase 6 (v0.7.0).

Raises:

NotImplementedError – Always raised as this is a stub.

__init__(*args, **kwargs)[source]

Initialize ARQExecutionEngine.

Raises:

NotImplementedError – This is a stub implementation.

Note

The contrib engines are stub implementations. Full implementations are planned for Phase 6 (v0.7.0).
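
Until the full implementations land, application code that wants to opt in to a distributed engine later can fall back to another engine whenever a stub is hit. The following is a minimal sketch, assuming engines are supplied as zero-argument factories. `select_engine`, `distributed_stub`, and `local_engine` are hypothetical names used for illustration and are not part of the library.

```python
from typing import Any, Callable, Iterable


def select_engine(factories: Iterable[Callable[[], Any]]) -> Any:
    """Return the first engine whose factory succeeds.

    ImportError covers a missing extra; NotImplementedError covers a stub.
    """
    errors = []
    for factory in factories:
        try:
            return factory()
        except (ImportError, NotImplementedError) as exc:
            errors.append(exc)
    raise RuntimeError(f"no usable execution engine; tried {len(errors)} factories")


def distributed_stub() -> Any:
    # Stand-in for constructing a contrib engine that is still a stub.
    raise NotImplementedError("stub")


def local_engine() -> str:
    # Stand-in for constructing a working in-process engine.
    return "local-engine"


engine = select_engine([distributed_stub, local_engine])
print(engine)  # prints local-engine
```

Ordering the factories from most to least preferred means the same call site would pick up a distributed engine automatically once its stub is replaced.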

Contrib API

# Celery execution engine
from litestar_workflows.contrib.celery import CeleryExecutionEngine

# SAQ execution engine
from litestar_workflows.contrib.saq import SAQExecutionEngine

# ARQ execution engine
from litestar_workflows.contrib.arq import ARQExecutionEngine