API Reference

Complete API documentation for litestar-workflows.

Core API

The core API provides the fundamental building blocks for workflows.

Workflow Definition

class WorkflowDefinition

Declarative workflow structure defining steps and their connections.

Parameters:
  • name (str) – Unique identifier for the workflow

  • version (str) – Semantic version string (e.g., “1.0.0”)

  • description (str) – Human-readable description

  • steps (dict[str, Step]) – Dictionary mapping step names to step instances

  • edges (list[Edge]) – List of Edge objects defining transitions

  • initial_step (str) – Name of the first step to execute

  • terminal_steps (set[str]) – Set of step names that end the workflow

Example:

definition = WorkflowDefinition(
    name="approval",
    version="1.0.0",
    description="Simple approval workflow",
    steps={"submit": SubmitStep(), "review": ReviewStep()},
    edges=[Edge("submit", "review")],
    initial_step="submit",
    terminal_steps={"review"},
)
get_graph() WorkflowGraph

Build and return the graph representation of the workflow.

to_mermaid() str

Generate a MermaidJS diagram of the workflow.

validate() list[str]

Validate the workflow definition and return any errors.

class Edge

Defines a transition between steps.

Parameters:
  • source (str) – The step to transition from

  • target (str) – The step to transition to

  • condition (str | None) – Optional condition expression for this edge

Example:

# Simple edge
Edge("step_a", "step_b")

# Conditional edge
Edge("review", "approve", condition="context.get('score') >= 80")

Workflow Context

class WorkflowContext

Execution context passed between steps.

Parameters:
  • workflow_id (UUID) – UUID of the workflow definition

  • instance_id (UUID) – UUID of this specific execution

  • data (dict[str, Any]) – Mutable workflow data dictionary

  • metadata (dict[str, Any]) – Immutable metadata dictionary

  • current_step (str) – Name of the currently executing step

  • step_history (list[StepExecution]) – List of completed step executions

  • started_at (datetime) – Workflow start timestamp

  • user_id (str | None) – Current user ID (for human tasks)

  • tenant_id (str | None) – Tenant ID for multi-tenancy

get(key: str, default: Any = None) Any

Get a value from the data dictionary.

Parameters:
  • key – The key to look up

  • default – Default value if key not found

Returns:

The value or default

set(key: str, value: Any) None

Set a value in the data dictionary.

Parameters:
  • key – The key to set

  • value – The value to store

Step Types

class Step

Protocol defining a workflow step.

Parameters:
  • name (str) – Unique identifier for the step

  • description (str) – Human-readable description

async execute(context: WorkflowContext) Any

Execute the step with the given context.

async can_execute(context: WorkflowContext) bool

Check if step can execute (guards/validators).

async on_success(context: WorkflowContext, result: Any) None

Hook called after successful execution.

async on_failure(context: WorkflowContext, error: Exception) None

Hook called after failed execution.

class BaseMachineStep

Base class for automated (machine) steps.

Inherits from Step with step_type = StepType.MACHINE.

Example:

class ProcessData(BaseMachineStep):
    name = "process_data"
    description = "Process incoming data"

    async def execute(self, context: WorkflowContext) -> None:
        data = context.get("input")
        result = transform(data)
        context.set("output", result)
class BaseHumanStep

Base class for human interaction steps.

Parameters:
  • title (str) – Display title for the task

  • form_schema (dict[str, Any]) – JSON Schema for the input form

Example:

class ApprovalStep(BaseHumanStep):
    name = "approval"
    title = "Approval Required"
    form_schema = {
        "type": "object",
        "properties": {
            "approved": {"type": "boolean"},
        },
    }
class BaseGateway

Base class for decision/branching steps.

async evaluate(context: WorkflowContext) str | list[str]

Evaluate conditions and return the next step name(s).

Step Groups

class SequentialGroup

Execute steps in sequence, passing results.

Parameters:

steps (tuple[Step | StepGroup, ...]) – Steps to execute in order

Example:

validation = SequentialGroup(
    ValidateFormat(),
    ValidateContent(),
    ValidatePermissions(),
)
class ParallelGroup

Execute steps in parallel.

Parameters:
  • steps (tuple[Step | StepGroup, ...]) – Steps to execute simultaneously

  • callback (Step | None) – Optional step to run after all complete

