Conditional Logic and Gateways¶
This guide covers adding decision points and conditional branching to your workflows. You’ll learn how to use gateways, conditional edges, and dynamic routing.
Goal¶
Create workflows that take different paths based on data, user decisions, or business rules.
Understanding Gateways¶
Gateways are decision points that evaluate conditions and route execution:
[Submit] --> [Gateway] ---(amount < 1000)---> [Auto Approve]
                 |
                 +---(amount >= 1000)---> [Manager Approval]
There are several types of gateways:
Exclusive (XOR): Exactly one path is taken, based on a condition
Inclusive (OR): One or more paths may be taken
Parallel (AND): All paths are taken (covered in the Parallel Execution guide)
Basic Gateway¶
Create a simple decision gateway:
from litestar_workflows import BaseGateway, WorkflowContext
class AmountGateway(BaseGateway):
    """Pick the approval step for a request from its ``amount``.

    Routing:
        * ``amount >= 10000`` -> ``"executive_approval"``
        * ``1000 <= amount < 10000`` -> ``"manager_approval"``
        * anything lower -> ``"auto_approve"`` (a missing amount defaults to 0)
    """

    name = "amount_gateway"
    description = "Determine approval path"

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return the name of the step to run next."""
        requested = context.get("amount", 0)

        # Guard clauses, highest threshold first.
        if requested >= 10000:
            return "executive_approval"
        if requested >= 1000:
            return "manager_approval"
        return "auto_approve"
Using the Gateway in a Workflow¶
from litestar_workflows import WorkflowDefinition, Edge
# Expense approval flow: submit -> route -> exactly one approval step -> process.
expense_workflow = WorkflowDefinition(
    name="expense_approval",
    version="1.0.0",
    # Keys are the step names that the edges below refer to.
    steps={
        "submit": SubmitRequest(),
        "route": AmountGateway(),
        "auto_approve": AutoApprove(),
        "manager_approval": ManagerApproval(),
        "executive_approval": ExecutiveApproval(),
        "process": ProcessResult(),
    },
    edges=[
        Edge("submit", "route"),
        # Gateway edges - one will be taken. The targets are the three step
        # names that AmountGateway.evaluate() can return.
        Edge("route", "auto_approve"),
        Edge("route", "manager_approval"),
        Edge("route", "executive_approval"),
        # All paths converge
        Edge("auto_approve", "process"),
        Edge("manager_approval", "process"),
        Edge("executive_approval", "process"),
    ],
    initial_step="submit",
    terminal_steps={"process"},
)
Conditional Edges¶
Instead of gateways, use conditional edges for simpler logic:
# Two mutually exclusive conditional edges leave "review"; which one applies
# depends on the "approved" value stored in the context.
edges = [
    Edge("submit", "review"),
    # Conditional edges - condition is a Python expression
    Edge(
        source="review",
        target="approve",
        condition="context.get('approved') == True"
    ),
    Edge(
        source="review",
        target="reject",
        condition="context.get('approved') == False"
    ),
    # NOTE(review): if 'approved' is missing (None), neither condition above
    # is true - confirm how the engine handles a step with no matching edge.
]
Each condition is a Python expression, given as a string, that is evaluated with the workflow context available as context.
Complex Conditions¶
For complex logic, use gateway steps:
class ComplexRoutingGateway(BaseGateway):
    """Route a request using urgency, department and amount.

    Rules, checked in this order:
        1. Urgent requests from requesters at level 5 or above -> ``"fast_track"``.
        2. Engineering: above 5000 -> ``"cto_approval"``, else ``"eng_manager_approval"``.
        3. Marketing: above 3000 -> ``"cmo_approval"``, else ``"marketing_manager_approval"``.
        4. Everything else -> ``"standard_approval"``.
    """

    name = "complex_router"

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return the name of the approval step selected by the rules above."""
        requested = context.get("amount", 0)
        dept = context.get("department")
        urgent = context.get("is_urgent", False)
        seniority = context.get("requester_level", 0)

        # The fast track takes precedence over any department rule.
        if urgent and seniority >= 5:
            return "fast_track"

        if dept == "engineering":
            return "cto_approval" if requested > 5000 else "eng_manager_approval"

        if dept == "marketing":
            return "cmo_approval" if requested > 3000 else "marketing_manager_approval"

        # Default path
        return "standard_approval"
