API Reference¶
Complete API documentation for litestar-workflows.
Core API¶
The core API provides the fundamental building blocks for workflows.
Workflow Definition¶
- class WorkflowDefinition¶
Declarative workflow structure defining steps and their connections.
- Parameters:
name (str) – Unique identifier for the workflow
version (str) – Semantic version string (e.g., “1.0.0”)
description (str) – Human-readable description
steps (dict[str, Step]) – Dictionary mapping step names to step instances
edges (list[Edge]) – List of Edge objects defining transitions
initial_step (str) – Name of the first step to execute
terminal_steps (set[str]) – Set of step names that end the workflow
Example:
definition = WorkflowDefinition(
    name="approval",
    version="1.0.0",
    description="Simple approval workflow",
    steps={"submit": SubmitStep(), "review": ReviewStep()},
    edges=[Edge("submit", "review")],
    initial_step="submit",
    terminal_steps={"review"},
)
- get_graph() WorkflowGraph¶
Build and return the graph representation of the workflow.
- class Edge¶
Defines a transition between steps.
- Parameters:
Example:
# Simple edge
Edge("step_a", "step_b")

# Conditional edge
Edge("review", "approve", condition="context.get('score') >= 80")
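To illustrate how conditional edges could drive routing, here is a minimal sketch. The `Edge` dataclass is a stand-in with assumed field names (`source`, `target`, `condition`), and `next_steps` is a hypothetical helper. How litestar-workflows actually evaluates the `condition` string is not specified on this page, so the `eval`-based approach is an assumption made for illustration only.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Edge:
    """Stand-in for the documented Edge; field names are assumptions."""
    source: str
    target: str
    condition: Optional[str] = None


def next_steps(edges: list[Edge], current: str, context: dict[str, Any]) -> list[str]:
    """Return the targets of edges leaving `current` whose condition holds."""
    targets = []
    for edge in edges:
        if edge.source != current:
            continue
        # Unconditional edges always fire. Conditional ones are evaluated
        # with builtins disabled and only `context` in scope (assumed strategy).
        if edge.condition is None or eval(
            edge.condition, {"__builtins__": {}}, {"context": context}
        ):
            targets.append(edge.target)
    return targets


edges = [
    Edge("review", "approve", condition="context.get('score') >= 80"),
    Edge("review", "reject", condition="context.get('score') < 80"),
]
print(next_steps(edges, "review", {"score": 92}))
```

Evaluating strings with `eval` is unsafe for untrusted input even with builtins disabled; a production design would more likely use a restricted expression parser or plain callables.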
Workflow Context¶
- class WorkflowContext¶
Execution context passed between steps.
- Parameters:
workflow_id (UUID) – UUID of the workflow definition
instance_id (UUID) – UUID of this specific execution
current_step (str) – Name of the currently executing step
step_history (list[StepExecution]) – List of completed step executions
started_at (datetime) – Workflow start timestamp
user_id (str | None) – Current user ID (for human tasks)
tenant_id (str | None) – Tenant ID for multi-tenancy
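The sketch below shows one way a context object with these fields could carry data between steps. `SketchContext` is a stand-in, not the library class: the `data` field and the bodies of `get` and `set` are assumptions, chosen to match the `context.get(...)` and `context.set(...)` calls used in the examples on this page.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID, uuid4


@dataclass
class SketchContext:
    """Stand-in for WorkflowContext; `data` is an assumed storage field."""
    workflow_id: UUID
    instance_id: UUID
    current_step: str
    step_history: list = field(default_factory=list)
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    user_id: Optional[str] = None
    tenant_id: Optional[str] = None
    data: dict[str, Any] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Read a value written by an earlier step."""
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Store a value for later steps."""
        self.data[key] = value


ctx = SketchContext(workflow_id=uuid4(), instance_id=uuid4(), current_step="submit")
ctx.set("score", 85)
print(ctx.get("score"), ctx.get("missing", "n/a"))
```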
Step Types¶
- class Step¶
Protocol defining a workflow step.
- Parameters:
- async execute(context: WorkflowContext) Any¶
Execute the step with the given context.
- async can_execute(context: WorkflowContext) bool¶
Check if step can execute (guards/validators).
- async on_success(context: WorkflowContext, result: Any) None¶
Hook called after successful execution.
- async on_failure(context: WorkflowContext, error: Exception) None¶
Hook called after failed execution.
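As a rough sketch of how these four methods might fit together, the example below drives a hypothetical `DoubleStep` through a hand-written `run_step` helper. The call order (guard, then execute, then exactly one hook) is an assumption about engine behavior, and a plain `dict` stands in for `WorkflowContext`.

```python
import asyncio
from typing import Any


class DoubleStep:
    """Hypothetical step that follows the documented method names."""
    name = "double"

    def __init__(self) -> None:
        self.events: list[str] = []

    async def can_execute(self, context: dict) -> bool:
        return "value" in context

    async def execute(self, context: dict) -> Any:
        return context["value"] * 2

    async def on_success(self, context: dict, result: Any) -> None:
        self.events.append(f"success:{result}")

    async def on_failure(self, context: dict, error: Exception) -> None:
        self.events.append(f"failure:{type(error).__name__}")


async def run_step(step: DoubleStep, context: dict) -> Any:
    """Assumed call order: guard, execute, then exactly one hook."""
    if not await step.can_execute(context):
        return None  # guard rejected the step; nothing else runs
    try:
        result = await step.execute(context)
    except Exception as error:
        await step.on_failure(context, error)
        raise
    await step.on_success(context, result)
    return result


step = DoubleStep()
result = asyncio.run(run_step(step, {"value": 21}))
print(result, step.events)
```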
- class BaseMachineStep¶
Base class for automated (machine) steps.
Inherits from Step with step_type = StepType.MACHINE.
Example:
class ProcessData(BaseMachineStep):
    name = "process_data"
    description = "Process incoming data"

    async def execute(self, context: WorkflowContext) -> None:
        data = context.get("input")
        result = transform(data)
        context.set("output", result)
- class BaseHumanStep¶
Base class for human interaction steps.
- Parameters:
Example:
class ApprovalStep(BaseHumanStep):
    name = "approval"
    title = "Approval Required"
    form_schema = {
        "type": "object",
        "properties": {
            "approved": {"type": "boolean"},
        },
    }
Step Groups¶
- class SequentialGroup¶
Execute steps in sequence, passing results.
Example:
validation = SequentialGroup(
    ValidateFormat(),
    ValidateContent(),
    ValidatePermissions(),
)
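To make "in sequence, passing results" concrete, here is a minimal stand-in. `SketchSequentialGroup` and the two step functions are hypothetical. The real group takes step instances rather than bare coroutines, and how it threads results through is an assumption here.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A step is modeled as an async callable taking (context, previous_result).
StepFn = Callable[[dict, Any], Awaitable[Any]]


class SketchSequentialGroup:
    """Stand-in group: runs steps in order, feeding each result forward."""

    def __init__(self, *steps: StepFn) -> None:
        self.steps = steps

    async def execute(self, context: dict, initial: Any = None) -> Any:
        result = initial
        for step in self.steps:
            result = await step(context, result)
        return result


async def strip_input(context: dict, previous: Any) -> str:
    return context["raw"].strip()


async def to_upper(context: dict, previous: str) -> str:
    return previous.upper()


group = SketchSequentialGroup(strip_input, to_upper)
print(asyncio.run(group.execute({"raw": "  hello  "})))
```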
Enums¶
- class StepType¶
Classification of step types.
- MACHINE = "machine"¶
Automated execution
- HUMAN = "human"¶
Requires user interaction
- WEBHOOK = "webhook"¶
Waits for external callback
- TIMER = "timer"¶
Waits for time condition
- GATEWAY = "gateway"¶
Decision/branching point
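The snippet below mirrors the documented members in a stand-in enum to show value-based lookup and grouping. Whether the real `StepType` also subclasses `str` is an assumption, and `WAITING_TYPES` is a hypothetical grouping inferred from the member descriptions, not part of the library.

```python
from enum import Enum


class StepType(str, Enum):
    """Stand-in mirroring the documented members and values."""
    MACHINE = "machine"
    HUMAN = "human"
    WEBHOOK = "webhook"
    TIMER = "timer"
    GATEWAY = "gateway"


# Step types that pause the workflow until something external happens
# (hypothetical grouping based on the descriptions above).
WAITING_TYPES = {StepType.HUMAN, StepType.WEBHOOK, StepType.TIMER}

step_type = StepType("human")  # look up a member by its stored value
print(step_type.name, step_type in WAITING_TYPES)
```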
Execution Engine¶
- class ExecutionEngine¶
Protocol for workflow execution engines.
- async start_workflow(workflow_name: str, initial_data: dict = None, version: str = None) WorkflowInstance¶
Start a new workflow instance.
- Parameters:
workflow_name – Name of the workflow to start
initial_data – Initial data for the workflow context
version – Specific version to use (latest if None)
- Returns:
The created workflow instance
- async execute_step(step: Step, context: WorkflowContext, previous_result: Any = None) Any¶
Execute a single step.
- async complete_human_task(instance_id: UUID, step_name: str, user_id: str, data: dict) None¶
Complete a human task with user input.
- Parameters:
instance_id – The workflow instance ID
step_name – Name of the human step
user_id – ID of the user completing the task
data – Form data submitted by the user
- class LocalExecutionEngine¶
In-process async execution engine.
- Parameters:
registry (WorkflowRegistry) – Workflow registry containing definitions
persistence (WorkflowPersistence | None) – Optional persistence layer
event_bus (EventBus | None) – Optional event bus for notifications
Example:
registry = WorkflowRegistry()
registry.register_definition(my_workflow)

engine = LocalExecutionEngine(registry)
instance = await engine.start_workflow("my_workflow")
Registry¶
- class WorkflowRegistry¶
Registry for managing workflow definitions.
- register_definition(definition: WorkflowDefinition) None¶
Register a workflow definition.
- get_definition(name: str, version: str = None) WorkflowDefinition¶
Get a workflow definition by name and optional version.
- list_definitions(active_only: bool = True) list[WorkflowDefinition]¶
List all registered workflow definitions.
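The "latest if no version is given" lookup can be sketched as follows. `SketchRegistry` is an in-memory stand-in that stores plain dicts instead of `WorkflowDefinition` objects, and `version_key` is a hypothetical helper. The library's own ordering rules and error type are not documented here, so numeric comparison and `KeyError` are assumptions.

```python
from typing import Optional


def version_key(version: str) -> tuple[int, ...]:
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


class SketchRegistry:
    """Stand-in registry keyed by (name, version)."""

    def __init__(self) -> None:
        self._definitions: dict[tuple[str, str], dict] = {}

    def register_definition(self, definition: dict) -> None:
        key = (definition["name"], definition["version"])
        self._definitions[key] = definition

    def get_definition(self, name: str, version: Optional[str] = None) -> dict:
        if version is not None:
            return self._definitions[(name, version)]
        candidates = [d for (n, _), d in self._definitions.items() if n == name]
        if not candidates:
            raise KeyError(name)
        # No version requested: fall back to the highest registered one.
        return max(candidates, key=lambda d: version_key(d["version"]))


registry = SketchRegistry()
registry.register_definition({"name": "approval", "version": "1.9.0"})
registry.register_definition({"name": "approval", "version": "1.10.0"})
print(registry.get_definition("approval")["version"])
```

Comparing versions as integer tuples avoids the string-ordering trap where "1.9.0" sorts after "1.10.0". Pre-release suffixes such as "-rc1" are not handled by this sketch.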
Exceptions¶
- exception WorkflowsError¶
Base exception for all litestar-workflows errors.
- exception WorkflowNotFoundError¶
Raised when a requested workflow definition is not found.
- exception StepExecutionError¶
Raised when a step fails during execution.
- exception InvalidTransitionError¶
Raised when an invalid state transition is attempted.
- exception ValidationError¶
Raised when workflow or step validation fails.
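Because `WorkflowsError` is described as the base for all library errors, callers can catch specific failures first and fall back to the base class. The classes below are local stand-ins so the snippet runs on its own. That each exception subclasses the base directly is an assumption, and `describe` is a hypothetical helper.

```python
class WorkflowsError(Exception):
    """Stand-in base class."""


class WorkflowNotFoundError(WorkflowsError):
    """Stand-in; assumed to subclass the base directly."""


class StepExecutionError(WorkflowsError):
    """Stand-in; assumed to subclass the base directly."""


def describe(error: Exception) -> str:
    """Map an exception to a coarse category, most specific first."""
    if isinstance(error, WorkflowNotFoundError):
        return "not_found"
    if isinstance(error, WorkflowsError):
        return "workflow_error"
    return "unexpected"


for exc in (WorkflowNotFoundError("approval"), StepExecutionError("boom"), RuntimeError("x")):
    print(type(exc).__name__, "->", describe(exc))
```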
Database Models (Optional)¶
Available with the [db] extra. Install with:
pip install litestar-workflows[db]
PersistentExecutionEngine¶
- class PersistentExecutionEngine¶
Execution engine with database persistence.
A drop-in replacement for LocalExecutionEngine that persists all workflow state to a database, enabling durability, recovery, and querying.
- Parameters:
registry (WorkflowRegistry) – The workflow registry containing definitions
session (AsyncSession) – SQLAlchemy async session for database operations
event_bus (Any | None) – Optional event bus for emitting workflow events
Example:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from litestar_workflows import WorkflowRegistry
from litestar_workflows.db import PersistentExecutionEngine

# SQLAlchemy engine; named db_engine so it is not shadowed by the workflow engine below
db_engine = create_async_engine("postgresql+asyncpg://localhost/db")
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)

registry = WorkflowRegistry()
registry.register_definition(MyWorkflow.get_definition())
registry.register_workflow_class(MyWorkflow)

async with session_factory() as session:
    engine = PersistentExecutionEngine(registry=registry, session=session)
    instance = await engine.start_workflow(MyWorkflow, initial_data={"key": "value"})
- async start_workflow(workflow, initial_data=None, *, tenant_id=None, created_by=None) WorkflowInstanceData¶
Start a new workflow instance with persistence.
- Parameters:
- Returns:
The created workflow instance data
- async complete_human_task(instance_id, step_name, user_id, data) None¶
Complete a human task with user-provided data.
- Parameters:
- Raises:
ValueError – If the instance is not found or is not in WAITING status
- async cancel_workflow(instance_id, reason) None¶
Cancel a running workflow.
- Parameters:
instance_id (UUID) – The workflow instance ID
reason (str) – Reason for cancellation
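The documented `ValueError` from `complete_human_task` (instance missing, or not in WAITING status) suggests callers should be ready to handle rejected completions. The sketch below uses an in-memory `SketchEngine` stand-in. Only the two failure conditions come from this page; the string statuses, the dict layout, and the move to "RUNNING" afterwards are assumptions.

```python
import asyncio
from uuid import UUID, uuid4


class SketchEngine:
    """In-memory stand-in; only the two ValueError checks mirror the docs."""

    def __init__(self) -> None:
        self.instances: dict[UUID, dict] = {}

    async def complete_human_task(
        self, instance_id: UUID, step_name: str, user_id: str, data: dict
    ) -> None:
        instance = self.instances.get(instance_id)
        if instance is None:
            raise ValueError(f"instance {instance_id} not found")
        if instance["status"] != "WAITING":
            raise ValueError(f"instance {instance_id} is not waiting")
        instance["context"].update(data)
        instance["completed_by"] = user_id
        instance["status"] = "RUNNING"  # assumed next status


async def main() -> list[str]:
    engine = SketchEngine()
    waiting_id = uuid4()
    engine.instances[waiting_id] = {"status": "WAITING", "context": {}}
    outcomes = []
    # First call succeeds, second hits a non-waiting instance, third an unknown ID.
    for instance_id in (waiting_id, waiting_id, uuid4()):
        try:
            await engine.complete_human_task(
                instance_id, "approval", "user-1", {"approved": True}
            )
            outcomes.append("completed")
        except ValueError:
            outcomes.append("rejected")
    return outcomes


outcomes = asyncio.run(main())
print(outcomes)
```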
SQLAlchemy Models¶
- class WorkflowDefinitionModel¶
SQLAlchemy model for persisted workflow definitions.
Stores the serialized workflow definition along with metadata for querying and managing workflow versions.
- Parameters:
name (str) – Unique name identifier for the workflow (indexed)
version (str) – Semantic version string (e.g., “1.0.0”)
description (str | None) – Human-readable description
definition_json (dict[str, Any]) – Serialized WorkflowDefinition as JSON
is_active (bool) – Whether this version is active for new instances
instances (list[WorkflowInstanceModel]) – Related workflow instances (relationship)
- class WorkflowInstanceModel¶
SQLAlchemy model for workflow instances.
Stores the current state of a workflow execution including context data, current step, and execution history.
- Parameters:
definition_id (UUID) – Foreign key to the workflow definition
workflow_name (str) – Denormalized workflow name for quick queries
workflow_version (str) – Denormalized workflow version
status (WorkflowStatus) – Current execution status
current_step (str | None) – Name of the currently executing step
context_data (dict[str, Any]) – Mutable workflow context data as JSON
metadata (dict[str, Any]) – Immutable metadata about the execution
error (str | None) – Error message if workflow failed
started_at (datetime) – Timestamp when execution began
completed_at (datetime | None) – Timestamp when execution finished
tenant_id (str | None) – Optional tenant identifier for multi-tenancy (indexed)
created_by (str | None) – Optional user who started the workflow (indexed)
- class StepExecutionModel¶
SQLAlchemy model for step execution records.
Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes.
- Parameters:
instance_id (UUID) – Foreign key to the workflow instance
step_name (str) – Name of the executed step (indexed)
step_type (StepType) – Type of step (MACHINE, HUMAN, etc.)
status (StepStatus) – Execution status of the step (indexed)
input_data (dict[str, Any] | None) – Input data passed to the step
output_data (dict[str, Any] | None) – Output data produced by the step
error (str | None) – Error message if step failed
started_at (datetime) – Timestamp when step execution began
completed_at (datetime | None) – Timestamp when step execution finished
assigned_to (str | None) – User ID assigned to human tasks
completed_by (str | None) – User ID who completed human tasks
- class HumanTaskModel¶
SQLAlchemy model for pending human tasks.
Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status.
- Parameters:
instance_id (UUID) – Foreign key to the workflow instance
step_execution_id (UUID) – Foreign key to the step execution
step_name (str) – Name of the human task step
title (str) – Display title for the task
description (str | None) – Detailed description of the task
form_schema (dict[str, Any] | None) – JSON Schema defining the task form
assignee_id (str | None) – User ID assigned to complete the task (indexed)
assignee_group (str | None) – Group/role that can complete the task (indexed)
due_at (datetime | None) – Deadline for task completion (indexed)
reminder_at (datetime | None) – When to send a reminder
status (str) – Current task status (pending, completed, canceled)
completed_at (datetime | None) – When the task was completed
completed_by (str | None) – User who completed the task
Repository Classes¶
- class WorkflowDefinitionRepository¶
Repository for workflow definition CRUD operations.
- Parameters:
session (AsyncSession) – SQLAlchemy async session
- async get_by_name(name, version=None, *, active_only=True) WorkflowDefinitionModel | None¶
Get a workflow definition by name and optional version.
- Parameters:
name – The workflow name
version – Optional specific version. If None, returns the latest active version.
active_only – If True, only return active definitions
- Returns:
The workflow definition or None
- async get_latest_version(name) WorkflowDefinitionModel | None¶
Get the latest active version of a workflow definition.
- async list_active() Sequence[WorkflowDefinitionModel]¶
List all active workflow definitions.
- class WorkflowInstanceRepository¶
Repository for workflow instance CRUD operations.
- Parameters:
session (AsyncSession) – SQLAlchemy async session
- async find_by_workflow(workflow_name, status=None, limit=100, offset=0) tuple[Sequence[WorkflowInstanceModel], int]¶
Find instances by workflow name with optional status filter.
- Returns:
Tuple of (instances, total_count)
- async find_by_user(user_id, status=None) Sequence[WorkflowInstanceModel]¶
Find instances created by a specific user.
- async find_by_tenant(tenant_id, status=None, limit=100, offset=0) tuple[Sequence[WorkflowInstanceModel], int]¶
Find instances by tenant ID for multi-tenancy support.
- async find_running() Sequence[WorkflowInstanceModel]¶
Find all running or waiting workflow instances.
- async update_status(instance_id, status, *, current_step=None, error=None) WorkflowInstanceModel | None¶
Update the status of a workflow instance.
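The `(instances, total_count)` return shape of `find_by_workflow` and `find_by_tenant` lends itself to offset-based paging. The sketch below imitates that shape with an in-memory `find_by_workflow` function over plain dicts. It is a stand-in, not the repository method, and `iter_pages` is a hypothetical helper.

```python
from typing import Iterator, Optional, Sequence


def find_by_workflow(
    rows: Sequence[dict],
    workflow_name: str,
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """In-memory stand-in returning (page, total_count)."""
    matches = [
        r for r in rows
        if r["workflow_name"] == workflow_name
        and (status is None or r["status"] == status)
    ]
    return matches[offset:offset + limit], len(matches)


def iter_pages(rows: Sequence[dict], workflow_name: str, page_size: int) -> Iterator[list[dict]]:
    """Yield pages until the offset reaches the reported total."""
    offset = 0
    while True:
        page, total = find_by_workflow(rows, workflow_name, limit=page_size, offset=offset)
        if page:
            yield page
        offset += page_size
        if offset >= total:
            break


rows = [{"workflow_name": "approval", "status": "RUNNING", "n": i} for i in range(5)]
pages = list(iter_pages(rows, "approval", page_size=2))
print([len(p) for p in pages])
```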
- class StepExecutionRepository¶
Repository for step execution record CRUD operations.
- Parameters:
session (AsyncSession) – SQLAlchemy async session
- async find_by_instance(instance_id) Sequence[StepExecutionModel]¶
Find all step executions for an instance, ordered by start time.
- async find_by_step_name(instance_id, step_name) StepExecutionModel | None¶
Find the most recent execution record for a specific step.
- async find_failed(instance_id=None) Sequence[StepExecutionModel]¶
Find failed step executions, optionally filtered by instance.
- class HumanTaskRepository¶
Repository for human task CRUD operations.
- Parameters:
session (AsyncSession) – SQLAlchemy async session
- async find_pending(assignee_id=None, assignee_group=None) Sequence[HumanTaskModel]¶
Find pending human tasks, optionally filtered by assignee or group.
- Parameters:
assignee_id – If provided, includes tasks assigned to this user or unassigned
assignee_group – If provided, includes tasks for this group or unassigned
- async find_by_instance(instance_id) Sequence[HumanTaskModel]¶
Find all human tasks for an instance.
- async find_overdue() Sequence[HumanTaskModel]¶
Find overdue pending human tasks (due_at < now).
- async complete_task(task_id, completed_by) HumanTaskModel | None¶
Mark a human task as completed.
- Parameters:
task_id – The task ID
completed_by – User ID who completed the task
- async cancel_task(task_id) HumanTaskModel | None¶
Cancel a pending human task.
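The filtering rules described for `find_pending` (a user's or group's tasks plus unassigned ones) and `find_overdue` (`due_at < now`) can be sketched over plain dicts. These functions are in-memory stand-ins, not the repository methods. Combining the two `find_pending` filters with AND, and skipping tasks without a `due_at`, are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def find_pending(
    tasks: list[dict],
    assignee_id: Optional[str] = None,
    assignee_group: Optional[str] = None,
) -> list[dict]:
    """Pending tasks matching the user/group, or left unassigned."""
    result = []
    for task in tasks:
        if task["status"] != "pending":
            continue
        if assignee_id is not None and task["assignee_id"] not in (assignee_id, None):
            continue
        if assignee_group is not None and task["assignee_group"] not in (assignee_group, None):
            continue
        result.append(task)
    return result


def find_overdue(tasks: list[dict], now: datetime) -> list[dict]:
    """Pending tasks whose due_at is strictly before `now`."""
    return [
        t for t in tasks
        if t["status"] == "pending" and t["due_at"] is not None and t["due_at"] < now
    ]


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
tasks = [
    {"id": 1, "status": "pending", "assignee_id": "alice", "assignee_group": None, "due_at": now - timedelta(days=1)},
    {"id": 2, "status": "pending", "assignee_id": None, "assignee_group": "finance", "due_at": now + timedelta(days=1)},
    {"id": 3, "status": "pending", "assignee_id": "bob", "assignee_group": None, "due_at": None},
    {"id": 4, "status": "completed", "assignee_id": "alice", "assignee_group": None, "due_at": now - timedelta(days=2)},
]
print([t["id"] for t in find_pending(tasks, assignee_id="alice")])
print([t["id"] for t in find_overdue(tasks, now)])
```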
Web Plugin (Optional)¶
Available with the [web] extra. The REST API is built into the main WorkflowPlugin
and enabled by default via enable_api=True.
Example:
from litestar import Litestar
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig

app = Litestar(
    plugins=[
        WorkflowPlugin(
            config=WorkflowPluginConfig(
                enable_api=True,  # Default - API auto-enabled
                api_path_prefix="/workflows",
                api_guards=[],
            )
        ),
    ],
)
See the Web Plugin API Reference for full details.
Contrib Modules (Planned)¶
Optional execution engines for distributed workflow execution.
Note
These are stub implementations planned for Phase 6 (v0.7.0). Installing the extras
provides stub classes that raise NotImplementedError.
- class CeleryExecutionEngine¶
Celery-based distributed execution engine (stub).
Install with:
pip install litestar-workflows[celery]
- class SAQExecutionEngine¶
SAQ (Simple Async Queue) execution engine (stub).
Install with:
pip install litestar-workflows[saq]
- class ARQExecutionEngine¶
ARQ (Async Redis Queue) execution engine (stub).
Install with:
pip install litestar-workflows[arq]
Full Module Reference¶
See Also¶
Core Concepts - Core concepts explained
How-To Guides - Practical how-to guides