Execution¶
The execution engine is responsible for running workflow instances. It schedules steps, manages state transitions, and handles both automated and human-driven execution.
Execution Engines¶
litestar-workflows provides multiple execution engine implementations:
| Engine | Description |
|---|---|
| LocalExecutionEngine | In-process async execution (default, in-memory) |
| PersistentExecutionEngine | SQLAlchemy-backed persistent execution ([db] extra) |
| CeleryExecutionEngine | Distributed execution via Celery (optional extra, planned) |
| SAQExecutionEngine | Redis-backed async queues (optional extra, planned) |
The Local Engine¶
The LocalExecutionEngine runs workflows in the current process:
from litestar_workflows import LocalExecutionEngine, WorkflowRegistry
# Create registry with workflow definitions
registry = WorkflowRegistry()
registry.register_definition(my_workflow)
# Create engine
engine = LocalExecutionEngine(registry)
# Start a workflow
instance = await engine.start_workflow(
"my_workflow",
initial_data={"key": "value"}
)
Engine Configuration¶
engine = LocalExecutionEngine(
registry=registry,
persistence=database_repository, # Optional: persist state
event_bus=event_bus, # Optional: emit events
)
Execution Flow¶
Here’s how the engine executes a workflow:
1. Start Workflow
|
v
2. Create Instance + Context
|
v
3. Begin at initial_step
|
v
+---> 4. Check step type
| |
| +-- MACHINE: Execute immediately
| | |
| | v
| | on_success/on_failure
| |
| +-- HUMAN: Create task, PAUSE
| | Wait for completion...
| |
| +-- GATEWAY: Evaluate condition
| |
| +-- TIMER: Schedule future execution
|
5. Find next step(s) via edges
|
+-- Has next: Return to step 4
|
+-- No next (terminal): Complete workflow
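Tying these stages together, a run through a human step follows the pause/resume cycle shown above. A minimal sketch, assuming a registered my_workflow definition whose path reaches a human step named manager_approval (both names are illustrative; the calls mirror the sections below):
from litestar_workflows import WorkflowStatus  # assumed importable, as used below
# Machine steps execute immediately; the engine pauses at the human step
instance = await engine.start_workflow("my_workflow", initial_data={"amount": 500.00})
assert instance.status == WorkflowStatus.WAITING
# Completing the task resumes edge traversal until a terminal step is reached
await engine.complete_human_task(
    instance_id=instance.id,
    step_name="manager_approval",
    user_id="manager@example.com",
    data={"approved": True},
)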
Starting Workflows¶
# Basic start
instance = await engine.start_workflow("workflow_name")
# With initial data
instance = await engine.start_workflow(
"workflow_name",
initial_data={
"customer_id": "cust-123",
"amount": 500.00,
}
)
# With specific version
instance = await engine.start_workflow(
"workflow_name",
version="2.0.0",
initial_data={...}
)
The returned instance contains:
instance.id # Unique instance identifier
instance.status # Current status (RUNNING, WAITING, etc.)
instance.current_step # Name of current step
instance.context # WorkflowContext with data
Human Task Completion¶
When a workflow reaches a human step, it pauses and waits for input:
# Workflow is now WAITING at "manager_approval"
assert instance.status == WorkflowStatus.WAITING
assert instance.current_step == "manager_approval"
# Complete the human task
await engine.complete_human_task(
instance_id=instance.id,
step_name="manager_approval",
user_id="manager@example.com",
data={
"approved": True,
"comments": "Approved for Q4 budget",
}
)
# Workflow continues execution
Cancellation¶
Cancel a running or waiting workflow:
await engine.cancel_workflow(
instance_id=instance.id,
reason="Customer requested cancellation"
)
# Instance is now CANCELED
assert instance.status == WorkflowStatus.CANCELED
Retry Failed Workflows¶
Retry a failed workflow from a specific step:
# Workflow failed at "process_payment"
assert instance.status == WorkflowStatus.FAILED
# Retry from the failed step
await engine.retry(
instance_id=instance.id,
from_step="process_payment" # Optional: defaults to failed step
)
Parallel Execution¶
When a step has multiple outgoing edges without conditions, they execute in parallel:
definition = WorkflowDefinition(
steps={
"start": StartStep(),
"notify_email": NotifyEmail(),
"notify_slack": NotifySlack(),
"notify_sms": NotifySMS(),
"complete": CompleteStep(),
},
edges=[
Edge("start", "notify_email"),
Edge("start", "notify_slack"),
Edge("start", "notify_sms"),
Edge("notify_email", "complete"),
Edge("notify_slack", "complete"),
Edge("notify_sms", "complete"),
],
...
)
The engine will:
1. Execute start
2. Execute notify_email, notify_slack, and notify_sms in parallel
3. Wait for all three to complete
4. Execute complete
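This fan-out maps naturally onto asyncio. A conceptual sketch of the join semantics (the concurrency pattern the engine follows, not its actual internals):
import asyncio
# Unconditional branches run concurrently; the join step starts
# only after every branch has finished.
async def run_fanout(context):
    await asyncio.gather(
        NotifyEmail().execute(context),
        NotifySlack().execute(context),
        NotifySMS().execute(context),
    )
    await CompleteStep().execute(context)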
Conditional Execution¶
Edge conditions control which paths are taken:
edges = [
Edge("review", "approve", condition="context.get('score') >= 80"),
Edge("review", "reject", condition="context.get('score') < 80"),
]
The engine evaluates conditions and follows matching edges.
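Conceptually, each condition is an expression evaluated with the workflow context in scope, as in this simplified sketch (the condition and target attribute names are assumed for illustration, and the real engine may use a safer evaluator than eval):
def matching_targets(outgoing_edges, context):
    # An edge with no condition always matches; otherwise the
    # expression string is evaluated with "context" in scope.
    targets = []
    for edge in outgoing_edges:
        if edge.condition is None or eval(edge.condition, {"context": context}):
            targets.append(edge.target)
    return targets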
Event Emission¶
Configure an event bus to receive workflow events:
from litestar_workflows import EventBus
class MyEventBus(EventBus):
async def emit(self, event):
print(f"Event: {event}")
await event_store.save(event)
engine = LocalExecutionEngine(
registry=registry,
event_bus=MyEventBus()
)
Events include:
WorkflowStarted
WorkflowCompleted
WorkflowFailed
WorkflowCanceled
StepExecuted
HumanTaskCreated
HumanTaskCompleted
Persistence¶
For durable workflows, use the PersistentExecutionEngine from the [db] extra:
pip install litestar-workflows[db]
The PersistentExecutionEngine is a drop-in replacement for LocalExecutionEngine
that stores all workflow state in a database:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from litestar_workflows import WorkflowRegistry
from litestar_workflows.db import PersistentExecutionEngine
# Create database engine and session
db_engine = create_async_engine("postgresql+asyncpg://localhost/workflows")
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
# Create workflow registry
registry = WorkflowRegistry()
registry.register_definition(MyWorkflow.get_definition())
registry.register_workflow_class(MyWorkflow)
# Use persistent engine
async with session_factory() as session:
engine = PersistentExecutionEngine(
registry=registry,
session=session,
)
# Start workflow - state is automatically persisted
instance = await engine.start_workflow(
MyWorkflow,
initial_data={"order_id": "12345"},
tenant_id="acme-corp", # Multi-tenancy support
created_by="user@example.com", # Audit trail
)
With the persistent engine:
Workflow state survives application restarts
Human tasks can wait indefinitely for completion
Complete execution history is preserved for auditing
Workflows can be queried by status, user, tenant, etc. (sketched below)
Failed workflows can be analyzed and retried
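Because instance state lives in ordinary tables, such queries can be issued directly against the database. A hedged sketch using plain SQL against the workflow_instances table described below (column names are assumed for illustration):
from sqlalchemy import text
async with session_factory() as session:
    # Find waiting instances for one tenant; schema details may differ
    result = await session.execute(
        text(
            "SELECT id, current_step FROM workflow_instances "
            "WHERE tenant_id = :tenant AND status = :status"
        ),
        {"tenant": "acme-corp", "status": "WAITING"},
    )
    for row in result:
        print(row.id, row.current_step)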
The persistence layer creates four database tables:
workflow_definitions - Stores workflow definition metadata with versioning
workflow_instances - Tracks running and completed workflow instances
workflow_step_executions - Records individual step executions
workflow_human_tasks - Manages pending human approval tasks
See the Database Persistence guide for complete setup instructions.
Distributed Execution¶
For production workloads that need to distribute steps across worker processes, use a distributed engine (both below are optional extras, currently planned):
Celery Engine¶
from celery import Celery
from litestar_workflows.contrib.celery import CeleryExecutionEngine
celery_app = Celery("workflows", broker="redis://localhost:6379/0")
engine = CeleryExecutionEngine(
celery_app=celery_app,
persistence=persistence,
)
# Steps are executed as Celery tasks
instance = await engine.start_workflow("my_workflow")
SAQ Engine¶
from saq import Queue
from litestar_workflows.contrib.saq import SAQExecutionEngine
queue = Queue.from_url("redis://localhost:6379/0")
engine = SAQExecutionEngine(
queue=queue,
persistence=persistence,
)
Execution Guarantees¶
The execution engine provides these guarantees:
At-Least-Once Execution¶
Steps are guaranteed to execute at least once. With persistence, failed workflows can be retried:
# Step might run multiple times in retry scenarios
class IdempotentStep(BaseMachineStep):
async def execute(self, context):
# Make operations idempotent
if not await payment_exists(context.get("payment_id")):
await create_payment(...)
Order Preservation¶
Steps execute in the order defined by edges. Parallel steps may complete in any order, but the next step waits for all to finish.
Error Isolation¶
A failing step doesn’t corrupt the workflow state. The context is preserved at the point of failure for debugging and retry.
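For example (step and context keys are illustrative), a failure can be inspected and then resumed with the retry API shown earlier:
# The instance failed at "process_payment"; data written by earlier
# steps is still available on the preserved context.
assert instance.status == WorkflowStatus.FAILED
order = instance.context.get("order")  # inspect state at the point of failure
# After fixing the underlying issue, resume from the failed step
await engine.retry(instance_id=instance.id, from_step="process_payment")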
Workflow Registry¶
The WorkflowRegistry manages workflow definitions:
from litestar_workflows import WorkflowRegistry
registry = WorkflowRegistry()
# Register definitions
registry.register_definition(workflow_v1)
registry.register_definition(workflow_v2)
# Get definition
definition = registry.get_definition("my_workflow")
definition = registry.get_definition("my_workflow", version="1.0.0")
# List all definitions
all_workflows = registry.list_definitions()
# Check if workflow exists
if registry.has_definition("my_workflow"):
...
Best Practices¶
Use Persistence in Production¶
Always configure persistence for production workloads:
# Development - in-memory is fine
engine = LocalExecutionEngine(registry)
# Production - use persistence
engine = LocalExecutionEngine(
registry=registry,
persistence=database_persistence,
)
Make Steps Idempotent¶
Design steps to be safely re-executed:
class CreateOrder(BaseMachineStep):
async def execute(self, context):
order_id = context.get("order_id")
# Check if already created
existing = await orders.get(order_id)
if existing:
context.set("order", existing)
return
# Create new order
order = await orders.create(order_id, ...)
context.set("order", order)
Handle Timeouts¶
Long-running steps should handle timeouts gracefully:
import asyncio
class LongRunningStep(BaseMachineStep):
async def execute(self, context):
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=300.0 # 5 minutes
)
context.set("result", result)
except asyncio.TimeoutError:
context.set("timeout", True)
raise
Monitor Execution¶
Use events to track workflow execution:
class MonitoringEventBus(EventBus):
async def emit(self, event):
# Record metrics
metrics.increment(f"workflow.{event.type}")
# Log for debugging
logger.info(f"Workflow event: {event}")
# Alert on failures
if event.type == "WorkflowFailed":
await alerting.notify(f"Workflow failed: {event.instance_id}")
See Also¶
Workflows - Workflow definitions
Steps - Step types and lifecycle
Parallel Execution - Parallel execution patterns