Source code for litestar_workflows.db.models

"""SQLAlchemy models for workflow persistence.

This module defines the database models for persisting workflow state:
- WorkflowDefinitionModel: Stores workflow definition metadata and schema
- WorkflowInstanceModel: Stores running/completed workflow instances
- StepExecutionModel: Records individual step executions
- HumanTaskModel: Tracks pending human approval tasks
"""

from __future__ import annotations

from datetime import datetime
from typing import Any
from uuid import UUID

from advanced_alchemy.base import UUIDAuditBase
from sqlalchemy import JSON, DateTime, Enum, ForeignKey, Index, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship

from litestar_workflows.core.types import StepStatus, StepType, WorkflowStatus

__all__ = [
    "HumanTaskModel",
    "StepExecutionModel",
    "WorkflowDefinitionModel",
    "WorkflowInstanceModel",
]


# Cross-database JSON type: uses JSONB for PostgreSQL, JSON for others (SQLite, MySQL, etc.)
JSONType = JSON().with_variant(JSONB, "postgresql")


[docs] class WorkflowDefinitionModel(UUIDAuditBase): """Persisted workflow definition for versioning and storage. Stores the serialized workflow definition along with metadata for querying and managing workflow versions. Attributes: name: Unique name identifier for the workflow. version: Semantic version string (e.g., "1.0.0"). description: Human-readable description of the workflow. definition_json: Serialized WorkflowDefinition as JSON. is_active: Whether this version is active for new instances. instances: Related workflow instances. """ __tablename__ = "workflow_definitions" __table_args__ = ( Index("ix_workflow_definitions_name_version", "name", "version", unique=True), Index("ix_workflow_definitions_name_active", "name", "is_active"), ) name: Mapped[str] = mapped_column(String(255), index=True) version: Mapped[str] = mapped_column(String(50)) description: Mapped[str | None] = mapped_column(Text, nullable=True) definition_json: Mapped[dict[str, Any]] = mapped_column(JSONType, default=dict) is_active: Mapped[bool] = mapped_column(default=True) # Relationships instances: Mapped[list[WorkflowInstanceModel]] = relationship( back_populates="definition", lazy="noload", )
[docs] class WorkflowInstanceModel(UUIDAuditBase): """Persisted workflow instance representing a running or completed execution. Stores the current state of a workflow execution including context data, current step, and execution history. Attributes: definition_id: Foreign key to the workflow definition. workflow_name: Denormalized workflow name for quick queries. workflow_version: Denormalized workflow version. status: Current execution status. current_step: Name of the currently executing step (None if complete). context_data: Mutable workflow context data as JSON. metadata: Immutable metadata about the execution. error: Error message if workflow failed. started_at: Timestamp when execution began. completed_at: Timestamp when execution finished. tenant_id: Optional tenant identifier for multi-tenancy. created_by: Optional user who started the workflow. """ __tablename__ = "workflow_instances" __table_args__ = ( Index("ix_workflow_instances_status", "status"), Index("ix_workflow_instances_workflow_name", "workflow_name"), Index("ix_workflow_instances_tenant_id", "tenant_id"), Index("ix_workflow_instances_created_by", "created_by"), ) definition_id: Mapped[UUID] = mapped_column( ForeignKey("workflow_definitions.id", ondelete="CASCADE"), ) workflow_name: Mapped[str] = mapped_column(String(255)) workflow_version: Mapped[str] = mapped_column(String(50)) status: Mapped[WorkflowStatus] = mapped_column( Enum(WorkflowStatus, native_enum=False, length=50), default=WorkflowStatus.PENDING, ) current_step: Mapped[str | None] = mapped_column(String(255), nullable=True) context_data: Mapped[dict[str, Any]] = mapped_column(JSONType, default=dict) metadata_: Mapped[dict[str, Any]] = mapped_column( "metadata", JSONType, default=dict, ) error: Mapped[str | None] = mapped_column(Text, nullable=True) started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True)) completed_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) # Multi-tenancy support tenant_id: Mapped[str | None] = mapped_column(String(255), nullable=True) created_by: Mapped[str | None] = mapped_column(String(255), nullable=True) # Relationships definition: Mapped[WorkflowDefinitionModel] = relationship( back_populates="instances", lazy="joined", ) step_executions: Mapped[list[StepExecutionModel]] = relationship( back_populates="instance", lazy="noload", order_by="StepExecutionModel.started_at", ) human_tasks: Mapped[list[HumanTaskModel]] = relationship( back_populates="instance", lazy="noload", )
[docs] class StepExecutionModel(UUIDAuditBase): """Record of a single step execution within a workflow instance. Tracks the execution of each step including timing, status, and input/output data for debugging and audit purposes. Attributes: instance_id: Foreign key to the workflow instance. step_name: Name of the executed step. step_type: Type of step (MACHINE, HUMAN, etc.). status: Execution status of the step. input_data: Input data passed to the step. output_data: Output data produced by the step. error: Error message if step failed. started_at: Timestamp when step execution began. completed_at: Timestamp when step execution finished. assigned_to: User ID assigned to human tasks. completed_by: User ID who completed human tasks. """ __tablename__ = "workflow_step_executions" __table_args__ = ( Index("ix_step_executions_instance_id", "instance_id"), Index("ix_step_executions_step_name", "step_name"), Index("ix_step_executions_status", "status"), ) instance_id: Mapped[UUID] = mapped_column( ForeignKey("workflow_instances.id", ondelete="CASCADE"), ) step_name: Mapped[str] = mapped_column(String(255)) step_type: Mapped[StepType] = mapped_column( Enum(StepType, native_enum=False, length=50), ) status: Mapped[StepStatus] = mapped_column( Enum(StepStatus, native_enum=False, length=50), default=StepStatus.PENDING, ) input_data: Mapped[dict[str, Any] | None] = mapped_column(JSONType, nullable=True) output_data: Mapped[dict[str, Any] | None] = mapped_column(JSONType, nullable=True) error: Mapped[str | None] = mapped_column(Text, nullable=True) started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True)) completed_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) # For human tasks assigned_to: Mapped[str | None] = mapped_column(String(255), nullable=True) completed_by: Mapped[str | None] = mapped_column(String(255), nullable=True) # Relationships instance: Mapped[WorkflowInstanceModel] = relationship( back_populates="step_executions", )
[docs] class HumanTaskModel(UUIDAuditBase): """Pending human task for quick querying and assignment. Provides a denormalized view of pending human approval tasks for efficient querying by assignee, due date, and status. Attributes: instance_id: Foreign key to the workflow instance. step_execution_id: Foreign key to the step execution. step_name: Name of the human task step. title: Display title for the task. description: Detailed description of the task. form_schema: JSON Schema defining the task form. assignee_id: User ID assigned to complete the task. assignee_group: Group/role that can complete the task. due_at: Deadline for task completion. reminder_at: When to send a reminder. status: Current task status (PENDING, COMPLETED, CANCELED). completed_at: When the task was completed. completed_by: User who completed the task. """ __tablename__ = "workflow_human_tasks" __table_args__ = ( Index("ix_human_tasks_assignee_id", "assignee_id"), Index("ix_human_tasks_assignee_group", "assignee_group"), Index("ix_human_tasks_status", "status"), Index("ix_human_tasks_due_at", "due_at"), ) instance_id: Mapped[UUID] = mapped_column( ForeignKey("workflow_instances.id", ondelete="CASCADE"), ) step_execution_id: Mapped[UUID] = mapped_column( ForeignKey("workflow_step_executions.id", ondelete="CASCADE"), ) step_name: Mapped[str] = mapped_column(String(255)) title: Mapped[str] = mapped_column(String(500)) description: Mapped[str | None] = mapped_column(Text, nullable=True) form_schema: Mapped[dict[str, Any] | None] = mapped_column(JSONType, nullable=True) # Assignment assignee_id: Mapped[str | None] = mapped_column(String(255), nullable=True) assignee_group: Mapped[str | None] = mapped_column(String(255), nullable=True) # Deadlines due_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) reminder_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) # Status status: Mapped[str] = mapped_column(String(50), default="pending") completed_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) completed_by: Mapped[str | None] = mapped_column(String(255), nullable=True) # Relationships instance: Mapped[WorkflowInstanceModel] = relationship( back_populates="human_tasks", )