Building a Simple Workflow¶
This guide walks you through building a complete, production-ready workflow from scratch. You’ll create a document processing workflow that validates, transforms, and stores documents.
Goal¶
Build a workflow that:
Validates uploaded document metadata
Processes the document content
Stores the result
Sends a notification
[Validate] --> [Process] --> [Store] --> [Notify]
Prerequisites¶
litestar-workflows installed
Basic understanding of async Python
Step 1: Define Your Steps¶
Create each step as a class that inherits from BaseMachineStep:
# steps.py
from litestar_workflows import BaseMachineStep, WorkflowContext
class ValidateDocument(BaseMachineStep):
"""Validate document metadata and content."""
name = "validate"
description = "Validate document meets requirements"
async def execute(self, context: WorkflowContext) -> None:
# Get document data from context
document = context.get("document", {})
# Validate required fields
errors = []
if not document.get("title"):
errors.append("Title is required")
if not document.get("content"):
errors.append("Content is required")
if len(document.get("content", "")) < 10:
errors.append("Content must be at least 10 characters")
# Store validation results
context.set("validation_errors", errors)
context.set("is_valid", len(errors) == 0)
# Fail if validation errors
if errors:
raise ValueError(f"Validation failed: {', '.join(errors)}")
async def can_execute(self, context: WorkflowContext) -> bool:
"""Only run if document exists."""
return context.get("document") is not None
class ProcessDocument(BaseMachineStep):
"""Transform document content."""
name = "process"
description = "Process and transform document"
async def execute(self, context: WorkflowContext) -> None:
document = context.get("document", {})
# Simulate processing
processed_content = document["content"].upper()
word_count = len(document["content"].split())
# Store results
context.set("processed_content", processed_content)
context.set("word_count", word_count)
context.set("processed_at", datetime.now().isoformat())
class StoreDocument(BaseMachineStep):
"""Persist document to storage."""
name = "store"
description = "Save document to database"
async def execute(self, context: WorkflowContext) -> None:
document = context.get("document", {})
processed_content = context.get("processed_content")
# Simulate storage (replace with actual database call)
document_id = f"doc-{uuid4().hex[:8]}"
context.set("document_id", document_id)
context.set("stored", True)
class NotifyComplete(BaseMachineStep):
"""Send completion notification."""
name = "notify"
description = "Send notification on completion"
async def execute(self, context: WorkflowContext) -> None:
document_id = context.get("document_id")
word_count = context.get("word_count")
# Simulate notification (replace with actual service)
notification = {
"type": "document_processed",
"document_id": document_id,
"word_count": word_count,
"message": f"Document {document_id} processed successfully",
}
context.set("notification", notification)
context.set("notified", True)
Step 2: Create the Workflow Definition¶
Wire the steps together with edges:
# workflow.py
from litestar_workflows import WorkflowDefinition, Edge
from .steps import ValidateDocument, ProcessDocument, StoreDocument, NotifyComplete
document_workflow = WorkflowDefinition(
name="document_processing",
version="1.0.0",
description="Validate, process, store, and notify for documents",
steps={
"validate": ValidateDocument(),
"process": ProcessDocument(),
"store": StoreDocument(),
"notify": NotifyComplete(),
},
edges=[
Edge(source="validate", target="process"),
Edge(source="process", target="store"),
Edge(source="store", target="notify"),
],
initial_step="validate",
terminal_steps={"notify"},
)
Step 3: Set Up the Engine¶
Create a registry and engine:
# engine_setup.py
from litestar_workflows import WorkflowRegistry, LocalExecutionEngine
from .workflow import document_workflow
# Create and configure registry
registry = WorkflowRegistry()
registry.register_definition(document_workflow)
# Create engine
engine = LocalExecutionEngine(registry)
Step 4: Run the Workflow¶
Start workflow instances:
# main.py
import asyncio
from .engine_setup import engine
async def process_document(title: str, content: str) -> dict:
"""Start a document processing workflow."""
instance = await engine.start_workflow(
"document_processing",
initial_data={
"document": {
"title": title,
"content": content,
}
}
)
# Since all steps are machine steps, execution completes immediately
return {
"instance_id": str(instance.id),
"document_id": instance.context.get("document_id"),
"word_count": instance.context.get("word_count"),
"status": instance.status.value,
}
async def main():
result = await process_document(
title="My Document",
content="This is the content of my document that needs processing."
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Step 5: Add Error Handling¶
Implement error handling in your steps:
class ProcessDocument(BaseMachineStep):
name = "process"
async def execute(self, context: WorkflowContext) -> None:
try:
document = context.get("document", {})
processed_content = document["content"].upper()
context.set("processed_content", processed_content)
except Exception as e:
context.set("processing_error", str(e))
raise
async def on_failure(self, context: WorkflowContext, error: Exception) -> None:
"""Handle processing failures."""
# Log the error
print(f"Processing failed: {error}")
# Set failure state
context.set("failed_at", "process")
context.set("failure_reason", str(error))
Step 6: Integrate with Litestar¶
Create an API endpoint to trigger workflows:
# app.py
from litestar import Litestar, post
from litestar.di import Provide
from pydantic import BaseModel
from .engine_setup import engine
class DocumentRequest(BaseModel):
title: str
content: str
class WorkflowResponse(BaseModel):
instance_id: str
document_id: str | None
status: str
@post("/documents/process")
async def process_document(data: DocumentRequest) -> WorkflowResponse:
"""Process a document through the workflow."""
instance = await engine.start_workflow(
"document_processing",
initial_data={
"document": {
"title": data.title,
"content": data.content,
}
}
)
return WorkflowResponse(
instance_id=str(instance.id),
document_id=instance.context.get("document_id"),
status=instance.status.value,
)
app = Litestar(route_handlers=[process_document])
Complete Example¶
Here’s the complete working example:
"""Complete document processing workflow."""
import asyncio
from datetime import datetime
from uuid import uuid4
from litestar_workflows import (
WorkflowDefinition,
Edge,
BaseMachineStep,
LocalExecutionEngine,
WorkflowRegistry,
WorkflowContext,
)
# Steps
class ValidateDocument(BaseMachineStep):
name = "validate"
description = "Validate document"
async def execute(self, context: WorkflowContext) -> None:
document = context.get("document", {})
if not document.get("title") or not document.get("content"):
raise ValueError("Title and content required")
context.set("is_valid", True)
class ProcessDocument(BaseMachineStep):
name = "process"
description = "Process document"
async def execute(self, context: WorkflowContext) -> None:
document = context.get("document", {})
context.set("processed_content", document["content"].upper())
context.set("word_count", len(document["content"].split()))
class StoreDocument(BaseMachineStep):
name = "store"
description = "Store document"
async def execute(self, context: WorkflowContext) -> None:
context.set("document_id", f"doc-{uuid4().hex[:8]}")
context.set("stored", True)
class NotifyComplete(BaseMachineStep):
name = "notify"
description = "Send notification"
async def execute(self, context: WorkflowContext) -> None:
context.set("notified", True)
# Workflow definition
document_workflow = WorkflowDefinition(
name="document_processing",
version="1.0.0",
description="Document processing pipeline",
steps={
"validate": ValidateDocument(),
"process": ProcessDocument(),
"store": StoreDocument(),
"notify": NotifyComplete(),
},
edges=[
Edge("validate", "process"),
Edge("process", "store"),
Edge("store", "notify"),
],
initial_step="validate",
terminal_steps={"notify"},
)
# Setup
registry = WorkflowRegistry()
registry.register_definition(document_workflow)
engine = LocalExecutionEngine(registry)
async def main():
instance = await engine.start_workflow(
"document_processing",
initial_data={
"document": {
"title": "Test Document",
"content": "Hello world this is a test document.",
}
}
)
print(f"Status: {instance.status}")
print(f"Document ID: {instance.context.get('document_id')}")
print(f"Word count: {instance.context.get('word_count')}")
if __name__ == "__main__":
asyncio.run(main())
Next Steps¶
Add human approval: See Working with Human Tasks
Run steps in parallel: See Parallel Execution
Add conditional logic: See Conditional Logic and Gateways