Example:

notifications = ParallelGroup(
    SendEmail(),
    SendSlack(),
    callback=LogNotifications(),
)

Enums

class StepType

Classification of step types.

MACHINE = "machine"

Automated execution

HUMAN = "human"

Requires user interaction

WEBHOOK = "webhook"

Waits for external callback

TIMER = "timer"

Waits for time condition

GATEWAY = "gateway"

Decision/branching point

class StepStatus

Step execution status.

PENDING
SCHEDULED
RUNNING
WAITING
SUCCEEDED
FAILED
CANCELED
SKIPPED
class WorkflowStatus

Workflow instance status.

PENDING
RUNNING
WAITING
COMPLETED
FAILED
CANCELED

Execution Engine

class ExecutionEngine

Protocol for workflow execution engines.

async start_workflow(workflow_name: str, initial_data: dict = None, version: str = None) WorkflowInstance

Start a new workflow instance.

Parameters:
  • workflow_name – Name of the workflow to start

  • initial_data – Initial data for the workflow context

  • version – Specific version to use (latest if None)

Returns:

The created workflow instance

async execute_step(step: Step, context: WorkflowContext, previous_result: Any = None) Any

Execute a single step.

async complete_human_task(instance_id: UUID, step_name: str, user_id: str, data: dict) None

Complete a human task with user input.

Parameters:
  • instance_id – The workflow instance ID

  • step_name – Name of the human step

  • user_id – ID of the user completing the task

  • data – Form data submitted by the user

async cancel_workflow(instance_id: UUID, reason: str) None

Cancel a running workflow.

async retry(instance_id: UUID, from_step: str = None) None

Retry a failed workflow from a specific step.

class LocalExecutionEngine

In-process async execution engine.

Parameters:
  • registry (WorkflowRegistry) – Workflow registry containing definitions

  • persistence (WorkflowPersistence | None) – Optional persistence layer

  • event_bus (EventBus | None) – Optional event bus for notifications

Example:

registry = WorkflowRegistry()
registry.register_definition(my_workflow)

engine = LocalExecutionEngine(registry)
instance = await engine.start_workflow("my_workflow")

Registry

class WorkflowRegistry

Registry for managing workflow definitions.

register_definition(definition: WorkflowDefinition) None

Register a workflow definition.

get_definition(name: str, version: str = None) WorkflowDefinition

Get a workflow definition by name and optional version.

list_definitions(active_only: bool = True) list[WorkflowDefinition]

List all registered workflow definitions.

has_definition(name: str) bool

Check if a workflow definition exists.

Exceptions

exception WorkflowsError

Base exception for all litestar-workflows errors.

exception WorkflowNotFoundError

Raised when a requested workflow definition is not found.

exception StepExecutionError

Raised when a step fails during execution.

exception InvalidTransitionError

Raised when an invalid state transition is attempted.

exception ValidationError

Raised when workflow or step validation fails.

Database Models (Optional)

Available with the [db] extra. Install with:

pip install litestar-workflows[db]

PersistentExecutionEngine

class PersistentExecutionEngine

Execution engine with database persistence.

A drop-in replacement for LocalExecutionEngine that persists all workflow state to a database, enabling durability, recovery, and querying.

Parameters:
  • registry (WorkflowRegistry) – The workflow registry containing definitions

  • session (AsyncSession) – SQLAlchemy async session for database operations

  • event_bus (Any | None) – Optional event bus for emitting workflow events

Example:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from litestar_workflows import WorkflowRegistry
from litestar_workflows.db import PersistentExecutionEngine

engine = create_async_engine("postgresql+asyncpg://localhost/db")
session_factory = async_sessionmaker(engine, expire_on_commit=False)

registry = WorkflowRegistry()
registry.register_definition(MyWorkflow.get_definition())
registry.register_workflow_class(MyWorkflow)

async with session_factory() as session:
    engine = PersistentExecutionEngine(registry=registry, session=session)
    instance = await engine.start_workflow(MyWorkflow, initial_data={"key": "value"})
async start_workflow(workflow, initial_data=None, *, tenant_id=None, created_by=None) WorkflowInstanceData