Multiple Conditions (Inclusive Gateway)¶
When multiple paths should be taken:
class NotificationGateway(BaseGateway):
    """Select every notification step that applies to the current context."""

    name = "notification_gateway"
    gateway_type = "inclusive"  # Multiple paths allowed

    async def evaluate(self, context: WorkflowContext) -> list[str]:
        """Return the names of all notification steps to execute.

        Email is on unless ``notify_email`` is falsy, Slack is off unless
        ``notify_slack`` is truthy, and the executive alert is added when
        ``amount`` exceeds 10000. If nothing is selected, the single step
        ``"skip_notifications"`` is returned instead of an empty list.
        """
        # (enabled?, step name) pairs, evaluated in a fixed order.
        candidates = (
            (context.get("notify_email", True), "send_email"),
            (context.get("notify_slack", False), "send_slack"),
            (context.get("amount", 0) > 10000, "send_executive_alert"),
        )
        selected = [step for enabled, step in candidates if enabled]
        return selected or ["skip_notifications"]
Conditional Approval Chains¶
Build dynamic approval chains:
class ApprovalChainGateway(BaseGateway):
    """Determine required approvals based on context.

    The number of approvals a request needs equals the number of amount
    thresholds it exceeds. Approvers are visited in threshold order, one per
    pass through the gateway, until ``approval_count`` reaches that number.
    """

    name = "approval_chain"

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return the next approver step, or ``"process_approved"`` when done.

        Reads ``amount`` (default 0) and ``approval_count`` (default 0) from
        the context. Nothing is written back; incrementing ``approval_count``
        is left to other steps.
        """
        amount = context.get("amount", 0)
        approval_count = context.get("approval_count", 0)

        # Define approval thresholds: (upper bound of the tier, approver step).
        # NOTE(review): the final "ceo" entry only terminates the scan below.
        # For a non-negative approval_count it is never returned, because at
        # most the first four approvers can be required - confirm intended.
        thresholds = [
            (1000, "team_lead"),
            (5000, "manager"),
            (15000, "director"),
            (50000, "vp"),
            (float("inf"), "ceo"),
        ]

        # Find required approval level: count the thresholds the amount exceeds.
        required_level = 0
        for threshold, _ in thresholds:
            if amount <= threshold:
                break
            required_level += 1

        # Check if all required approvals obtained
        if approval_count >= required_level:
            return "process_approved"

        # Route to next approver
        return thresholds[approval_count][1]
State Machine Pattern¶
Implement state machine behavior:
from enum import StrEnum
class OrderState(StrEnum):
PENDING = "pending"
PROCESSING = "processing"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class OrderStateGateway(BaseGateway):
    """Validate a requested order state change and route to its handler."""

    name = "order_state"

    # Valid transitions: current state -> states that may follow it.
    TRANSITIONS = {
        OrderState.PENDING: [OrderState.PROCESSING, OrderState.CANCELLED],
        OrderState.PROCESSING: [OrderState.SHIPPED, OrderState.CANCELLED],
        OrderState.SHIPPED: [OrderState.DELIVERED],
        OrderState.DELIVERED: [],
        OrderState.CANCELLED: [],
    }

    async def evaluate(self, context: WorkflowContext) -> str:
        """Apply the requested transition if allowed and return the next step.

        On success ``order_state`` is updated and ``"handle_<state>"`` is
        returned. Otherwise ``transition_error`` is stored in the context and
        ``"handle_invalid_transition"`` is returned.
        """
        current = context.get("order_state", OrderState.PENDING)
        target = context.get("requested_state")

        # Reject first: unknown current states have no allowed successors.
        if target not in self.TRANSITIONS.get(current, []):
            context.set("transition_error", f"Cannot transition from {current} to {target}")
            return "handle_invalid_transition"

        context.set("order_state", target)
        return f"handle_{target}"
Retry Logic with Gateways¶
Implement retry behavior:
class RetryGateway(BaseGateway):
    """Decide whether a failed operation is retried, abandoned or complete."""

    name = "retry_gateway"
    max_retries = 3

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return ``"next_step"``, ``"retry_operation"`` or ``"handle_failure"``.

        ``retry_count`` is incremented in the context only when a retry is
        actually scheduled.
        """
        attempts = context.get("retry_count", 0)
        error = context.get("last_error")

        # No error - proceed normally
        if not error:
            return "next_step"

        # Give up once the retry budget is spent or the error is not retryable.
        # The retry-count check runs first, so is_retryable() is skipped once
        # the maximum has been reached.
        if attempts >= self.max_retries or not self.is_retryable(error):
            return "handle_failure"

        context.set("retry_count", attempts + 1)
        return "retry_operation"

    def is_retryable(self, error: str) -> bool:
        """Return True if *error* mentions a known transient failure (case-insensitive)."""
        lowered = error.lower()
        for pattern in ("timeout", "connection", "rate limit"):
            if pattern in lowered:
                return True
        return False
