Document Review Workflow¶
A complete document review system with submit-review-revise cycles, parallel reviewers, approval/rejection handling, and persistence layer integration.
This recipe demonstrates:
Submit -> Review -> Revise iteration cycle
Multiple reviewers in parallel
Consolidating parallel review results
Handling approval, rejection, and revision requests
Full persistence with SQLAlchemy
Overview¶
The workflow supports iterative document review with multiple reviewers:
[Submit] --> [Parallel Review] --> [Consolidate] --> [Decision Gateway]
| | | |
[Reviewer 1][Reviewer 2] | +-- (approved) --> [Publish]
| |
| +-- (changes) --> [Author Revise] --+
| | |
| +-- (rejected) --> [Notify Reject] |
| |
+<----------------------------------------+
Review Outcomes:
Approved: All reviewers approve, document proceeds to publication
Changes Requested: Any reviewer requests changes, author revises
Rejected: Any reviewer rejects, document is declined
Complete Implementation¶
"""Document review workflow with parallel reviewers and revision cycles.
Save this file as ``document_review.py`` and run with:
uv run python document_review.py
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any
from litestar import Litestar, get
from litestar.di import Provide
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from litestar_workflows import (
BaseMachineStep,
BaseHumanStep,
Edge,
ExclusiveGateway,
ParallelGroup,
WorkflowContext,
WorkflowDefinition,
WorkflowPlugin,
WorkflowPluginConfig,
WorkflowRegistry,
)
from litestar_workflows.db import PersistentExecutionEngine
if TYPE_CHECKING:
pass
# =============================================================================
# Step Definitions
# =============================================================================
class SubmitDocument(BaseMachineStep):
"""Initialize document submission for review."""
name = "submit_document"
description = "Record document submission and initialize review"
async def execute(self, context: WorkflowContext) -> dict[str, Any]:
"""Prepare document for review process."""
context.set("submitted_at", datetime.now(timezone.utc).isoformat())
context.set("status", "under_review")
context.set("revision_count", context.get("revision_count", 0))
context.set("review_history", context.get("review_history", []))
# Validate required fields
title = context.get("document_title")
if not title:
raise ValueError("Document title is required")
document_url = context.get("document_url")
if not document_url:
raise ValueError("Document URL is required")
return {
"submitted": True,
"title": title,
"revision": context.get("revision_count", 0),
}
class TechnicalReview(BaseHumanStep):
"""Technical accuracy review."""
name = "technical_review"
title = "Technical Review Required"
description = "Review document for technical accuracy"
form_schema = {
"type": "object",
"title": "Technical Review",
"properties": {
"decision": {
"type": "string",
"title": "Review Decision",
"enum": ["approve", "request_changes", "reject"],
"enumNames": ["Approve", "Request Changes", "Reject"],
},
"technical_accuracy": {
"type": "integer",
"title": "Technical Accuracy (1-5)",
"minimum": 1,
"maximum": 5,
},
"completeness": {
"type": "integer",
"title": "Completeness (1-5)",
"minimum": 1,
"maximum": 5,
},
"feedback": {
"type": "string",
"title": "Feedback",
"format": "textarea",
"description": "Detailed feedback for the author",
},
"specific_issues": {
"type": "array",
"title": "Specific Issues",
"items": {
"type": "object",
"properties": {
"section": {"type": "string", "title": "Section"},
"issue": {"type": "string", "title": "Issue"},
"suggestion": {"type": "string", "title": "Suggestion"},
},
},
},
},
"required": ["decision", "technical_accuracy", "completeness"],
}
async def get_description(self, context: WorkflowContext) -> str:
"""Provide document context for reviewer."""
title = context.get("document_title", "Untitled")
url = context.get("document_url", "")
revision = context.get("revision_count", 0)
author = context.get("author", "Unknown")
return f"""
**Document Review Request**
- **Title:** {title}
- **Author:** {author}
- **Revision:** {revision}
- **Document:** [{title}]({url})
Please review for technical accuracy and completeness.
"""
async def get_assignee_group(self, context: WorkflowContext) -> str | None:
"""Route to technical reviewers group."""
return "technical-reviewers"
async def get_due_date(self, context: WorkflowContext) -> datetime:
"""Technical review due in 3 business days."""
return datetime.now(timezone.utc) + timedelta(days=3)
class EditorialReview(BaseHumanStep):
"""Editorial and style review."""
name = "editorial_review"
title = "Editorial Review Required"
description = "Review document for style and clarity"
form_schema = {
"type": "object",
"title": "Editorial Review",
"properties": {
"decision": {
"type": "string",
"title": "Review Decision",
"enum": ["approve", "request_changes", "reject"],
"enumNames": ["Approve", "Request Changes", "Reject"],
},
"clarity": {
"type": "integer",
"title": "Clarity (1-5)",
"minimum": 1,
"maximum": 5,
},
"style_compliance": {
"type": "integer",
"title": "Style Guide Compliance (1-5)",
"minimum": 1,
"maximum": 5,
},
"grammar_issues": {
"type": "boolean",
"title": "Has Grammar Issues",
"default": False,
},
"feedback": {
"type": "string",
"title": "Editorial Feedback",
"format": "textarea",
},
},
"required": ["decision", "clarity", "style_compliance"],
}
async def get_assignee_group(self, context: WorkflowContext) -> str | None:
"""Route to editorial team."""
return "editorial-team"
async def get_due_date(self, context: WorkflowContext) -> datetime:
"""Editorial review due in 2 business days."""
return datetime.now(timezone.utc) + timedelta(days=2)
class ConsolidateReviews(BaseMachineStep):
"""Consolidate results from parallel reviews."""
name = "consolidate_reviews"
description = "Aggregate review decisions and feedback"
async def execute(self, context: WorkflowContext) -> dict[str, Any]:
"""Consolidate all review feedback into a single decision."""
# Collect decisions from all reviewers
tech_decision = context.get("technical_review_decision", "approve")
edit_decision = context.get("editorial_review_decision", "approve")
decisions = [tech_decision, edit_decision]
# Determine consolidated outcome
if "reject" in decisions:
consolidated_decision = "rejected"
elif "request_changes" in decisions:
consolidated_decision = "changes_requested"
else:
consolidated_decision = "approved"
context.set("consolidated_decision", consolidated_decision)
# Compile all feedback
all_feedback = []
tech_feedback = context.get("technical_review_feedback")
if tech_feedback:
all_feedback.append({
"reviewer": "Technical",
"decision": tech_decision,
"feedback": tech_feedback,
"issues": context.get("technical_review_specific_issues", []),
})
edit_feedback = context.get("editorial_review_feedback")
if edit_feedback:
all_feedback.append({
"reviewer": "Editorial",
"decision": edit_decision,
"feedback": edit_feedback,
})
context.set("consolidated_feedback", all_feedback)
# Record in review history
review_history = context.get("review_history", [])
review_history.append({
"revision": context.get("revision_count", 0),
"date": datetime.now(timezone.utc).isoformat(),
"decision": consolidated_decision,
"feedback": all_feedback,
})
context.set("review_history", review_history)
return {
"consolidated": True,
"decision": consolidated_decision,
"feedback_count": len(all_feedback),
}
class ReviewDecisionGateway(ExclusiveGateway):
"""Route based on consolidated review decision."""
name = "review_decision"
description = "Route based on review outcome"
async def evaluate(self, context: WorkflowContext) -> str:
"""Determine next step based on review decision."""
decision = context.get("consolidated_decision", "changes_requested")
if decision == "approved":
return "publish_document"
elif decision == "rejected":
return "notify_rejection"
else:
# Check revision limit
revision_count = context.get("revision_count", 0)
max_revisions = context.get("max_revisions", 3)
if revision_count >= max_revisions:
return "notify_rejection"
return "author_revision"
class AuthorRevision(BaseHumanStep):
"""Author addresses reviewer feedback."""
name = "author_revision"
title = "Revisions Requested"
description = "Address reviewer feedback and resubmit"
form_schema = {
"type": "object",
"title": "Document Revision",
"properties": {
"revision_notes": {
"type": "string",
"title": "Revision Notes",
"format": "textarea",
"description": "Describe the changes you made to address feedback",
"minLength": 50,
},
"updated_url": {
"type": "string",
"title": "Updated Document URL",
"format": "uri",
"description": "URL to the revised document",
},
"addressed_issues": {
"type": "array",
"title": "Issues Addressed",
"items": {
"type": "object",
"properties": {
"issue": {"type": "string"},
"resolution": {"type": "string"},
},
},
},
"withdraw": {
"type": "boolean",
"title": "Withdraw Submission",
"description": "Check to withdraw this document from review",
"default": False,
},
},
"required": ["revision_notes"],
}
async def get_description(self, context: WorkflowContext) -> str:
"""Show feedback to author."""
title = context.get("document_title", "Untitled")
revision = context.get("revision_count", 0)
feedback = context.get("consolidated_feedback", [])
feedback_text = ""
for item in feedback:
feedback_text += f"\n\n**{item['reviewer']} Review ({item['decision']})**\n{item['feedback']}"
return f"""
**Revision Requested for: {title}**
Revision #{revision + 1}
Please address the following feedback:
{feedback_text}
Submit your revised document or withdraw if you wish to cancel.
"""
async def get_assignee(self, context: WorkflowContext) -> str | None:
"""Route back to the original author."""
return context.get("author")
class PrepareResubmission(BaseMachineStep):
"""Prepare document for re-review after revision."""
name = "prepare_resubmission"
description = "Update document state for re-review"
async def execute(self, context: WorkflowContext) -> dict[str, Any]:
"""Prepare for next review round."""
# Check if author withdrew
if context.get("withdraw"):
context.set("status", "withdrawn")
return {"withdrawn": True}
# Update revision count
revision_count = context.get("revision_count", 0) + 1
context.set("revision_count", revision_count)
# Update document URL if provided
updated_url = context.get("updated_url")
if updated_url:
context.set("document_url", updated_url)
context.set("status", "resubmitted")
context.set("resubmitted_at", datetime.now(timezone.utc).isoformat())
return {
"resubmitted": True,
"revision": revision_count,
}
class PublishDocument(BaseMachineStep):
"""Publish approved document."""
name = "publish_document"
description = "Publish the approved document"
async def execute(self, context: WorkflowContext) -> dict[str, Any]:
"""Finalize and publish document."""
context.set("status", "published")
context.set("published_at", datetime.now(timezone.utc).isoformat())
# Calculate average scores
tech_accuracy = context.get("technical_review_technical_accuracy", 0)
tech_complete = context.get("technical_review_completeness", 0)
edit_clarity = context.get("editorial_review_clarity", 0)
edit_style = context.get("editorial_review_style_compliance", 0)
scores = [s for s in [tech_accuracy, tech_complete, edit_clarity, edit_style] if s > 0]
avg_score = sum(scores) / len(scores) if scores else 0
context.set("quality_score", avg_score)
# In a real app: publish to CMS, notify stakeholders
return {
"published": True,
"title": context.get("document_title"),
"quality_score": avg_score,
"revisions": context.get("revision_count", 0),
}
class NotifyRejection(BaseMachineStep):
"""Notify author of document rejection."""
name = "notify_rejection"
description = "Handle document rejection"
async def execute(self, context: WorkflowContext) -> dict[str, Any]:
"""Process rejection and notify author."""
context.set("status", "rejected")
context.set("rejected_at", datetime.now(timezone.utc).isoformat())
# In a real app: send notification email
return {
"rejected": True,
"reason": "Document did not meet quality standards after review",
"feedback": context.get("consolidated_feedback"),
}
# =============================================================================
# Workflow Definition
# =============================================================================
# Create parallel review group
parallel_review = ParallelGroup(
TechnicalReview(),
EditorialReview(),
)
document_review_workflow = WorkflowDefinition(
name="document_review",
version="1.0.0",
description="Document review with parallel reviewers and revision cycles",
steps={
"submit_document": SubmitDocument(),
"parallel_review": parallel_review,
"consolidate_reviews": ConsolidateReviews(),
"review_decision": ReviewDecisionGateway(),
"author_revision": AuthorRevision(),
"prepare_resubmission": PrepareResubmission(),
"publish_document": PublishDocument(),
"notify_rejection": NotifyRejection(),
},
edges=[
# Initial flow
Edge("submit_document", "parallel_review"),
Edge("parallel_review", "consolidate_reviews"),
Edge("consolidate_reviews", "review_decision"),
# Decision gateway routes
Edge(
"review_decision",
"publish_document",
condition="context.get('consolidated_decision') == 'approved'"
),
Edge(
"review_decision",
"author_revision",
condition="context.get('consolidated_decision') == 'changes_requested'"
),
Edge(
"review_decision",
"notify_rejection",
condition="context.get('consolidated_decision') == 'rejected'"
),
# Revision cycle
Edge("author_revision", "prepare_resubmission"),
Edge(
"prepare_resubmission",
"parallel_review",
condition="context.get('withdraw') != True"
),
Edge(
"prepare_resubmission",
"notify_rejection",
condition="context.get('withdraw') == True"
),
],
initial_step="submit_document",
terminal_steps={"publish_document", "notify_rejection"},
)
# =============================================================================
# Application Setup
# =============================================================================
def create_app() -> Litestar:
"""Create the Litestar application with document review workflow."""
db_engine = create_async_engine(
"sqlite+aiosqlite:///documents.db",
echo=False,
)
session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
registry = WorkflowRegistry()
registry.register_definition(document_review_workflow)
async def provide_session() -> AsyncSession:
async with session_factory() as session:
yield session
async def provide_engine(session: AsyncSession) -> PersistentExecutionEngine:
return PersistentExecutionEngine(registry=registry, session=session)
async def provide_registry() -> WorkflowRegistry:
return registry
@get("/health")
async def health() -> dict[str, str]:
return {"status": "healthy"}
return Litestar(
route_handlers=[health],
plugins=[
WorkflowPlugin(
config=WorkflowPluginConfig(
enable_api=True,
api_path_prefix="/api/workflows",
)
),
],
dependencies={
"session": Provide(provide_session),
"workflow_engine": Provide(provide_engine),
"workflow_registry": Provide(provide_registry),
},
)
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Key Concepts¶
Parallel Review Pattern¶
Multiple reviewers work simultaneously using ParallelGroup:
from litestar_workflows import ParallelGroup
parallel_review = ParallelGroup(
TechnicalReview(),
EditorialReview(),
)
# In workflow definition
steps={"parallel_review": parallel_review}
All reviewers receive tasks at the same time. The workflow waits until all complete before proceeding.
Consolidating Parallel Results¶
After parallel steps complete, aggregate their outputs:
class ConsolidateReviews(BaseMachineStep):
async def execute(self, context: WorkflowContext) -> dict:
# Each parallel step stores results with a prefix
tech_decision = context.get("technical_review_decision")
edit_decision = context.get("editorial_review_decision")
# Determine overall outcome
if "reject" in [tech_decision, edit_decision]:
context.set("consolidated_decision", "rejected")
elif "request_changes" in [tech_decision, edit_decision]:
context.set("consolidated_decision", "changes_requested")
else:
context.set("consolidated_decision", "approved")
Revision Cycle¶
The workflow loops back for revisions using conditional edges:
edges = [
# Author revision returns to review
Edge("author_revision", "prepare_resubmission"),
Edge(
"prepare_resubmission",
"parallel_review", # Loop back
condition="context.get('withdraw') != True"
),
]
Track revision count to prevent infinite loops:
revision_count = context.get("revision_count", 0)
max_revisions = context.get("max_revisions", 3)
if revision_count >= max_revisions:
return "notify_rejection"
Customization Points¶
- Add More Reviewers
Add steps to the
ParallelGroup- Change Review Criteria
Modify the
form_schemain review steps- Adjust Revision Limits
Set
max_revisionsin the initial context- Add Legal Review
Create a
LegalReviewstep and add to parallel group
Usage Example¶
Start a document review via the API:
curl -X POST http://localhost:8000/api/workflows/instances \
-H "Content-Type: application/json" \
-d '{
"definition_name": "document_review",
"input_data": {
"document_title": "API Design Guidelines",
"document_url": "https://docs.example.com/draft/api-guidelines",
"author": "alice@example.com",
"max_revisions": 3
}
}'
Complete a technical review:
curl -X POST http://localhost:8000/api/workflows/tasks/{task_id}/complete \
-H "Content-Type: application/json" \
-d '{
"output_data": {
"decision": "request_changes",
"technical_accuracy": 4,
"completeness": 3,
"feedback": "Good overall, but missing error handling section"
},
"completed_by": "reviewer@example.com"
}'
Testing¶
import pytest
from litestar_workflows import WorkflowContext
from document_review import ConsolidateReviews, ReviewDecisionGateway
@pytest.mark.asyncio
async def test_consolidate_all_approved():
"""All approvals should consolidate to approved."""
context = WorkflowContext()
context.set("technical_review_decision", "approve")
context.set("editorial_review_decision", "approve")
step = ConsolidateReviews()
await step.execute(context)
assert context.get("consolidated_decision") == "approved"
@pytest.mark.asyncio
async def test_consolidate_any_rejection():
"""Any rejection should consolidate to rejected."""
context = WorkflowContext()
context.set("technical_review_decision", "approve")
context.set("editorial_review_decision", "reject")
step = ConsolidateReviews()
await step.execute(context)
assert context.get("consolidated_decision") == "rejected"
@pytest.mark.asyncio
async def test_revision_limit():
"""Exceeding revision limit should trigger rejection."""
context = WorkflowContext()
context.set("consolidated_decision", "changes_requested")
context.set("revision_count", 3)
context.set("max_revisions", 3)
gateway = ReviewDecisionGateway()
result = await gateway.evaluate(context)
assert result == "notify_rejection"
See Also¶
Parallel Execution - Parallel step patterns
Working with Human Tasks - Human task configuration
Database Persistence - Database persistence
Integration Patterns - External API integration