Start a new workflow instance with persistence.

Parameters:
  • workflow (type[Workflow]) – The workflow class to execute

  • initial_data (dict[str, Any] | None) – Optional initial data for the workflow context

  • tenant_id (str | None) – Optional tenant ID for multi-tenancy support

  • created_by (str | None) – Optional user ID who started the workflow

Returns:

The created workflow instance data

async complete_human_task(instance_id, step_name, user_id, data) None

Complete a human task with user-provided data.

Parameters:
  • instance_id (UUID) – The workflow instance ID

  • step_name (str) – Name of the human task step

  • user_id (str) – ID of the user completing the task

  • data (dict[str, Any]) – User-provided data to merge into context

Raises:

ValueError – If instance not found or not in WAITING status

async cancel_workflow(instance_id, reason) None

Cancel a running workflow.

Parameters:
  • instance_id (UUID) – The workflow instance ID

  • reason (str) – Reason for cancellation

async get_instance(instance_id) WorkflowInstanceData

Get a workflow instance by ID.

Parameters:

instance_id (UUID) – The workflow instance ID

Returns:

The workflow instance data

Raises:

KeyError – If instance not found

get_running_instances() list[UUID]

Get IDs of currently running instances (in-memory tracking).

Returns:

List of running instance IDs

SQLAlchemy Models

class WorkflowDefinitionModel

SQLAlchemy model for persisted workflow definitions.

Stores the serialized workflow definition along with metadata for querying and managing workflow versions.

Parameters:
  • name (str) – Unique name identifier for the workflow (indexed)

  • version (str) – Semantic version string (e.g., “1.0.0”)

  • description (str | None) – Human-readable description

  • definition_json (dict[str, Any]) – Serialized WorkflowDefinition as JSON

  • is_active (bool) – Whether this version is active for new instances

  • instances (list[WorkflowInstanceModel]) – Related workflow instances (relationship)

class WorkflowInstanceModel

SQLAlchemy model for workflow instances.

Stores the current state of a workflow execution including context data, current step, and execution history.

Parameters:
  • definition_id (UUID) – Foreign key to the workflow definition

  • workflow_name (str) – Denormalized workflow name for quick queries

  • workflow_version (str) – Denormalized workflow version

  • status (WorkflowStatus) – Current execution status

  • current_step (str | None) – Name of the currently executing step

  • context_data (dict[str, Any]) – Mutable workflow context data as JSON

  • metadata (dict[str, Any]) – Immutable metadata about the execution

  • error (str | None) – Error message if workflow failed

  • started_at (datetime) – Timestamp when execution began

  • completed_at (datetime | None) – Timestamp when execution finished

  • tenant_id (str | None) – Optional tenant identifier for multi-tenancy (indexed)

  • created_by (str | None) – Optional user who started the workflow (indexed)

class StepExecutionModel

SQLAlchemy model for step execution records.

Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.

Parameters:
  • instance_id (UUID) – Foreign key to the workflow instance

  • step_name (str) – Name of the executed step (indexed)

  • step_type (StepType) – Type of step (MACHINE, HUMAN, etc.)

  • status (StepStatus) – Execution status of the step (indexed)

  • input_data (dict[str, Any] | None) – Input data passed to the step

  • output_data (dict[str, Any] | None) – Output data produced by the step

  • error (str | None) – Error message if step failed

  • started_at (datetime) – Timestamp when step execution began

  • completed_at (datetime | None) – Timestamp when step execution finished

  • assigned_to (str | None) – User ID assigned to human tasks

  • completed_by (str | None) – User ID who completed human tasks

class HumanTaskModel

SQLAlchemy model for pending human tasks.

Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.

Parameters:
  • instance_id (UUID) – Foreign key to the workflow instance

  • step_execution_id (UUID) – Foreign key to the step execution

  • step_name (str) – Name of the human task step

  • title (str) – Display title for the task

  • description (str | None) – Detailed description of the task

  • form_schema (dict[str, Any] | None) – JSON Schema defining the task form

  • assignee_id (str | None) – User ID assigned to complete the task (indexed)

  • assignee_group (str | None) – Group/role that can complete the task (indexed)

  • due_at (datetime | None) – Deadline for task completion (indexed)

  • reminder_at (datetime | None) – When to send a reminder

  • status (str) – Current task status (pending, completed, canceled)

  • completed_at (datetime | None) – When the task was completed

  • completed_by (str | None) – User who completed the task