Complete Example: Multi-Level Approval¶
"""Multi-level approval workflow with conditional routing."""
from litestar_workflows import (
WorkflowDefinition,
Edge,
BaseMachineStep,
BaseHumanStep,
BaseGateway,
LocalExecutionEngine,
WorkflowRegistry,
WorkflowContext,
)
class SubmitRequest(BaseMachineStep):
    """Record the submission and start the approval level at 0."""

    name = "submit"

    async def execute(self, context: WorkflowContext) -> None:
        """Write the initial ``submitted`` and ``approval_level`` values."""
        initial_values = {"submitted": True, "approval_level": 0}
        for key, value in initial_values.items():
            context.set(key, value)
class ApprovalRouter(BaseGateway):
    """Route to appropriate approver based on amount and current level.

    The amount selects a required approval level from ``LEVELS``. The gateway
    is re-entered after every approval and hands out one approver per pass
    until ``approval_level`` reaches the required level.
    """

    name = "approval_router"

    # Approval levels: (max_amount, step_name, level)
    LEVELS = [
        (1000, "auto_approve", 0),
        (5000, "team_lead_approval", 1),
        (25000, "manager_approval", 2),
        (100000, "director_approval", 3),
        (float("inf"), "vp_approval", 4),
    ]

    async def evaluate(self, context: WorkflowContext) -> str:
        """Return the name of the next step for the current approval state.

        Returns:
            ``"auto_approve"`` when the amount needs no human approval,
            ``"process_approved"`` once every required level has signed off,
            otherwise the step name of the next approver in the chain.
        """
        amount = context.get("amount", 0)
        current_level = context.get("approval_level", 0)

        # Find required approval level: the first tier whose max_amount covers
        # the amount. The default keeps required_level bound even if no tier
        # compares as covering the amount (e.g. a NaN amount).
        required_step, required_level = next(
            (
                (step_name, level)
                for max_amount, step_name, level in self.LEVELS
                if amount <= max_amount
            ),
            self.LEVELS[-1][1:],
        )

        # Check if we've reached required level
        if current_level >= required_level:
            # Level 0 means no approver is needed. Hand over to the level-0
            # step ("auto_approve") so that it actually runs; previously this
            # branch always returned "process_approved" and the auto-approve
            # step could never be reached.
            return required_step if required_level == 0 else "process_approved"

        # Route to next approver in chain
        next_level = current_level + 1
        for _, step_name, level in self.LEVELS:
            if level == next_level:
                return step_name

        return "process_approved"
class TeamLeadApproval(BaseHumanStep):
    """Team lead approval step.

    The form collects a required ``approved`` boolean and an optional
    ``escalate`` boolean.
    """

    name = "team_lead_approval"
    title = "Team Lead Approval"

    # JSON-Schema-style description of the form.
    form_schema = {
        "type": "object",
        "properties": {
            "approved": {"type": "boolean", "title": "Approve?"},
            "escalate": {"type": "boolean", "title": "Escalate to Manager?"},
        },
        "required": ["approved"],
    }
class ManagerApproval(BaseHumanStep):
    """Manager approval step; the form collects a required ``approved`` boolean."""

    name = "manager_approval"
    title = "Manager Approval"

    # JSON-Schema-style description of the form.
    form_schema = {
        "type": "object",
        "properties": {
            "approved": {"type": "boolean", "title": "Approve?"},
        },
        "required": ["approved"],
    }
class UpdateApprovalLevel(BaseMachineStep):
    """Advance ``approval_level`` in the context by one."""

    name = "update_level"

    async def execute(self, context: WorkflowContext) -> None:
        """Increment ``approval_level``, treating a missing value as 0."""
        context.set("approval_level", context.get("approval_level", 0) + 1)
class AutoApprove(BaseMachineStep):
    """Approve the request automatically, recording ``"system"`` as the approver."""

    name = "auto_approve"

    async def execute(self, context: WorkflowContext) -> None:
        """Set ``approved`` to True and ``approved_by`` to ``"system"``."""
        outcome = {"approved": True, "approved_by": "system"}
        for key, value in outcome.items():
            context.set(key, value)
class ProcessApproved(BaseMachineStep):
    """Mark the request as approved and processed."""

    name = "process_approved"

    async def execute(self, context: WorkflowContext) -> None:
        """Set ``status`` to ``"approved"`` and ``processed`` to True."""
        outcome = {"status": "approved", "processed": True}
        for key, value in outcome.items():
            context.set(key, value)
class ProcessRejected(BaseMachineStep):
    """Mark the request as rejected."""

    name = "process_rejected"

    async def execute(self, context: WorkflowContext) -> None:
        """Set ``status`` to ``"rejected"`` in the context."""
        rejected_status = "rejected"
        context.set("status", rejected_status)
