Phase 3: Web Plugin Architecture
Executive Summary
This document defines the architecture for the litestar-workflows[web] extra,
which provides REST API endpoints, DTOs, and Litestar integration for managing
workflow definitions, instances, and human tasks. The REST API is built into the
main WorkflowPlugin and enabled by default via enable_api=True. It builds
on the Phase 2 persistence layer and follows Litestar best practices for controllers,
dependency injection, and OpenAPI schema generation.
Design Goals
Zero-Configuration Default: Works out of the box with sensible defaults
Full Customization: Every aspect configurable (routes, guards, DTOs)
OpenAPI-First: Auto-generated, comprehensive OpenAPI documentation
Litestar-Native: Deep integration with DI, guards, middleware, and plugins
Type-Safe: Full typing with DTO validation at boundaries
Testable: Easy to test with dependency injection and mocking
Module Structure
The web plugin module structure follows a layered architecture:
src/litestar_workflows/web/
|-- __init__.py # Public API exports
|-- plugin.py # WorkflowWebPlugin implementation
|-- config.py # WorkflowWebPluginConfig dataclass
|-- dependencies.py # DI providers for engine/repositories
|
|-- controllers/ # REST API controllers
| |-- __init__.py
| |-- definitions.py # WorkflowDefinitionController
| |-- instances.py # WorkflowInstanceController
| |-- tasks.py # HumanTaskController
| |-- admin.py # WorkflowAdminController
| |-- graphs.py # GraphController (MermaidJS endpoints)
|
|-- dto/ # Data Transfer Objects
| |-- __init__.py
| |-- base.py # Base DTO classes and config
| |-- definitions.py # WorkflowDefinition DTOs
| |-- instances.py # WorkflowInstance DTOs
| |-- tasks.py # HumanTask DTOs
| |-- graphs.py # Graph visualization DTOs
|
|-- guards/ # Authentication/authorization guards
| |-- __init__.py
| |-- base.py # BaseWorkflowGuard protocol
| |-- auth.py # WorkflowAuthGuard (requires user)
| |-- admin.py # WorkflowAdminGuard (requires admin role)
| |-- task.py # TaskAssigneeGuard (task ownership)
|
|-- services/ # Business logic services
| |-- __init__.py
| |-- workflow.py # WorkflowService (orchestration)
| |-- graph.py # GraphService (visualization)
|
|-- exceptions.py # Web-specific exception handlers
|-- openapi.py # OpenAPI schema customization
Dependency Graph
web/plugin.py
|
+-- web/config.py
|
+-- web/dependencies.py
| |
| +-- db/repositories.py
| +-- engine/registry.py
| +-- db/engine.py (PersistentExecutionEngine)
|
+-- web/controllers/*.py
| |
| +-- web/dto/*.py
| +-- web/services/*.py
| +-- web/guards/*.py
|
+-- web/exceptions.py
+-- web/openapi.py
Plugin Design
WorkflowWebPluginConfig
Configuration dataclass for the web plugin:
# Postponed annotations keep the TYPE_CHECKING-only Guard import from
# raising NameError when the dataclass fields are evaluated at runtime.
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from litestar.types import Guard


@dataclass
class WorkflowWebPluginConfig:
    """Configuration for the WorkflowWebPlugin.

    Attributes:
        path_prefix: URL prefix for all workflow routes. Defaults to "/workflows".
        enable_api: Enable REST API endpoints. Defaults to True.
        enable_admin_api: Enable admin API endpoints. Defaults to True.
        enable_graph_api: Enable graph visualization endpoints. Defaults to True.
        include_in_schema: Include routes in OpenAPI schema. Defaults to True.
        tags: OpenAPI tags for workflow routes.
        api_guards: Guards applied to API routes.
        admin_guards: Guards applied to admin routes.
        task_guards: Guards applied to task completion routes.
        dto_config: Custom DTO configuration overrides.
        dependency_key_engine: DI key for ExecutionEngine. Defaults to "workflow_engine".
        dependency_key_registry: DI key for WorkflowRegistry. Defaults to "workflow_registry".
        dependency_key_instance_repo: DI key for instance repository.
        dependency_key_task_repo: DI key for task repository.
        dependency_key_definition_repo: DI key for definition repository.
        auto_create_dependencies: Auto-create DI providers. Defaults to True.
        session_dependency_key: Key for SQLAlchemy session dependency.
    """

    # Route configuration
    path_prefix: str = "/workflows"
    enable_api: bool = True
    enable_admin_api: bool = True
    enable_graph_api: bool = True
    include_in_schema: bool = True
    tags: list[str] = field(default_factory=lambda: ["Workflows"])

    # Security configuration
    api_guards: list[Guard] = field(default_factory=list)
    admin_guards: list[Guard] = field(default_factory=list)
    task_guards: list[Guard] = field(default_factory=list)

    # DTO configuration
    dto_config: dict[str, Any] = field(default_factory=dict)

    # Dependency injection keys
    dependency_key_engine: str = "workflow_engine"
    dependency_key_registry: str = "workflow_registry"
    dependency_key_instance_repo: str = "workflow_instance_repo"
    dependency_key_task_repo: str = "workflow_task_repo"
    dependency_key_definition_repo: str = "workflow_definition_repo"

    # Auto-configuration
    auto_create_dependencies: bool = True
    session_dependency_key: str = "db_session"
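The list-valued options in the config use field(default_factory=...) so that every config instance owns its own list. The following is a minimal stdlib-only sketch of why that matters; MiniConfig is a hypothetical two-field stand-in, not the real WorkflowWebPluginConfig.

```python
from dataclasses import dataclass, field


@dataclass
class MiniConfig:
    """Hypothetical stand-in carrying two of the fields described above."""

    path_prefix: str = "/workflows"
    # default_factory builds a fresh list per instance instead of sharing one
    tags: list = field(default_factory=lambda: ["Workflows"])


first = MiniConfig()
second = MiniConfig(path_prefix="/api/workflows")

# Mutating one instance's tags must not leak into the other instance
first.tags.append("Internal")
print(first.tags, second.tags)
```

With a shared mutable default, the second config would also have picked up the extra tag.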
WorkflowWebPlugin
The main plugin class implementing InitPluginProtocol:
from litestar.plugins import InitPluginProtocol
from litestar.config.app import AppConfig
from litestar.router import Router


class WorkflowWebPlugin(InitPluginProtocol):
    """Litestar plugin for workflow web routes and API.

    This plugin provides:
    - REST API endpoints for workflow management
    - Human task inbox and completion API
    - Graph visualization endpoints (MermaidJS)
    - Admin API for workflow administration
    - OpenAPI schema with full documentation

    Example:
        Basic usage::

            from litestar import Litestar
            from litestar_workflows.web import WorkflowWebPlugin, WorkflowWebPluginConfig

            app = Litestar(
                plugins=[
                    WorkflowWebPlugin(
                        config=WorkflowWebPluginConfig(
                            path_prefix="/api/workflows",
                            admin_guards=[AdminGuard],
                        )
                    )
                ]
            )

        With custom guards::

            from litestar_workflows.web import WorkflowWebPlugin, WorkflowWebPluginConfig

            config = WorkflowWebPluginConfig(
                api_guards=[AuthGuard],
                admin_guards=[AdminGuard],
                task_guards=[AuthGuard, TaskOwnerGuard],
            )
            plugin = WorkflowWebPlugin(config=config)
    """

    __slots__ = ("_config",)

    def __init__(self, config: WorkflowWebPluginConfig | None = None) -> None:
        self._config = config or WorkflowWebPluginConfig()

    @property
    def config(self) -> WorkflowWebPluginConfig:
        """Get the plugin configuration."""
        return self._config

    def on_app_init(self, app_config: AppConfig) -> AppConfig:
        """Initialize the plugin when the Litestar app starts."""
        # Register dependencies
        if self._config.auto_create_dependencies:
            self._register_dependencies(app_config)

        # Build and register routes
        routers = self._build_routers()
        app_config.route_handlers.extend(routers)

        # Register exception handlers
        self._register_exception_handlers(app_config)

        # Add OpenAPI tags
        self._configure_openapi(app_config)

        return app_config
Plugin Initialization Flow
on_app_init()
|
+-- _register_dependencies()
| |-- Register engine provider
| |-- Register registry provider
| +-- Register repository providers
|
+-- _build_routers()
| |-- _create_api_router() (if enable_api)
| |-- _create_admin_router() (if enable_admin_api)
| +-- _create_graph_router() (if enable_graph_api)
|
+-- _register_exception_handlers()
| |-- WorkflowNotFoundError -> 404
| |-- InvalidTransitionError -> 409
| +-- TaskNotAssignedError -> 403
|
+-- _configure_openapi()
|-- Add workflow tags
+-- Register schema components
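The exception-to-status mapping in the flow above can be sketched as a plain lookup table. This is a stdlib-only illustration: the three exception classes are hypothetical stand-ins that only share names with the library's exceptions, and status_for is an invented helper, not part of the plugin.

```python
from http import HTTPStatus


class WorkflowNotFoundError(Exception):
    """Hypothetical stand-in for the library exception of the same name."""


class InvalidTransitionError(Exception):
    """Hypothetical stand-in for the library exception of the same name."""


class TaskNotAssignedError(Exception):
    """Hypothetical stand-in for the library exception of the same name."""


# Mirrors the mapping listed under _register_exception_handlers()
STATUS_BY_EXCEPTION = {
    WorkflowNotFoundError: HTTPStatus.NOT_FOUND,
    InvalidTransitionError: HTTPStatus.CONFLICT,
    TaskNotAssignedError: HTTPStatus.FORBIDDEN,
}


def status_for(exc: Exception) -> HTTPStatus:
    """Return the HTTP status for an exception, defaulting to 500."""
    for exc_type, status in STATUS_BY_EXCEPTION.items():
        if isinstance(exc, exc_type):
            return status
    return HTTPStatus.INTERNAL_SERVER_ERROR


print(status_for(InvalidTransitionError("already completed")))
```

In a Litestar application the same table would typically feed the app-level exception handler registration, with one handler per exception type.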
Route Structure
The plugin creates the following route structure:
{path_prefix}/ # Configured prefix (default: /workflows)
|
+-- api/ # Public API routes
| |-- definitions/ # Workflow definitions
| | |-- GET / # List definitions
| | |-- GET /{name} # Get definition by name
| | +-- GET /{name}/versions # List versions
| |
| |-- instances/ # Workflow instances
| | |-- POST / # Start new instance
| | |-- GET / # List instances (paginated)
| | |-- GET /{instance_id} # Get instance detail
| | |-- POST /{instance_id}/cancel # Cancel instance
| | |-- POST /{instance_id}/retry # Retry failed instance
| | +-- GET /{instance_id}/history # Get step history
| |
| +-- tasks/ # Human tasks
| |-- GET / # List my tasks
| |-- GET /{task_id} # Get task detail
| |-- POST /{task_id}/complete # Complete task
| |-- POST /{task_id}/claim # Claim unassigned task
| +-- POST /{task_id}/reassign # Reassign task
|
+-- admin/ # Admin API routes
| |-- GET /stats # Workflow statistics
| |-- GET /instances # All instances (admin view)
| |-- POST /definitions # Register new definition
| |-- DELETE /definitions/{name}/{ver} # Deactivate definition
| +-- POST /instances/{id}/force-complete # Force complete stuck workflow
|
+-- graphs/ # Graph visualization routes
|-- GET /definitions/{name} # Definition graph (Mermaid)
|-- GET /instances/{id} # Instance graph with state
+-- GET /instances/{id}/history # Animated execution history
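How the three enable_* flags and path_prefix combine into the top-level route groups can be sketched as follows. The helper mounted_prefixes is hypothetical and only mirrors the tree above; the real plugin builds Litestar routers instead of strings.

```python
def mounted_prefixes(
    path_prefix: str = "/workflows",
    enable_api: bool = True,
    enable_admin_api: bool = True,
    enable_graph_api: bool = True,
) -> list:
    """Return the route groups that would be mounted under path_prefix."""
    # Normalize to a single leading slash and no trailing slash
    base = ("/" + path_prefix.strip("/")).rstrip("/")
    groups = [
        ("api", enable_api),
        ("admin", enable_admin_api),
        ("graphs", enable_graph_api),
    ]
    return [f"{base}/{name}" for name, enabled in groups if enabled]


print(mounted_prefixes())
print(mounted_prefixes("/api/workflows/", enable_admin_api=False))
```

Disabling a flag removes the whole group, so no admin route is registered at all when enable_admin_api is False.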
Controller Hierarchy
Base Controller
All workflow controllers inherit from a base class that provides common functionality:
# Postponed annotations keep the TYPE_CHECKING-only names below from being
# evaluated when the class body runs.
from __future__ import annotations

from typing import TYPE_CHECKING

from litestar import Controller, Request

if TYPE_CHECKING:
    from litestar_workflows.engine.registry import WorkflowRegistry
    from litestar_workflows.db import PersistentExecutionEngine


class BaseWorkflowController(Controller):
    """Base controller for workflow endpoints.

    Provides common dependencies and helper methods for all workflow controllers.
    """

    # Injected dependencies (configured via plugin)
    engine: PersistentExecutionEngine
    registry: WorkflowRegistry

    def _get_user_id(self, request: Request) -> str | None:
        """Extract user ID from request.

        Override this method to customize user extraction logic.
        Default implementation reads the "user" entry from the request scope
        and returns its id attribute.
        """
        # request.user raises when no auth middleware has populated the scope,
        # and hasattr() does not swallow that error, so read the scope directly.
        user = request.scope.get("user")
        if user:
            return getattr(user, "id", None)
        return None

    def _get_tenant_id(self, request: Request) -> str | None:
        """Extract tenant ID from request.

        Override this method for multi-tenant applications.
        Default implementation checks request.state.tenant_id.
        """
        return getattr(request.state, "tenant_id", None)
WorkflowDefinitionController
Controller for workflow definition management:
from litestar import Controller, get
from litestar.params import Parameter


class WorkflowDefinitionController(Controller):
    """API for workflow definitions.

    Provides endpoints for listing and retrieving workflow definitions
    registered with the workflow registry.
    """

    path = "/definitions"
    tags = ["Workflow Definitions"]

    @get(
        "/",
        summary="List workflow definitions",
        description="Returns all registered workflow definitions.",
    )
    async def list_definitions(
        self,
        workflow_registry: WorkflowRegistry,
        active_only: bool = Parameter(default=True, description="Filter to active definitions only"),
    ) -> list[WorkflowDefinitionDTO]:
        """List all registered workflow definitions."""
        ...

    @get(
        "/{name:str}",
        summary="Get workflow definition",
        description="Returns a specific workflow definition by name.",
    )
    async def get_definition(
        self,
        name: str,
        workflow_registry: WorkflowRegistry,
        version: str | None = Parameter(default=None, description="Specific version to retrieve"),
    ) -> WorkflowDefinitionDetailDTO:
        """Get a specific workflow definition."""
        ...

    @get(
        "/{name:str}/versions",
        summary="List definition versions",
        description="Returns all versions of a workflow definition.",
    )
    async def list_versions(
        self,
        name: str,
        workflow_definition_repo: WorkflowDefinitionRepository,
    ) -> list[WorkflowDefinitionVersionDTO]:
        """List all versions of a workflow definition."""
        ...
WorkflowInstanceController
Controller for workflow instance management:
from litestar import Request, get, post
from litestar.params import Parameter
from uuid import UUID


# Inherits from BaseWorkflowController (see Base Controller) because the
# handlers below call self._get_user_id() and self._get_tenant_id().
class WorkflowInstanceController(BaseWorkflowController):
    """API for workflow instances.

    Provides endpoints for starting, monitoring, and controlling
    workflow executions.
    """

    path = "/instances"
    tags = ["Workflow Instances"]

    @post(
        "/",
        summary="Start workflow",
        description="Starts a new workflow instance.",
        status_code=201,
    )
    async def start_workflow(
        self,
        data: StartWorkflowDTO,
        request: Request,
        workflow_engine: PersistentExecutionEngine,
        workflow_registry: WorkflowRegistry,
    ) -> WorkflowInstanceDTO:
        """Start a new workflow instance."""
        user_id = self._get_user_id(request)
        tenant_id = self._get_tenant_id(request)
        workflow_class = workflow_registry.get_workflow_class(data.workflow_name)
        instance = await workflow_engine.start_workflow(
            workflow_class,
            initial_data=data.initial_data,
            tenant_id=tenant_id,
            created_by=user_id,
        )
        return WorkflowInstanceDTO.from_instance(instance)

    @get(
        "/",
        summary="List workflow instances",
        description="Returns paginated list of workflow instances.",
    )
    async def list_instances(
        self,
        request: Request,
        workflow_instance_repo: WorkflowInstanceRepository,
        workflow_name: str | None = None,
        status: WorkflowStatus | None = None,
        limit: int = Parameter(default=50, le=100, ge=1),
        offset: int = Parameter(default=0, ge=0),
    ) -> PaginatedResponse[WorkflowInstanceDTO]:
        """List workflow instances with filtering and pagination."""
        ...

    @get(
        "/{instance_id:uuid}",
        summary="Get workflow instance",
        description="Returns detailed information about a workflow instance.",
    )
    async def get_instance(
        self,
        instance_id: UUID,
        workflow_engine: PersistentExecutionEngine,
    ) -> WorkflowInstanceDetailDTO:
        """Get detailed workflow instance information."""
        ...

    @post(
        "/{instance_id:uuid}/cancel",
        summary="Cancel workflow",
        description="Cancels a running workflow instance.",
    )
    async def cancel_instance(
        self,
        instance_id: UUID,
        data: CancelWorkflowDTO,
        workflow_engine: PersistentExecutionEngine,
    ) -> WorkflowInstanceDTO:
        """Cancel a running workflow instance."""
        ...

    @post(
        "/{instance_id:uuid}/retry",
        summary="Retry workflow",
        description="Retries a failed workflow from the failed step.",
    )
    async def retry_instance(
        self,
        instance_id: UUID,
        data: RetryWorkflowDTO,
        workflow_engine: PersistentExecutionEngine,
    ) -> WorkflowInstanceDTO:
        """Retry a failed workflow from a specific step."""
        ...
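From a client's point of view, the start and list endpoints could be exercised roughly as follows. This sketch only builds the requests with the standard library and does not send them; the host, the default /workflows prefix, and the helper names are assumptions made for illustration.

```python
import json
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request

# Assumed host plus the default path_prefix and the api/ route group
BASE_URL = "http://localhost:8000/workflows/api"


def build_start_request(workflow_name: str, initial_data: Optional[dict] = None) -> Request:
    """Build (but do not send) a POST matching the StartWorkflowDTO fields."""
    body = json.dumps({"workflow_name": workflow_name, "initial_data": initial_data})
    return Request(
        f"{BASE_URL}/instances/",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_list_url(status: Optional[str] = None, limit: int = 50, offset: int = 0) -> str:
    """Build the list URL, clamping limit to the 1..100 range the handler accepts."""
    params = {"limit": max(1, min(limit, 100)), "offset": max(0, offset)}
    if status is not None:
        params["status"] = status
    return f"{BASE_URL}/instances/?{urlencode(params)}"


req = build_start_request("document_approval", {"doc_id": 1})
print(req.get_method(), req.full_url)
print(build_list_url(status="running", limit=500))
```

Clamping on the client side avoids a validation error from the limit constraint (ge=1, le=100) declared on list_instances.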
HumanTaskController
Controller for human task management:
from litestar import Request, get, post
from litestar.params import Parameter
from uuid import UUID


# Inherits from BaseWorkflowController (see Base Controller) because the
# handlers below call self._get_user_id().
class HumanTaskController(BaseWorkflowController):
    """API for human tasks.

    Provides endpoints for listing, claiming, and completing
    human approval tasks in workflows.
    """

    path = "/tasks"
    tags = ["Human Tasks"]

    @get(
        "/",
        summary="List my tasks",
        description="Returns pending tasks assigned to the current user.",
    )
    async def list_my_tasks(
        self,
        request: Request,
        workflow_task_repo: HumanTaskRepository,
        status: str = Parameter(default="pending"),
        assignee_group: str | None = None,
    ) -> list[HumanTaskDTO]:
        """List human tasks assigned to current user."""
        user_id = self._get_user_id(request)
        tasks = await workflow_task_repo.find_pending(
            assignee_id=user_id,
            assignee_group=assignee_group,
        )
        return [HumanTaskDTO.from_model(t) for t in tasks]

    @get(
        "/{task_id:uuid}",
        summary="Get task detail",
        description="Returns detailed task information including form schema.",
    )
    async def get_task(
        self,
        task_id: UUID,
        workflow_task_repo: HumanTaskRepository,
    ) -> HumanTaskDetailDTO:
        """Get human task details including form schema."""
        ...

    @post(
        "/{task_id:uuid}/complete",
        summary="Complete task",
        description="Completes a human task with form data.",
    )
    async def complete_task(
        self,
        task_id: UUID,
        data: CompleteTaskDTO,
        request: Request,
        workflow_engine: PersistentExecutionEngine,
        workflow_task_repo: HumanTaskRepository,
    ) -> WorkflowInstanceDTO:
        """Complete a human task with form data."""
        user_id = self._get_user_id(request)
        task = await workflow_task_repo.get(task_id)

        # Validate task ownership
        if task.assignee_id and task.assignee_id != user_id:
            raise TaskNotAssignedError(task_id, user_id)

        await workflow_engine.complete_human_task(
            instance_id=task.instance_id,
            step_name=task.step_name,
            user_id=user_id,
            data=data.form_data,
        )
        ...

    @post(
        "/{task_id:uuid}/claim",
        summary="Claim task",
        description="Claims an unassigned task for the current user.",
    )
    async def claim_task(
        self,
        task_id: UUID,
        request: Request,
        workflow_task_repo: HumanTaskRepository,
    ) -> HumanTaskDTO:
        """Claim an unassigned task."""
        ...

    @post(
        "/{task_id:uuid}/reassign",
        summary="Reassign task",
        description="Reassigns a task to another user.",
    )
    async def reassign_task(
        self,
        task_id: UUID,
        data: ReassignTaskDTO,
        workflow_task_repo: HumanTaskRepository,
    ) -> HumanTaskDTO:
        """Reassign task to another user."""
        ...
DTO Layer Design
DTO Architecture
The DTO layer uses msgspec.Struct for high-performance serialization with
validation. DTOs are organized by domain and follow a consistent naming pattern:
*DTO: Base response/request DTO
*DetailDTO: Extended DTO with additional fields
Create*DTO: Input DTO for creation operations
Update*DTO: Input DTO for update operations
*ListDTO: Paginated list response wrapper
from msgspec import Struct, field
from typing import TypeVar, Generic
from datetime import datetime
from uuid import UUID

T = TypeVar("T")


class PaginatedResponse(Struct, Generic[T]):
    """Generic paginated response wrapper."""

    items: list[T]
    total: int
    limit: int
    offset: int
    has_more: bool = field(default=False)
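The document does not show how has_more is computed. One plausible rule is "more rows exist past the end of this page", sketched below with a stdlib dataclass standing in for the msgspec struct. Page and paginate are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class Page(Generic[T]):
    """Stdlib stand-in mirroring the PaginatedResponse fields."""

    items: list
    total: int
    limit: int
    offset: int
    has_more: bool = False


def paginate(rows: list, limit: int, offset: int) -> Page:
    """Slice an in-memory list and derive has_more from the total count."""
    items = rows[offset : offset + limit]
    return Page(
        items=items,
        total=len(rows),
        limit=limit,
        offset=offset,
        # True while rows remain beyond the last item of this page
        has_more=offset + len(items) < len(rows),
    )


first_page = paginate(list(range(5)), limit=2, offset=0)
last_page = paginate(list(range(5)), limit=2, offset=4)
print(first_page.has_more, last_page.has_more)
```

A repository-backed version would take total from a COUNT query rather than from len(rows).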
Request DTOs
Input DTOs for API operations:
from typing import Annotated, Any

from msgspec import Meta, Struct


class StartWorkflowDTO(Struct):
    """DTO for starting a workflow instance.

    Attributes:
        workflow_name: Name of the workflow to start.
        initial_data: Optional initial context data.
        metadata: Optional metadata to attach to the instance.
    """

    workflow_name: str
    initial_data: dict[str, Any] | None = None
    metadata: dict[str, Any] | None = None


class CancelWorkflowDTO(Struct):
    """DTO for canceling a workflow instance."""

    # msgspec expresses constraints through Meta inside Annotated;
    # msgspec.field() only configures defaults and field names.
    reason: Annotated[str, Meta(min_length=1, max_length=1000)]


class RetryWorkflowDTO(Struct):
    """DTO for retrying a failed workflow."""

    from_step: str | None = None  # If None, retry from failed step
    clear_error: bool = True


class CompleteTaskDTO(Struct):
    """DTO for completing a human task."""

    form_data: dict[str, Any]
    comment: str | None = None


class ReassignTaskDTO(Struct):
    """DTO for reassigning a human task."""

    assignee_id: str | None = None
    assignee_group: str | None = None
    reason: str | None = None
Response DTOs
Output DTOs for API responses:
from datetime import datetime
from typing import Any
from uuid import UUID

from msgspec import Struct


# StepDTO, EdgeDTO, and StepExecutionDTO are declared before the detail DTOs
# that reference them so the annotations resolve when each class is created.
class StepDTO(Struct):
    """DTO for workflow step."""

    name: str
    description: str
    step_type: str
    is_initial: bool = False
    is_terminal: bool = False


class EdgeDTO(Struct):
    """DTO for workflow edge."""

    source: str
    target: str
    condition: str | None = None


class WorkflowDefinitionDTO(Struct):
    """DTO for workflow definition summary."""

    name: str
    version: str
    description: str
    is_active: bool
    step_count: int
    initial_step: str
    terminal_steps: list[str]


class WorkflowDefinitionDetailDTO(WorkflowDefinitionDTO):
    """DTO for detailed workflow definition."""

    steps: list[StepDTO]
    edges: list[EdgeDTO]
    created_at: datetime
    updated_at: datetime


class WorkflowInstanceDTO(Struct):
    """DTO for workflow instance summary."""

    id: UUID
    workflow_name: str
    workflow_version: str
    status: str
    current_step: str | None
    started_at: datetime
    completed_at: datetime | None


class StepExecutionDTO(Struct):
    """DTO for step execution record."""

    step_name: str
    step_type: str
    status: str
    started_at: datetime
    completed_at: datetime | None
    output_data: dict[str, Any] | None
    error: str | None


class WorkflowInstanceDetailDTO(WorkflowInstanceDTO):
    """DTO for detailed workflow instance."""

    context_data: dict[str, Any]
    metadata: dict[str, Any]
    step_history: list[StepExecutionDTO]
    error: str | None
    created_by: str | None
    tenant_id: str | None


class HumanTaskDTO(Struct):
    """DTO for human task summary."""

    id: UUID
    instance_id: UUID
    workflow_name: str
    step_name: str
    title: str
    description: str | None
    assignee_id: str | None
    assignee_group: str | None
    status: str
    due_at: datetime | None
    created_at: datetime


class HumanTaskDetailDTO(HumanTaskDTO):
    """DTO for detailed human task."""

    form_schema: dict[str, Any] | None
    workflow_context: dict[str, Any]
    step_execution_id: UUID
Validation Strategy
DTOs implement validation through msgspec constraints and custom validators:
import json
from typing import Annotated, Any

from msgspec import Meta, Struct

MAX_INITIAL_DATA_BYTES = 1_000_000  # 1MB limit


class StartWorkflowDTO(Struct):
    """DTO with validation constraints."""

    # Constraints are declared with Meta; msgspec.field() does not accept them
    workflow_name: Annotated[str, Meta(min_length=1, max_length=255)]
    initial_data: dict[str, Any] | None = None

    def __post_init__(self) -> None:
        """Additional validation after initialization."""
        if self.initial_data:
            # Measure the encoded size in bytes rather than characters
            data_size = len(json.dumps(self.initial_data).encode("utf-8"))
            if data_size > MAX_INITIAL_DATA_BYTES:
                # msgspec reports a ValueError raised here as a ValidationError when decoding
                raise ValueError("initial_data exceeds maximum size of 1MB")
Custom validation decorators for complex rules:
from functools import wraps
from litestar.exceptions import ValidationException


def validate_workflow_exists(func):
    """Decorator to validate workflow exists before operation."""

    @wraps(func)
    async def wrapper(
        self,
        workflow_name: str,
        workflow_registry: WorkflowRegistry,
        **kwargs
    ):
        if not workflow_registry.has_definition(workflow_name):
            raise ValidationException(
                detail=f"Workflow '{workflow_name}' not found"
            )
        return await func(self, workflow_name, workflow_registry, **kwargs)

    return wrapper
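The check-before-call pattern used by the decorator can be tried in isolation. The sketch below runs on the standard library alone: FakeRegistry is a hypothetical in-memory registry, LookupError stands in for Litestar's ValidationException, and the decorated function is a plain coroutine rather than a controller method.

```python
import asyncio
from functools import wraps


class FakeRegistry:
    """Hypothetical in-memory registry exposing has_definition()."""

    def __init__(self, names: set) -> None:
        self._names = names

    def has_definition(self, name: str) -> bool:
        return name in self._names


def require_workflow(func):
    """Reject the call before the wrapped coroutine runs if the workflow is unknown."""

    @wraps(func)
    async def wrapper(workflow_name: str, registry: FakeRegistry, **kwargs):
        if not registry.has_definition(workflow_name):
            raise LookupError(f"Workflow '{workflow_name}' not found")
        return await func(workflow_name, registry, **kwargs)

    return wrapper


@require_workflow
async def describe(workflow_name: str, registry: FakeRegistry) -> str:
    return f"{workflow_name} is registered"


registry = FakeRegistry({"document_approval"})
result = asyncio.run(describe("document_approval", registry))
print(result)
```

Because functools.wraps preserves the wrapped function's name and docstring, the decorated coroutine still identifies itself as describe.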
OpenAPI Integration
Schema Configuration
The plugin configures OpenAPI schema generation with comprehensive documentation:
from litestar.config.app import AppConfig
from litestar.openapi.spec import Tag, ExternalDocumentation

WORKFLOW_OPENAPI_TAGS = [
    Tag(
        name="Workflow Definitions",
        description="Endpoints for managing workflow definitions and schemas.",
        external_docs=ExternalDocumentation(
            url="https://docs.litestar-workflows.dev/concepts/workflows",
            description="Workflow Concepts Guide",
        ),
    ),
    Tag(
        name="Workflow Instances",
        description="Endpoints for starting, monitoring, and controlling workflow executions.",
        external_docs=ExternalDocumentation(
            url="https://docs.litestar-workflows.dev/guides/execution",
            description="Execution Guide",
        ),
    ),
    Tag(
        name="Human Tasks",
        description="Endpoints for managing human approval tasks and form submissions.",
        external_docs=ExternalDocumentation(
            url="https://docs.litestar-workflows.dev/guides/human-tasks",
            description="Human Tasks Guide",
        ),
    ),
    Tag(
        name="Workflow Admin",
        description="Administrative endpoints for workflow management (requires admin privileges).",
    ),
    Tag(
        name="Workflow Graphs",
        description="Endpoints for workflow visualization using MermaidJS.",
    ),
]


def configure_openapi(app_config: AppConfig) -> None:
    """Configure OpenAPI schema for workflow endpoints."""
    if app_config.openapi_config:
        # Append to any tags the application already defines
        existing_tags = app_config.openapi_config.tags or []
        app_config.openapi_config.tags = [*existing_tags, *WORKFLOW_OPENAPI_TAGS]
Response Examples
DTOs include OpenAPI examples for documentation:
from litestar.openapi.spec import Example


class WorkflowInstanceDTO(Struct):
    """DTO with OpenAPI examples."""

    __openapi_examples__ = {
        "running": Example(
            summary="Running Instance",
            description="A workflow instance currently executing",
            value={
                "id": "550e8400-e29b-41d4-a716-446655440000",
                "workflow_name": "document_approval",
                "workflow_version": "1.0.0",
                "status": "running",
                "current_step": "manager_review",
                "started_at": "2024-11-24T10:30:00Z",
                "completed_at": None,
            },
        ),
        "completed": Example(
            summary="Completed Instance",
            description="A successfully completed workflow instance",
            value={
                "id": "550e8400-e29b-41d4-a716-446655440001",
                "workflow_name": "document_approval",
                "workflow_version": "1.0.0",
                "status": "completed",
                "current_step": None,
                "started_at": "2024-11-24T10:30:00Z",
                "completed_at": "2024-11-24T11:45:00Z",
            },
        ),
    }
Guard Integration Patterns
Base Guard Protocol
Guards follow a consistent protocol for workflow authorization:
from litestar.connection import ASGIConnection
from litestar.handlers import BaseRouteHandler
from litestar.exceptions import NotAuthorizedException


class BaseWorkflowGuard:
    """Base protocol for workflow guards.

    All workflow guards should implement this interface for consistent
    authorization behavior across the API.
    """

    async def __call__(
        self,
        connection: ASGIConnection,
        route_handler: BaseRouteHandler,
    ) -> None:
        """Check authorization for the request.

        Args:
            connection: The ASGI connection.
            route_handler: The route handler being accessed.

        Raises:
            NotAuthorizedException: If authorization fails.
        """
        raise NotImplementedError
Authentication Guard
Guard requiring authenticated user:
class WorkflowAuthGuard(BaseWorkflowGuard):
    """Guard requiring authenticated user for workflow operations."""

    async def __call__(
        self,
        connection: ASGIConnection,
        route_handler: BaseRouteHandler,
    ) -> None:
        if not connection.user:
            raise NotAuthorizedException(
                detail="Authentication required for workflow operations"
            )
Admin Guard
Guard requiring admin privileges:
class WorkflowAdminGuard(BaseWorkflowGuard):
    """Guard requiring admin role for administrative operations."""

    def __init__(self, admin_role: str = "workflow_admin") -> None:
        self.admin_role = admin_role

    async def __call__(
        self,
        connection: ASGIConnection,
        route_handler: BaseRouteHandler,
    ) -> None:
        if not connection.user:
            raise NotAuthorizedException(detail="Authentication required")

        roles = getattr(connection.user, "roles", [])
        if self.admin_role not in roles:
            raise NotAuthorizedException(
                detail=f"Role '{self.admin_role}' required for admin operations"
            )
Task Ownership Guard
Guard validating task assignment:
from litestar.exceptions import NotAuthorizedException, NotFoundException


class TaskAssigneeGuard(BaseWorkflowGuard):
    """Guard ensuring user is assigned to the task.

    This guard validates that the current user is either:
    - Directly assigned to the task (assignee_id matches)
    - Member of the assigned group (assignee_group matches user's groups)
    - An admin (bypasses ownership check)
    """

    def __init__(
        self,
        allow_unassigned: bool = True,
        admin_role: str = "workflow_admin",
    ) -> None:
        self.allow_unassigned = allow_unassigned
        self.admin_role = admin_role

    async def __call__(
        self,
        connection: ASGIConnection,
        route_handler: BaseRouteHandler,
    ) -> None:
        # Extract task_id from path
        task_id = connection.path_params.get("task_id")
        if not task_id:
            return  # Not a task-specific route

        # Get task from database
        task_repo = connection.app.state.task_repo
        task = await task_repo.get(task_id)
        if not task:
            raise NotFoundException(detail=f"Task {task_id} not found")

        user_id = getattr(connection.user, "id", None)
        user_groups = getattr(connection.user, "groups", [])
        user_roles = getattr(connection.user, "roles", [])

        # Admin bypass
        if self.admin_role in user_roles:
            return

        # Check assignment
        is_assigned = (
            task.assignee_id == user_id
            or (task.assignee_group and task.assignee_group in user_groups)
            or (self.allow_unassigned and not task.assignee_id and not task.assignee_group)
        )
        if not is_assigned:
            raise NotAuthorizedException(
                detail="You are not assigned to this task"
            )
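The assignment rules in the guard are easier to test once they are separated from the ASGI connection. The sketch below restates them as a pure function over two hypothetical dataclasses (Task and User); it is an illustration of the rules, not the guard's actual implementation, and it treats a missing assignee_id as "no direct match" rather than comparing None to None.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Task:
    """Hypothetical task carrying only the fields the guard inspects."""

    assignee_id: Optional[str] = None
    assignee_group: Optional[str] = None


@dataclass
class User:
    """Hypothetical user carrying only the fields the guard inspects."""

    id: str
    groups: list = field(default_factory=list)
    roles: list = field(default_factory=list)


def can_act_on(
    task: Task,
    user: User,
    *,
    allow_unassigned: bool = True,
    admin_role: str = "workflow_admin",
) -> bool:
    """Return True when the user may act on the task under the guard's rules."""
    if admin_role in user.roles:
        return True  # Admins bypass the ownership check
    if task.assignee_id is not None and task.assignee_id == user.id:
        return True  # Directly assigned
    if task.assignee_group is not None and task.assignee_group in user.groups:
        return True  # Member of the assigned group
    unassigned = task.assignee_id is None and task.assignee_group is None
    return allow_unassigned and unassigned


alice = User(id="alice", groups=["finance"])
print(can_act_on(Task(assignee_group="finance"), alice))
```

A guard built this way only needs to load the task and user, call the function, and raise when it returns False.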
MermaidJS Graph Generation
Graph Service
Service for generating MermaidJS diagrams from workflow definitions and instances:
from litestar_workflows.core.definition import WorkflowDefinition
from litestar_workflows.core.types import StepType, StepStatus


class GraphService:
    """Service for generating workflow visualizations.

    Generates MermaidJS diagrams for workflow definitions and instances
    with support for execution state highlighting.
    """

    # MermaidJS shape mappings by step type
    STEP_SHAPES: dict[StepType, tuple[str, str]] = {
        StepType.MACHINE: ("[", "]"),  # Rectangle
        StepType.HUMAN: ("{{", "}}"),  # Hexagon
        StepType.GATEWAY: ("{", "}"),  # Diamond
        StepType.TIMER: ("([", "])"),  # Stadium
        StepType.WEBHOOK: ("[[", "]]"),  # Subroutine
    }

    # Status color mappings
    STATUS_STYLES: dict[str, str] = {
        "completed": "fill:#90EE90,stroke:#006400,stroke-width:2px",
        "failed": "fill:#FFB6C1,stroke:#8B0000,stroke-width:2px",
        "running": "fill:#FFD700,stroke:#FFA500,stroke-width:3px",
        "waiting": "fill:#87CEEB,stroke:#4169E1,stroke-width:2px",
        "skipped": "fill:#D3D3D3,stroke:#808080,stroke-width:1px,stroke-dasharray:5",
    }

    def generate_definition_graph(
        self,
        definition: WorkflowDefinition,
        direction: str = "TD",
    ) -> str:
        """Generate MermaidJS graph for a workflow definition.

        Args:
            definition: The workflow definition to visualize.
            direction: Graph direction (TD, LR, BT, RL).

        Returns:
            MermaidJS graph definition string.
        """
        lines = [f"graph {direction}"]

        # Add nodes
        for step_name, step in definition.steps.items():
            shape_start, shape_end = self.STEP_SHAPES.get(
                step.step_type, ("[", "]")
            )

            # Format label
            label = step_name.replace("_", " ").title()

            # Add markers for special steps
            if step_name == definition.initial_step:
                label = f"[Start] {label}"
            elif step_name in definition.terminal_steps:
                label = f"{label} [End]"

            # Quote the label: the [Start]/[End] markers contain brackets that
            # would otherwise be parsed as Mermaid shape delimiters.
            lines.append(f'    {step_name}{shape_start}"{label}"{shape_end}')

        # Add edges
        for edge in definition.edges:
            source = edge.get_source_name()
            target = edge.get_target_name()
            if edge.condition:
                # Conditional edge with label
                condition_label = (
                    edge.condition if isinstance(edge.condition, str)
                    else "conditional"
                )
                lines.append(f"    {source} -->|{condition_label}| {target}")
            else:
                lines.append(f"    {source} --> {target}")

        return "\n".join(lines)

    def generate_instance_graph(
        self,
        definition: WorkflowDefinition,
        current_step: str | None,
        step_history: list[StepExecutionDTO],
        direction: str = "TD",
    ) -> str:
        """Generate MermaidJS graph with execution state.

        Args:
            definition: The workflow definition.
            current_step: Name of the currently executing step.
            step_history: List of executed steps with status.
            direction: Graph direction.

        Returns:
            MermaidJS graph with state highlighting.
        """
        # Build base graph
        base_graph = self.generate_definition_graph(definition, direction)
        lines = base_graph.split("\n")

        # Build step status map
        step_statuses: dict[str, str] = {}
        for execution in step_history:
            status_key = "completed"
            if execution.status == StepStatus.FAILED:
                status_key = "failed"
            elif execution.status == StepStatus.SKIPPED:
                status_key = "skipped"
            step_statuses[execution.step_name] = status_key

        if current_step:
            step_statuses[current_step] = "running"

        # Add styling
        for step_name, status in step_statuses.items():
            if status in self.STATUS_STYLES:
                lines.append(f"    style {step_name} {self.STATUS_STYLES[status]}")

        # Add click handlers for interactive graphs
        lines.append("")
        lines.append("    %% Click handlers for step details")
        for step_name in definition.steps:
            lines.append(f'    click {step_name} "/workflows/steps/{step_name}"')

        return "\n".join(lines)
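To see what the generated Mermaid text looks like, the node and edge rules can be reduced to a small standalone function. This sketch uses plain strings for step types and tuples for edges, both of which are simplifications invented here; the real service works on WorkflowDefinition objects and StepType values.

```python
from typing import Optional

# Shape delimiters keyed by a plain-string step type (simplified stand-in for StepType)
SHAPES = {
    "machine": ("[", "]"),
    "human": ("{{", "}}"),
    "gateway": ("{", "}"),
}


def to_mermaid(steps: dict, edges: list, direction: str = "TD") -> str:
    """Render steps and (source, target, condition) edges as a Mermaid graph."""
    lines = [f"graph {direction}"]
    for name, step_type in steps.items():
        start, end = SHAPES.get(step_type, ("[", "]"))
        label = name.replace("_", " ").title()
        # Quoted labels stay valid even if they contain brackets
        lines.append(f'    {name}{start}"{label}"{end}')
    for source, target, condition in edges:
        condition_text: Optional[str] = condition
        arrow = f"-->|{condition_text}|" if condition_text else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


graph = to_mermaid(
    steps={"submit": "machine", "manager_review": "human", "archive": "machine"},
    edges=[
        ("submit", "manager_review", None),
        ("manager_review", "archive", "approved"),
    ],
)
print(graph)
```

The printed text can be pasted into any Mermaid renderer to check the shapes and the labeled edge.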
Graph Controller¶
Controller for graph visualization endpoints:
from litestar import Controller, get
from litestar.response import Response
from uuid import UUID


class GraphController(Controller):
    """API for workflow graph visualization.

    Provides MermaidJS graph representations of workflow definitions
    and instances with execution state highlighting.
    """

    path = "/graphs"
    tags = ["Workflow Graphs"]

    @get(
        "/definitions/{name:str}",
        summary="Get definition graph",
        description="Returns MermaidJS graph for a workflow definition.",
    )
    async def get_definition_graph(
        self,
        name: str,
        workflow_registry: WorkflowRegistry,
        direction: str = "TD",
        format: str = "mermaid",
    ) -> GraphResponseDTO:
        """Get workflow definition as MermaidJS graph."""
        definition = workflow_registry.get_definition(name)
        graph_service = GraphService()
        graph = graph_service.generate_definition_graph(definition, direction)
        return GraphResponseDTO(
            graph=graph,
            format=format,
            workflow_name=name,
            workflow_version=definition.version,
        )

    @get(
        "/instances/{instance_id:uuid}",
        summary="Get instance graph",
        description="Returns MermaidJS graph with execution state.",
    )
    async def get_instance_graph(
        self,
        instance_id: UUID,
        workflow_engine: PersistentExecutionEngine,
        workflow_registry: WorkflowRegistry,
        direction: str = "TD",
    ) -> GraphResponseDTO:
        """Get workflow instance graph with execution state."""
        instance = await workflow_engine.get_instance(instance_id)
        definition = workflow_registry.get_definition(instance.workflow_name)
        graph_service = GraphService()
        graph = graph_service.generate_instance_graph(
            definition=definition,
            current_step=instance.current_step,
            step_history=[
                StepExecutionDTO.from_model(s)
                for s in instance.context.step_history
            ],
            direction=direction,
        )
        return GraphResponseDTO(
            graph=graph,
            format="mermaid",
            workflow_name=instance.workflow_name,
            workflow_version=instance.workflow_version,
            instance_id=instance_id,
            instance_status=instance.status.value,
        )
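Both endpoints accept direction as a free-form string and pass it straight to the graph service. A minimal sketch of validating it first is shown below. The helper names (MERMAID_DIRECTIONS, normalize_direction, graph_header) are hypothetical and not part of litestar-workflows, and the "graph <direction>" header format is an assumption about what GraphService emits.

```python
# Hypothetical helpers, not part of litestar-workflows.
# Mermaid flowcharts accept these orientation keywords.
MERMAID_DIRECTIONS = frozenset({"TB", "TD", "BT", "LR", "RL"})


def normalize_direction(direction: str, default: str = "TD") -> str:
    """Return an upper-cased Mermaid direction, or raise ValueError."""
    candidate = (direction or default).strip().upper()
    if candidate not in MERMAID_DIRECTIONS:
        allowed = ", ".join(sorted(MERMAID_DIRECTIONS))
        raise ValueError(
            f"Unsupported direction {direction!r}; expected one of: {allowed}"
        )
    return candidate


def graph_header(direction: str) -> str:
    """Build the first line of a Mermaid flowchart (assumed header format)."""
    return f"graph {normalize_direction(direction)}"
```

In a controller, the ValueError could be translated into a 400 response so that an invalid query parameter never reaches the generated graph text.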
Graph Response DTO¶
from uuid import UUID

from msgspec import Struct


class GraphResponseDTO(Struct):
    """DTO for graph visualization response."""

    graph: str  # MermaidJS graph definition
    format: str  # "mermaid" or "svg"
    workflow_name: str
    workflow_version: str
    instance_id: UUID | None = None
    instance_status: str | None = None
Dependency Injection Patterns¶
Repository Providers¶
DI providers for repository instances:
from litestar.di import Provide
from sqlalchemy.ext.asyncio import AsyncSession

from litestar_workflows.db import (
    HumanTaskRepository,
    WorkflowDefinitionRepository,
    WorkflowInstanceRepository,
)

# These names are imported at runtime rather than under TYPE_CHECKING:
# the annotations below are evaluated when the providers are defined, and
# Litestar inspects provider signatures to resolve their dependencies.


async def provide_definition_repo(
    db_session: AsyncSession,
) -> WorkflowDefinitionRepository:
    """Provide workflow definition repository."""
    return WorkflowDefinitionRepository(session=db_session)


async def provide_instance_repo(
    db_session: AsyncSession,
) -> WorkflowInstanceRepository:
    """Provide workflow instance repository."""
    return WorkflowInstanceRepository(session=db_session)


async def provide_task_repo(
    db_session: AsyncSession,
) -> HumanTaskRepository:
    """Provide human task repository."""
    return HumanTaskRepository(session=db_session)
Engine Provider¶
DI provider for the execution engine:
async def provide_engine(
    db_session: AsyncSession,
    workflow_registry: WorkflowRegistry,
) -> PersistentExecutionEngine:
    """Provide persistent execution engine.

    The engine is created per-request with the request's database session
    to ensure proper transaction management.
    """
    return PersistentExecutionEngine(
        registry=workflow_registry,
        session=db_session,
    )
Dependency Registration¶
The plugin registers dependencies during initialization:
def _register_dependencies(self, app_config: AppConfig) -> None:
    """Register dependency providers with the application."""
    config = self._config

    # Repository providers
    app_config.dependencies[config.dependency_key_definition_repo] = Provide(
        provide_definition_repo,
        sync_to_thread=False,
    )
    app_config.dependencies[config.dependency_key_instance_repo] = Provide(
        provide_instance_repo,
        sync_to_thread=False,
    )
    app_config.dependencies[config.dependency_key_task_repo] = Provide(
        provide_task_repo,
        sync_to_thread=False,
    )

    # Engine provider
    app_config.dependencies[config.dependency_key_engine] = Provide(
        provide_engine,
        sync_to_thread=False,
    )
Dependency Composition¶
Request
   |
   +-- db_session (from SQLAlchemy plugin or user config)
   |       |
   |       +-- workflow_definition_repo
   |       +-- workflow_instance_repo
   |       +-- workflow_task_repo
   |       |
   |       +-- workflow_registry (singleton)
   |               |
   |               +-- workflow_engine (per-request)
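The composition above can be illustrated with a small, framework-free resolver. This is a simplified stand-in for how name-based dependency injection behaves, not Litestar's implementation: the Session, Engine, and WorkflowRegistry classes and the resolve and handle_request functions are hypothetical placeholders. It shows the registry being shared across requests while the session and engine are rebuilt for each one.

```python
import inspect
from typing import Any, Callable, Dict


class WorkflowRegistry:
    """Stand-in for the shared workflow registry."""


class Session:
    """Stand-in for a per-request database session."""


class Engine:
    """Stand-in for the per-request execution engine."""

    def __init__(self, registry: WorkflowRegistry, session: Session) -> None:
        self.registry = registry
        self.session = session


def provide_db_session() -> Session:
    return Session()


def provide_engine(db_session: Session, workflow_registry: WorkflowRegistry) -> Engine:
    return Engine(registry=workflow_registry, session=db_session)


PROVIDERS: Dict[str, Callable[..., Any]] = {
    "db_session": provide_db_session,
    "workflow_engine": provide_engine,
}
# Created once and shared by every request.
SINGLETONS: Dict[str, Any] = {"workflow_registry": WorkflowRegistry()}


def resolve(name: str, request_cache: Dict[str, Any]) -> Any:
    """Resolve a dependency by key, building its own dependencies first."""
    if name in SINGLETONS:
        return SINGLETONS[name]
    if name not in request_cache:
        provider = PROVIDERS[name]
        # Parameter names double as dependency keys.
        kwargs = {
            param: resolve(param, request_cache)
            for param in inspect.signature(provider).parameters
        }
        request_cache[name] = provider(**kwargs)
    return request_cache[name]


def handle_request() -> Engine:
    """Simulate one request: a fresh cache yields a fresh session and engine."""
    return resolve("workflow_engine", request_cache={})


first, second = handle_request(), handle_request()
```

Each simulated request gets its own engine bound to its own session, which is the property the per-request engine provider relies on for transaction isolation.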
Exception Handling¶
Web-Specific Exceptions¶
from uuid import UUID

from litestar.exceptions import HTTPException


class WorkflowWebException(HTTPException):
    """Base exception for workflow web errors."""


class WorkflowNotFoundException(WorkflowWebException):
    """Raised when a workflow definition is not found."""

    status_code = 404

    def __init__(self, workflow_name: str) -> None:
        super().__init__(detail=f"Workflow '{workflow_name}' not found")


class InstanceNotFoundException(WorkflowWebException):
    """Raised when a workflow instance is not found."""

    status_code = 404

    def __init__(self, instance_id: UUID) -> None:
        super().__init__(detail=f"Workflow instance '{instance_id}' not found")


class TaskNotFoundException(WorkflowWebException):
    """Raised when a human task is not found."""

    status_code = 404

    def __init__(self, task_id: UUID) -> None:
        super().__init__(detail=f"Task '{task_id}' not found")


class TaskNotAssignedError(WorkflowWebException):
    """Raised when user is not assigned to a task."""

    status_code = 403

    def __init__(self, task_id: UUID, user_id: str) -> None:
        super().__init__(
            detail=f"User '{user_id}' is not assigned to task '{task_id}'"
        )


class InvalidTransitionError(WorkflowWebException):
    """Raised when an invalid workflow transition is attempted."""

    status_code = 409

    def __init__(self, detail: str) -> None:
        super().__init__(detail=detail)


class WorkflowInProgressError(WorkflowWebException):
    """Raised when operation requires workflow to be complete."""

    status_code = 409

    def __init__(self, instance_id: UUID, current_status: str) -> None:
        super().__init__(
            detail=(
                f"Workflow '{instance_id}' is still in progress "
                f"(status: {current_status})"
            )
        )
Exception Handlers¶
from litestar import Request, Response
from litestar.exceptions import HTTPException
from litestar_workflows.exceptions import (
    WorkflowNotFoundError,
    StepExecutionError,
    InvalidTransitionError as CoreInvalidTransitionError,
)


async def workflow_not_found_handler(
    request: Request,
    exc: WorkflowNotFoundError,
) -> Response:
    """Handle workflow not found errors."""
    return Response(
        content={"detail": str(exc), "type": "workflow_not_found"},
        status_code=404,
    )


async def step_execution_handler(
    request: Request,
    exc: StepExecutionError,
) -> Response:
    """Handle step execution errors."""
    return Response(
        content={
            "detail": str(exc),
            "type": "step_execution_error",
            "step_name": getattr(exc, "step_name", None),
        },
        status_code=500,
    )


async def invalid_transition_handler(
    request: Request,
    exc: CoreInvalidTransitionError,
) -> Response:
    """Handle invalid workflow transition errors."""
    return Response(
        content={"detail": str(exc), "type": "invalid_transition"},
        status_code=409,
    )


WORKFLOW_EXCEPTION_HANDLERS = {
    WorkflowNotFoundError: workflow_not_found_handler,
    StepExecutionError: step_execution_handler,
    CoreInvalidTransitionError: invalid_transition_handler,
}
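A handler mapping like the one above is keyed by exception class, so a subclass of a registered error should fall through to its parent's handler. The sketch below illustrates that lookup with plain Python. It is a simplified stand-in rather than Litestar's dispatch code: the exception classes are local placeholders for the core errors, and StepTimeoutError, Payload, and dispatch are hypothetical names.

```python
from typing import Any, Callable, Dict, Optional, Tuple, Type

Payload = Tuple[int, Dict[str, Any]]


class WorkflowError(Exception):
    """Stand-in base class for core workflow errors."""


class WorkflowNotFoundError(WorkflowError):
    pass


class StepExecutionError(WorkflowError):
    def __init__(self, message: str, step_name: Optional[str] = None) -> None:
        super().__init__(message)
        self.step_name = step_name


class StepTimeoutError(StepExecutionError):
    """Hypothetical subclass with no handler of its own."""


def handle_not_found(exc: Exception) -> Payload:
    return 404, {"detail": str(exc), "type": "workflow_not_found"}


def handle_step_error(exc: Exception) -> Payload:
    return 500, {
        "detail": str(exc),
        "type": "step_execution_error",
        "step_name": getattr(exc, "step_name", None),
    }


HANDLERS: Dict[Type[Exception], Callable[[Exception], Payload]] = {
    WorkflowNotFoundError: handle_not_found,
    StepExecutionError: handle_step_error,
}


def dispatch(exc: Exception) -> Payload:
    """Pick the handler registered for the most specific matching class."""
    for cls in type(exc).__mro__:
        handler = HANDLERS.get(cls)
        if handler is not None:
            return handler(exc)
    # Nothing registered for this exception or any of its base classes.
    return 500, {"detail": "Internal Server Error", "type": "unhandled"}
```

Walking the method resolution order means the most specific registered class wins, so adding a dedicated handler for a subclass later does not require touching the parent's entry.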
Integration Example¶
Complete Application Setup¶
from litestar import Litestar
from litestar.openapi import OpenAPIConfig
from litestar.plugins.sqlalchemy import SQLAlchemyPlugin
from litestar_workflows import WorkflowPlugin, WorkflowPluginConfig
from litestar_workflows.web import WorkflowWebPlugin, WorkflowWebPluginConfig

# Workflow definitions
from myapp.workflows import OrderApprovalWorkflow, DocumentReviewWorkflow

# Custom guards
from myapp.guards import AuthGuard, AdminGuard

# Create workflow plugin
workflow_plugin = WorkflowPlugin(
    config=WorkflowPluginConfig(
        auto_register_workflows=[
            OrderApprovalWorkflow,
            DocumentReviewWorkflow,
        ],
    )
)

# Create web plugin
web_plugin = WorkflowWebPlugin(
    config=WorkflowWebPluginConfig(
        path_prefix="/api/workflows",
        api_guards=[AuthGuard],
        admin_guards=[AdminGuard],
        task_guards=[AuthGuard],
        enable_graph_api=True,
    )
)

# Create Litestar application
app = Litestar(
    route_handlers=[],  # Your other routes
    plugins=[
        SQLAlchemyPlugin(config=...),
        workflow_plugin,
        web_plugin,
    ],
    openapi_config=OpenAPIConfig(
        title="My Application API",
        version="1.0.0",
    ),
)
API Usage Examples¶
Starting a workflow:
curl -X POST http://localhost:8000/api/workflows/api/instances \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"workflow_name": "order_approval",
"initial_data": {
"order_id": "ORD-123",
"amount": 1500.00,
"customer_id": "CUST-456"
}
}'
Listing pending tasks:
curl http://localhost:8000/api/workflows/api/tasks \
-H "Authorization: Bearer $TOKEN"
Completing a task:
curl -X POST http://localhost:8000/api/workflows/api/tasks/{task_id}/complete \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"form_data": {
"approved": true,
"comments": "Approved per policy guidelines"
}
}'
Getting workflow graph:
curl http://localhost:8000/api/workflows/graphs/instances/{instance_id}
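The same calls can be prepared from Python with only the standard library. The sketch below builds, but does not send, requests equivalent to the "start workflow" and "complete task" curl commands. The base URL and payload fields are taken from those examples, while the function names and the example token are placeholders.

```python
import json
from typing import Any, Dict
from urllib.request import Request

# Base URL taken from the curl examples; adjust to your deployment.
BASE_URL = "http://localhost:8000/api/workflows/api"


def _json_post(url: str, token: str, payload: Dict[str, Any]) -> Request:
    """Build an authenticated JSON POST request without sending it."""
    return Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


def build_start_request(
    token: str, workflow_name: str, initial_data: Dict[str, Any]
) -> Request:
    """Request that starts a new workflow instance."""
    payload = {"workflow_name": workflow_name, "initial_data": initial_data}
    return _json_post(f"{BASE_URL}/instances", token, payload)


def build_complete_task_request(
    token: str, task_id: str, form_data: Dict[str, Any]
) -> Request:
    """Request that completes a human task with the submitted form data."""
    return _json_post(
        f"{BASE_URL}/tasks/{task_id}/complete", token, {"form_data": form_data}
    )


start_request = build_start_request(
    "example-token",
    "order_approval",
    {"order_id": "ORD-123", "amount": 1500.00, "customer_id": "CUST-456"},
)
```

With a server running, passing one of these objects to urllib.request.urlopen sends it. Keeping construction separate from sending makes the payload shape easy to check in isolation.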
Testing Strategy¶
Unit Tests¶
import pytest
from litestar.testing import TestClient


@pytest.fixture
def client(app):
    # Enter the client as a context manager so that application startup
    # and shutdown hooks run around each test.
    with TestClient(app) as test_client:
        yield test_client


class TestWorkflowDefinitionController:
    # TestClient is synchronous, so these are plain (non-async) tests.
    def test_list_definitions(self, client):
        response = client.get("/api/workflows/api/definitions")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    def test_get_definition_not_found(self, client):
        response = client.get("/api/workflows/api/definitions/nonexistent")
        assert response.status_code == 404


class TestWorkflowInstanceController:
    def test_start_workflow(self, client, auth_headers):
        response = client.post(
            "/api/workflows/api/instances",
            json={"workflow_name": "test_workflow", "initial_data": {}},
            headers=auth_headers,
        )
        assert response.status_code == 201
        assert "id" in response.json()


class TestHumanTaskController:
    def test_complete_task_unauthorized(self, client, auth_headers, other_user_task_id):
        # Authenticated, but the task is assigned to a different user.
        response = client.post(
            f"/api/workflows/api/tasks/{other_user_task_id}/complete",
            json={"form_data": {"approved": True}},
            headers=auth_headers,
        )
        assert response.status_code == 403
Integration Tests¶
import time

import pytest


@pytest.mark.integration
def test_full_workflow_lifecycle(client, auth_headers):
    # Start workflow
    start_response = client.post(
        "/api/workflows/api/instances",
        json={"workflow_name": "approval_workflow", "initial_data": {"value": 100}},
        headers=auth_headers,
    )
    assert start_response.status_code == 201
    instance_id = start_response.json()["id"]

    # Give the engine a moment to reach the human task
    time.sleep(0.1)

    # Get pending tasks
    tasks_response = client.get(
        "/api/workflows/api/tasks",
        headers=auth_headers,
    )
    assert len(tasks_response.json()) > 0
    task_id = tasks_response.json()[0]["id"]

    # Complete task
    complete_response = client.post(
        f"/api/workflows/api/tasks/{task_id}/complete",
        json={"form_data": {"approved": True}},
        headers=auth_headers,
    )
    assert complete_response.status_code == 200

    # Verify workflow completed
    instance_response = client.get(
        f"/api/workflows/api/instances/{instance_id}",
        headers=auth_headers,
    )
    assert instance_response.json()["status"] == "completed"
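A fixed 0.1-second wait can make the lifecycle test flaky on slow machines. One alternative is to poll until the expected state appears. The helper below is a minimal sketch of that idea; wait_for is a hypothetical name, not part of litestar-workflows or pytest.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_for(
    fetch: Callable[[], T],
    accept: Callable[[T], bool],
    timeout: float = 2.0,
    interval: float = 0.05,
) -> T:
    """Call fetch() until accept(value) is true, then return that value.

    Raises TimeoutError if no accepted value is seen before the deadline.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if accept(value):
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)
```

In the lifecycle test, fetch could be a lambda that requests the task list and returns its JSON, with bool as the accept function so the wait ends as soon as at least one pending task is listed.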
Implementation Checklist¶
Phase 3.1: Core Infrastructure¶
[ ] Create web/ module structure
[ ] Implement WorkflowWebPluginConfig dataclass
[ ] Implement WorkflowWebPlugin class with on_app_init
[ ] Create base exception hierarchy
[ ] Register exception handlers
Phase 3.2: DTO Layer¶
[ ] Implement base DTO classes with msgspec
[ ] Create workflow definition DTOs
[ ] Create workflow instance DTOs
[ ] Create human task DTOs
[ ] Create graph response DTOs
[ ] Add validation constraints
[ ] Add OpenAPI examples
Phase 3.3: Controllers¶
[ ] Implement WorkflowDefinitionController
[ ] Implement WorkflowInstanceController
[ ] Implement HumanTaskController
[ ] Implement WorkflowAdminController
[ ] Implement GraphController
[ ] Add comprehensive docstrings for OpenAPI
Phase 3.4: Guards¶
[ ] Implement BaseWorkflowGuard protocol
[ ] Implement WorkflowAuthGuard
[ ] Implement WorkflowAdminGuard
[ ] Implement TaskAssigneeGuard
[ ] Document guard customization
Phase 3.5: Graph Service¶
[ ] Implement GraphService class
[ ] Add MermaidJS generation for definitions
[ ] Add state highlighting for instances
[ ] Support multiple graph directions
Phase 3.6: DI Integration¶
[ ] Create repository providers
[ ] Create engine provider
[ ] Implement dependency registration
[ ] Document DI customization
Phase 3.7: Testing¶
[ ] Unit tests for all controllers
[ ] Unit tests for DTOs
[ ] Unit tests for guards
[ ] Integration tests for full workflows
[ ] OpenAPI schema validation tests
Phase 3.8: Documentation¶
[ ] API usage guide
[ ] Guard customization guide
[ ] DTO customization guide
[ ] OpenAPI integration guide
Migration Guide¶
From Manual Routes¶
If you have existing manual routes, migrate to the plugin:
Before:
@post("/workflows/start")
async def start_workflow(data: dict, engine: Engine) -> dict:
    instance = await engine.start_workflow(MyWorkflow, data)
    return {"id": str(instance.id)}
After:
# Remove manual routes, use plugin
app = Litestar(
    plugins=[WorkflowWebPlugin()],
)
# Access via: POST /workflows/api/instances
From v0.3.x¶
The web plugin replaces manual controller implementations:
Remove custom workflow controllers
Add WorkflowWebPlugin to your app
Configure guards via WorkflowWebPluginConfig
Update frontend to use new endpoint paths
See Also¶
Database Persistence - Database persistence setup
Execution - Execution engine concepts
Working with Human Tasks - Human task workflows
API Reference - Complete API reference