Repository Classes

class WorkflowDefinitionRepository

Repository for workflow definition CRUD operations.

Parameters:

session (AsyncSession) – SQLAlchemy async session

async get_by_name(name, version=None, *, active_only=True) WorkflowDefinitionModel | None

Get a workflow definition by name and optional version.

Parameters:
  • name – The workflow name

  • version – Optional specific version. If None, returns the latest active version.

  • active_only – If True, only return active definitions

Returns:

The workflow definition or None

async get_latest_version(name) WorkflowDefinitionModel | None

Get the latest active version of a workflow definition.

async list_active() Sequence[WorkflowDefinitionModel]

List all active workflow definitions.

async deactivate_version(name, version) bool

Deactivate a specific workflow version.

class WorkflowInstanceRepository

Repository for workflow instance CRUD operations.

Parameters:

session (AsyncSession) – SQLAlchemy async session

async find_by_workflow(workflow_name, status=None, limit=100, offset=0) tuple[Sequence[WorkflowInstanceModel], int]

Find instances by workflow name with optional status filter.

Returns:

Tuple of (instances, total_count)

async find_by_user(user_id, status=None) Sequence[WorkflowInstanceModel]

Find instances created by a specific user.

async find_by_tenant(tenant_id, status=None, limit=100, offset=0) tuple[Sequence[WorkflowInstanceModel], int]

Find instances by tenant ID for multi-tenancy support.

async find_running() Sequence[WorkflowInstanceModel]

Find all running or waiting workflow instances.

async update_status(instance_id, status, *, current_step=None, error=None) WorkflowInstanceModel | None

Update the status of a workflow instance.

class StepExecutionRepository

Repository for step execution record CRUD operations.

Parameters:

session (AsyncSession) – SQLAlchemy async session

async find_by_instance(instance_id) Sequence[StepExecutionModel]

Find all step executions for an instance, ordered by start time.

async find_by_step_name(instance_id, step_name) StepExecutionModel | None

Find the most recent execution record for a specific step.

async find_failed(instance_id=None) Sequence[StepExecutionModel]

Find failed step executions, optionally filtered by instance.

class HumanTaskRepository

Repository for human task CRUD operations.

Parameters:

session (AsyncSession) – SQLAlchemy async session

async find_pending(assignee_id=None, assignee_group=None) Sequence[HumanTaskModel]

Find pending human tasks, optionally filtered by assignee or group.

Parameters:
  • assignee_id – If provided, includes tasks assigned to this user or unassigned

  • assignee_group – If provided, includes tasks for this group or unassigned

async find_by_instance(instance_id) Sequence[HumanTaskModel]

Find all human tasks for an instance.

async find_overdue() Sequence[HumanTaskModel]

Find overdue pending human tasks (due_at < now).

async complete_task(task_id, completed_by) HumanTaskModel | None

Mark a human task as completed.

Parameters:
  • task_id – The task ID

  • completed_by – User ID who completed the task

async cancel_task(task_id) HumanTaskModel | None

Cancel a pending human task

Web Plugin (Optional)

Available with the [web] extra. The REST API is built into the main WorkflowPlugin and enabled by default via enable_api=True.

Example:

from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(
                enable_api=True,  # Default - API auto-enabled
                api_path_prefix="/workflows",
                api_guards=[],
            )
        ),
    ],
)

See Web Plugin API Reference for complete API reference.

Contrib Modules (Planned)

Optional execution engines for distributed workflow execution.

Note

These are stub implementations for Phase 6 (v0.7.0). Installing the extras provides the stub classes which raise NotImplementedError.

class CeleryExecutionEngine

Celery-based distributed execution engine (stub).

Install with: pip install litestar-workflows[celery]

class SAQExecutionEngine

SAQ (Simple Async Queue) execution engine (stub).

Install with: pip install litestar-workflows[saq]

class ARQExecutionEngine

ARQ (Async Redis Queue) execution engine (stub).

Install with: pip install litestar-workflows[arq]

Full Module Reference

See Also