# Workflow definition
# The router is visited once after submission and again after every successful
# human approval (via update_level), so a request loops router -> approver ->
# update_level -> router until it reaches a terminal step.
approval_workflow = WorkflowDefinition(
    name="multi_level_approval",
    version="1.0.0",
    description="Multi-level approval with conditional routing",
    # NOTE(review): ApprovalRouter.LEVELS also names "director_approval" and
    # "vp_approval", which are not registered as steps or edge targets here.
    # Confirm how the engine behaves when the router returns one of them.
    steps={
        "submit": SubmitRequest(),
        "router": ApprovalRouter(),
        "auto_approve": AutoApprove(),
        "team_lead_approval": TeamLeadApproval(),
        "manager_approval": ManagerApproval(),
        "update_level": UpdateApprovalLevel(),
        "process_approved": ProcessApproved(),
        "process_rejected": ProcessRejected(),
    },
    edges=[
        Edge("submit", "router"),
        # Gateway routes to appropriate step
        Edge("router", "auto_approve"),
        Edge("router", "team_lead_approval"),
        Edge("router", "manager_approval"),
        Edge("router", "process_approved"),
        # Auto approve goes directly to processing
        Edge("auto_approve", "process_approved"),
        # Human approvals check result: approved -> bump the level,
        # rejected -> terminal rejection step.
        Edge(
            "team_lead_approval",
            "update_level",
            condition="context.get('approved') == True"
        ),
        Edge(
            "team_lead_approval",
            "process_rejected",
            condition="context.get('approved') == False"
        ),
        Edge(
            "manager_approval",
            "update_level",
            condition="context.get('approved') == True"
        ),
        Edge(
            "manager_approval",
            "process_rejected",
            condition="context.get('approved') == False"
        ),
        # After level update, re-evaluate routing
        Edge("update_level", "router"),
    ],
    initial_step="submit",
    terminal_steps={"process_approved", "process_rejected"},
)
Best Practices¶
Keep Gateway Logic Simple¶
# Good - clear, simple logic
async def evaluate(self, context):
    """Return "next_step" when ``approved`` is truthy, else "rejection_handler"."""
    approved = context.get("approved")
    return "next_step" if approved else "rejection_handler"
# Avoid - too complex
async def evaluate(self, context):
    # Anti-pattern, kept as written on purpose. `and` binds tighter than `or`,
    # so the condition groups as (a and not b) or (c > 10 and d != "x"),
    # which is easy to misread without the extra parentheses.
    if context.get("a") and not context.get("b") or (context.get("c") > 10 and context.get("d") != "x"):
        # Hard to understand and debug
        ...
Document Decision Criteria¶
class ApprovalGateway(BaseGateway):
    """Route based on amount thresholds.

    Routing Rules:
    - < $1,000: Auto-approve
    - $1,000 - $5,000: Team lead approval
    - $5,000 - $25,000: Manager approval
    - > $25,000: Director approval
    """

    # NOTE(review): $5,000 appears in two ranges above; state which rule owns
    # each boundary value when documenting a real gateway.
    ...
Test All Branches¶
import pytest
# NOTE(review): running an ``async def`` test requires an async pytest plugin
# to be configured for the project - confirm which one is in use.
@pytest.mark.parametrize(
    "amount,expected",
    [
        # Below the manager threshold.
        (500, "auto_approve"),
        (999, "auto_approve"),
        # 1000 up to (but excluding) 10000.
        (1000, "manager_approval"),
        (2000, "manager_approval"),
        (9999, "manager_approval"),
        # 10000 and above.
        (10000, "executive_approval"),
        (50000, "executive_approval"),
    ],
)
async def test_gateway_routing(amount, expected):
    """Check every branch of AmountGateway.evaluate, including both boundaries.

    The expected values are the step names AmountGateway actually returns
    ("auto_approve", "manager_approval", "executive_approval").
    """
    context = WorkflowContext(data={"amount": amount})
    gateway = AmountGateway()

    result = await gateway.evaluate(context)

    assert result == expected
Handle Edge Cases¶
class SafeGateway(BaseGateway):
    """Gateway that validates ``amount`` before routing on it."""

    async def evaluate(self, context):
        """Return the next step, or ``"handle_error"`` for a missing or invalid amount.

        On the error path a description of the problem is stored in the
        context under ``routing_error``.
        """
        amount = context.get("amount")

        # Work out what, if anything, is wrong with the input.
        problem = None
        if amount is None:
            # Handle missing data
            problem = "Amount is required"
        elif not isinstance(amount, (int, float)) or amount < 0:
            # Handle invalid data
            problem = f"Invalid amount: {amount}"

        if problem is not None:
            context.set("routing_error", problem)
            return "handle_error"

        # Normal routing
        return "auto_approve" if amount < 1000 else "manual_review"
Next Steps¶
Learn about parallel execution: See Parallel Execution
Understand human tasks: See Working with Human Tasks