Steps¶
Steps are the building blocks of workflows. Each step represents a single unit of work, whether automated or requiring human interaction.
Step Types¶
litestar-workflows provides five step types, each designed for specific use cases:
| Type | Class | Purpose |
|---|---|---|
| MACHINE | BaseMachineStep | Automated execution without human intervention |
| HUMAN | BaseHumanStep | Requires user interaction (forms, approvals) |
| GATEWAY | BaseGateway | Decision points for conditional branching |
| TIMER | TimerStep | Delays or scheduled execution |
| WEBHOOK | WebhookStep | Waits for external callbacks |
Machine Steps¶
Machine steps execute automatically when reached. They’re used for:
- Data processing
- API calls
- Calculations
- Notifications
- File operations
from litestar_workflows import BaseMachineStep, WorkflowContext


class SendNotification(BaseMachineStep):
    """Send an email notification."""

    name = "send_notification"
    description = "Notify stakeholders of the request"

    async def execute(self, context: WorkflowContext) -> None:
        recipient = context.get("requester_email")
        status = context.get("status")
        await email_service.send(
            to=recipient,
            subject=f"Request {status}",
            body=f"Your request has been {status}.",
        )
        context.set("notification_sent", True)
Machine Step Lifecycle¶
1. can_execute: Check if the step should run (guards)
2. execute: Perform the actual work
3. on_success: Called after successful execution
4. on_failure: Called if execution raises an exception
class RobustStep(BaseMachineStep):
    name = "robust_step"

    async def can_execute(self, context: WorkflowContext) -> bool:
        """Only run if prerequisites are met."""
        return context.get("prerequisites_met", False)

    async def execute(self, context: WorkflowContext) -> None:
        """Main logic."""
        result = await do_work()
        context.set("result", result)

    async def on_success(self, context: WorkflowContext, result: None) -> None:
        """Log success metrics."""
        await metrics.record("step_completed", tags={"step": self.name})

    async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
        """Handle failures gracefully."""
        await alerting.notify(f"Step failed: {error}")
        context.set("error", str(error))
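The hook order listed above can be illustrated with a small driver. This is only a sketch of how an engine might call the hooks, not the library's actual executor: `DictContext`, `RecordingStep`, and `run_step` are hypothetical stand-ins written for this example, and the real engine may handle skipped or failed steps differently.

```python
import asyncio


class DictContext:
    """Stand-in for WorkflowContext: a thin get/set wrapper over a dict."""

    def __init__(self, **data):
        self.data = dict(data)

    def get(self, key, default=None):
        return self.data.get(key, default)

    def set(self, key, value):
        self.data[key] = value


class RecordingStep:
    """Hypothetical step that records which lifecycle hooks ran."""

    def __init__(self, fail=False):
        self.fail = fail
        self.calls = []

    async def can_execute(self, context):
        self.calls.append("can_execute")
        return context.get("prerequisites_met", False)

    async def execute(self, context):
        self.calls.append("execute")
        if self.fail:
            raise RuntimeError("boom")
        return "done"

    async def on_success(self, context, result):
        self.calls.append("on_success")

    async def on_failure(self, context, error):
        self.calls.append("on_failure")
        context.set("error", str(error))


async def run_step(step, context):
    """Assumed hook order: guard, execute, then on_success or on_failure."""
    if not await step.can_execute(context):
        return "skipped"
    try:
        result = await step.execute(context)
    except Exception as error:
        await step.on_failure(context, error)
        return "failed"
    await step.on_success(context, result)
    return "succeeded"
```

Calling `asyncio.run(run_step(RecordingStep(), DictContext(prerequisites_met=True)))` walks the success path; passing `fail=True` exercises `on_failure` instead, and omitting `prerequisites_met` stops at the guard.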
Human Steps¶
Human steps pause execution and wait for user input. They’re used for:
- Approvals and rejections
- Form submissions
- Manual data entry
- Decision points requiring human judgment
from litestar_workflows import BaseHumanStep


class DocumentReview(BaseHumanStep):
    """Human reviews and approves a document."""

    name = "document_review"
    title = "Review Document"
    description = "Please review this document and provide your decision"
    form_schema = {
        "type": "object",
        "properties": {
            "decision": {
                "type": "string",
                "title": "Your Decision",
                "enum": ["approve", "reject", "revise"],
                "enumNames": ["Approve", "Reject", "Request Revisions"],
            },
            "feedback": {
                "type": "string",
                "title": "Feedback",
                "description": "Comments for the author",
            },
            "priority": {
                "type": "integer",
                "title": "Priority Score",
                "minimum": 1,
                "maximum": 5,
            },
        },
        "required": ["decision"],
    }
Form Schema¶
Human steps use JSON Schema to define their forms. This enables:
- Automatic form generation in the UI
- Input validation
- Type safety
Common schema patterns:
# Boolean approval
form_schema = {
    "type": "object",
    "properties": {
        "approved": {"type": "boolean", "title": "Approve?", "default": False},
    },
    "required": ["approved"],
}

# Selection from options
form_schema = {
    "type": "object",
    "properties": {
        "status": {
            "type": "string",
            "enum": ["pending", "active", "completed"],
            "default": "pending",
        },
    },
}

# Numeric input with constraints
form_schema = {
    "type": "object",
    "properties": {
        "amount": {
            "type": "number",
            "minimum": 0,
            "maximum": 10000,
            "title": "Adjustment Amount",
        },
    },
}
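To make the "input validation" point concrete, here is a minimal sketch of checking submitted form data against the schema keywords used on this page (`required`, `type`, `enum`, `minimum`, `maximum`). `validate_form` and `review_schema` are hypothetical names written for this example. The function does not show how litestar-workflows validates forms internally and covers only a small subset of JSON Schema; a full validator such as the `jsonschema` package is likely the better choice in practice.

```python
def validate_form(schema, data):
    """Return a list of error strings; an empty list means the data passed."""
    errors = []
    properties = schema.get("properties", {})

    for key in schema.get("required", []):
        if key not in data:
            errors.append(f"{key}: required")

    # bool is a subclass of int in Python, so exclude it from numeric types
    type_checks = {
        "string": lambda v: isinstance(v, str),
        "boolean": lambda v: isinstance(v, bool),
        "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
        "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    }

    for key, value in data.items():
        rules = properties.get(key)
        if rules is None:
            continue  # unknown fields are ignored in this sketch
        check = type_checks.get(rules.get("type"))
        if check is not None and not check(value):
            errors.append(f"{key}: expected {rules['type']}")
            continue
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{key}: not one of {rules['enum']}")
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            if "minimum" in rules and value < rules["minimum"]:
                errors.append(f"{key}: below minimum {rules['minimum']}")
            if "maximum" in rules and value > rules["maximum"]:
                errors.append(f"{key}: above maximum {rules['maximum']}")
    return errors


# Trimmed-down version of the DocumentReview schema
review_schema = {
    "type": "object",
    "properties": {
        "decision": {"type": "string", "enum": ["approve", "reject", "revise"]},
        "priority": {"type": "integer", "minimum": 1, "maximum": 5},
    },
    "required": ["decision"],
}
```

A submission like `{"decision": "approve", "priority": 3}` yields no errors, while `{"priority": 9}` reports both the missing `decision` and the out-of-range `priority`.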
Task Assignment¶
Human tasks can be assigned to specific users or groups:
class ManagerApproval(BaseHumanStep):
    name = "manager_approval"
    title = "Manager Approval Required"

    # Assignment can be dynamic based on context
    async def get_assignee(self, context: WorkflowContext) -> str:
        department = context.get("department")
        return await org_chart.get_manager(department)
Gateway Steps¶
Gateways are decision points that route execution based on conditions:
from litestar_workflows import BaseGateway, WorkflowContext


class AmountGateway(BaseGateway):
    """Route based on request amount."""

    name = "amount_gateway"
    description = "Determine approval path based on amount"

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return the name of the next step."""
        amount = context.get("amount", 0)
        if amount > 10000:
            return "vp_approval"
        elif amount > 1000:
            return "manager_approval"
        else:
            return "auto_approve"
Gateways connect to multiple outgoing edges, and the evaluate method
determines which path to take.
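Because the comparisons in `evaluate` are strict, the boundary values fall into the lower tier. The sketch below restates the same thresholds as a plain function so the routing can be checked without a workflow engine. `route_by_amount` and `outgoing_edges` are hypothetical names for this example, and the set of edges is an assumption about how the gateway would be wired.

```python
def route_by_amount(amount):
    """Mirror of the AmountGateway thresholds as a plain function."""
    if amount > 10000:
        return "vp_approval"
    elif amount > 1000:
        return "manager_approval"
    return "auto_approve"


# Hypothetical outgoing edges; every value the gateway can return needs a target.
outgoing_edges = {"vp_approval", "manager_approval", "auto_approve"}

# Probe each side of both thresholds.
routes = {amount: route_by_amount(amount) for amount in (500, 1000, 1001, 10000, 10001)}
assert set(routes.values()) <= outgoing_edges
```

Here an amount of exactly 1000 routes to `auto_approve` and exactly 10000 routes to `manager_approval`; use `>=` in the gateway if the boundary amounts should escalate instead.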
Timer Steps¶
Timer steps introduce delays or schedule execution for later:
from datetime import datetime, timedelta

from litestar_workflows import TimerStep, WorkflowContext


class WaitForBusinessHours(TimerStep):
    """Wait until next business day if outside hours."""

    name = "wait_for_business_hours"

    async def get_delay(self, context: WorkflowContext) -> timedelta:
        # is_business_hours and get_next_business_day are application helpers
        now = datetime.now()
        if is_business_hours(now):
            return timedelta(0)  # No delay
        return get_next_business_day() - now
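The timer example relies on two helpers that are not defined on this page. One possible implementation is sketched below, assuming business hours of Monday to Friday, 09:00 to 17:00, naive local datetimes, and no holiday calendar. All names here are hypothetical, and unlike the `get_next_business_day()` call above, `get_next_business_open` takes `now` as an argument so the logic can be tested deterministically.

```python
from datetime import datetime, time, timedelta

OPEN, CLOSE = time(9, 0), time(17, 0)  # assumed business hours


def is_business_hours(now):
    """True on Monday-Friday between OPEN (inclusive) and CLOSE (exclusive)."""
    return now.weekday() < 5 and OPEN <= now.time() < CLOSE


def get_next_business_open(now):
    """Return the next datetime at which business hours begin after `now`."""
    candidate = datetime.combine(now.date(), OPEN)
    if now >= candidate:
        candidate += timedelta(days=1)  # today's opening has already passed
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)  # skip Saturday and Sunday
    return candidate


def delay_until_business_hours(now):
    """Delay a timer step could return: zero if open, else time until opening."""
    if is_business_hours(now):
        return timedelta(0)
    return get_next_business_open(now) - now
```

For example, a request arriving on a Friday at 18:00 would wait until Monday at 09:00, a delay of 2 days and 15 hours.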
Webhook Steps¶
Webhook steps wait for external systems to call back:
from litestar_workflows import WebhookStep


class PaymentCallback(WebhookStep):
    """Wait for payment processor callback."""

    name = "payment_callback"
    description = "Waiting for payment confirmation"

    # Unique token for this webhook
    async def get_callback_token(self, context: WorkflowContext) -> str:
        return f"payment-{context.instance_id}"
The workflow pauses until an external system calls the webhook endpoint with the matching token.
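The token matching described above can be pictured as a lookup table from tokens to paused workflow instances. The sketch below is an illustration only: `CallbackRegistry` is a hypothetical in-memory class, not part of litestar-workflows, and a real engine would presumably persist this mapping and resume the instance rather than return a dict.

```python
class CallbackRegistry:
    """Hypothetical in-memory map from callback token to waiting instance."""

    def __init__(self):
        self._waiting = {}

    def register(self, token, instance_id):
        """Record that `instance_id` is paused until `token` is presented."""
        self._waiting[token] = instance_id

    def resolve(self, token, payload):
        """Match an incoming callback; each token can be used only once."""
        instance_id = self._waiting.pop(token, None)
        if instance_id is None:
            return None  # unknown or already-used token
        return {"instance_id": instance_id, "payload": payload}


registry = CallbackRegistry()
registry.register("payment-abc123", "abc123")
resumed = registry.resolve("payment-abc123", {"status": "paid"})
```

After the first successful `resolve`, presenting the same token again returns `None`, so a duplicate callback does not resume the workflow twice.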
Step Groups¶
Steps can be composed into groups for more complex patterns:
Sequential Group¶
Execute steps one after another:
from litestar_workflows import SequentialGroup

validation_sequence = SequentialGroup(
    ValidateFormat(),
    ValidateContent(),
    ValidatePermissions(),
)

# Use in workflow definition
steps = {
    "validation": validation_sequence,
    ...
}
Parallel Group¶
Execute steps simultaneously:
from litestar_workflows import ParallelGroup

parallel_notifications = ParallelGroup(
    SendEmail(),
    SendSlack(),
    SendSMS(),
)
All steps in the group run concurrently, and execution continues when all complete.
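The "run concurrently, continue when all complete" behaviour resembles what `asyncio.gather` provides in plain Python. The sketch below uses only the standard library to show that semantics. `notify` and `run_parallel` are hypothetical stand-ins, and whether `ParallelGroup` is implemented with `asyncio.gather` is not stated here.

```python
import asyncio


async def notify(channel, delay, log):
    """Stand-in for a notification step; `delay` simulates network latency."""
    await asyncio.sleep(delay)
    log.append(channel)
    return f"{channel}:sent"


async def run_parallel(log):
    # gather starts all three coroutines and waits for every one to finish;
    # results come back in argument order, not completion order.
    return await asyncio.gather(
        notify("email", 0.03, log),
        notify("slack", 0.01, log),
        notify("sms", 0.02, log),
    )
```

Running `asyncio.run(run_parallel([]))` takes roughly as long as the slowest notification rather than the sum of all three.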
Parallel with Callback (Chord)¶
Execute steps in parallel, then run a callback with all results:
from litestar_workflows import ParallelGroup

gather_approvals = ParallelGroup(
    ManagerApproval(),
    LegalReview(),
    FinanceReview(),
    callback=AggregateDecisions(),  # Runs after all complete
)
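The chord pattern can be sketched in plain asyncio as "gather, then hand the collected results to a callback". The names `review`, `aggregate_decisions`, and `run_chord` are hypothetical, and the rule that every reviewer must approve is an assumption made for this example; the logic of `AggregateDecisions` is not shown on this page.

```python
import asyncio


async def review(reviewer, decision):
    """Stand-in for a review step that eventually yields a decision."""
    await asyncio.sleep(0)
    return {"reviewer": reviewer, "decision": decision}


async def aggregate_decisions(results):
    """Hypothetical callback: approve only if every reviewer approved."""
    approved = all(r["decision"] == "approve" for r in results)
    return "approved" if approved else "rejected"


async def run_chord(decisions):
    # Run all reviews concurrently, then pass the full result list to the callback.
    results = await asyncio.gather(
        *(review(name, decision) for name, decision in decisions.items())
    )
    return await aggregate_decisions(results)
```

With this rule, a single `"reject"` from any reviewer makes `run_chord` return `"rejected"`.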
Creating Custom Steps¶
While the base classes cover most use cases, you can create custom step types:
from litestar_workflows import Step, StepType, WorkflowContext


class BatchProcessingStep(Step):
    """Custom step for batch operations."""

    name: str
    step_type = StepType.MACHINE
    batch_size: int = 100

    async def execute(self, context: WorkflowContext) -> None:
        items = context.get("items", [])
        # Process the items in consecutive slices of at most batch_size
        for i in range(0, len(items), self.batch_size):
            batch = items[i:i + self.batch_size]
            await self.process_batch(batch)
        context.set("processed_count", len(items))

    async def process_batch(self, batch: list) -> None:
        """Override in subclasses."""
        raise NotImplementedError
Best Practices¶
Keep Steps Focused¶
Each step should do one thing well:
# Good - single responsibility
class ValidateInput(BaseMachineStep):
    async def execute(self, context):
        validate(context.get("input"))


class TransformData(BaseMachineStep):
    async def execute(self, context):
        context.set("output", transform(context.get("input")))


# Avoid - doing too much
class DoEverything(BaseMachineStep):
    async def execute(self, context):
        validate(context.get("input"))
        result = transform(context.get("input"))
        await save_to_database(result)
        await send_notification(result)
Make Steps Reusable¶
Design steps to be configurable and reusable across workflows:
class SendEmail(BaseMachineStep):
    """Reusable email step."""

    name = "send_email"

    def __init__(self, template: str, recipient_key: str = "email"):
        self.template = template
        self.recipient_key = recipient_key

    async def execute(self, context: WorkflowContext) -> None:
        recipient = context.get(self.recipient_key)
        await email_service.send(recipient, template=self.template)


# Use with different configurations
steps = {
    "notify_requester": SendEmail("request_received", "requester_email"),
    "notify_manager": SendEmail("approval_needed", "manager_email"),
}
Handle Errors Gracefully¶
Use on_failure to handle errors without crashing the workflow:
class ResilientStep(BaseMachineStep):
    async def execute(self, context: WorkflowContext) -> None:
        # Main logic that might fail
        result = await risky_operation()
        context.set("result", result)

    async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
        # Log the error
        logger.error(f"Step failed: {error}")
        # Set fallback values
        context.set("result", None)
        context.set("error", str(error))
        # Returning normally lets the workflow continue.
        # Uncomment the next line to fail the workflow instead:
        # raise
See Also¶
- Workflow Context: How steps share data
- Execution: How steps are executed
- Working with Human Tasks: Deep dive into human tasks