Source code for litestar_workflows.core.events

"""Domain events for workflow lifecycle.

This module defines the event types that are emitted during workflow execution.
These events can be used for logging, monitoring, triggering side effects, or
integrating with external systems.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID

__all__ = [
    "HumanTaskCompleted",
    "HumanTaskCreated",
    "HumanTaskReassigned",
    "StepCompleted",
    "StepFailed",
    "StepSkipped",
    "StepStarted",
    "WorkflowCanceled",
    "WorkflowCompleted",
    "WorkflowEvent",
    "WorkflowFailed",
    "WorkflowPaused",
    "WorkflowResumed",
    "WorkflowStarted",
]


[docs] @dataclass class WorkflowEvent: """Base class for all workflow events. All workflow events include the instance_id and timestamp for tracking and correlation. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the event occurred. """ instance_id: UUID timestamp: datetime
[docs] @dataclass class WorkflowStarted(WorkflowEvent): """Event emitted when a workflow instance starts execution. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow started. workflow_name: Name of the workflow definition. workflow_version: Version of the workflow definition. initial_data: Initial data provided when starting the workflow. user_id: Optional user who initiated the workflow. tenant_id: Optional tenant identifier for multi-tenancy. Example: >>> event = WorkflowStarted( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... workflow_name="approval_flow", ... workflow_version="1.0.0", ... initial_data={"document_id": "doc_123"}, ... ) """ workflow_name: str workflow_version: str initial_data: dict[str, Any] | None = None user_id: str | None = None tenant_id: str | None = None
[docs] @dataclass class WorkflowCompleted(WorkflowEvent): """Event emitted when a workflow instance completes successfully. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow completed. status: Final status of the workflow. final_step: Name of the final step that was executed. duration_seconds: Total execution time in seconds. Example: >>> event = WorkflowCompleted( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... status="COMPLETED", ... final_step="approve", ... duration_seconds=3600.5, ... ) """ status: str final_step: str | None = None duration_seconds: float | None = None
[docs] @dataclass class WorkflowFailed(WorkflowEvent): """Event emitted when a workflow instance fails. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow failed. error: Error message describing the failure. failed_step: Name of the step that caused the failure. error_type: Type/class of the error that occurred. stack_trace: Optional stack trace for debugging. Example: >>> event = WorkflowFailed( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... error="Database connection failed", ... failed_step="data_validation", ... error_type="ConnectionError", ... ) """ error: str failed_step: str | None = None error_type: str | None = None stack_trace: str | None = None
[docs] @dataclass class WorkflowCanceled(WorkflowEvent): """Event emitted when a workflow instance is manually canceled. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow was canceled. reason: Explanation for the cancellation. canceled_by: User who canceled the workflow. current_step: Step that was executing when canceled. Example: >>> event = WorkflowCanceled( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... reason="Request withdrawn", ... canceled_by="user_123", ... current_step="review", ... ) """ reason: str canceled_by: str | None = None current_step: str | None = None
[docs] @dataclass class WorkflowPaused(WorkflowEvent): """Event emitted when a workflow instance is paused. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow was paused. reason: Explanation for pausing. paused_at_step: Step where execution was paused. Example: >>> event = WorkflowPaused( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... reason="Awaiting external data", ... paused_at_step="data_ingestion", ... ) """ reason: str | None = None paused_at_step: str | None = None
[docs] @dataclass class WorkflowResumed(WorkflowEvent): """Event emitted when a paused workflow instance resumes. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the workflow resumed. resumed_by: User who resumed the workflow. resuming_at_step: Step where execution will resume. Example: >>> event = WorkflowResumed( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... resumed_by="user_456", ... resuming_at_step="data_processing", ... ) """ resumed_by: str | None = None resuming_at_step: str | None = None
[docs] @dataclass class StepStarted(WorkflowEvent): """Event emitted when a step begins execution. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the step started. step_name: Name of the step that started. step_type: Type of the step (MACHINE, HUMAN, etc.). input_data: Input data provided to the step. Example: >>> event = StepStarted( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="validation", ... step_type="MACHINE", ... input_data={"data": "to_validate"}, ... ) """ step_name: str step_type: str input_data: dict[str, Any] | None = None
[docs] @dataclass class StepCompleted(WorkflowEvent): """Event emitted when a step completes successfully. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the step completed. step_name: Name of the step that completed. status: Final status of the step execution. result: Return value from the step execution. output_data: Output data produced by the step. duration_seconds: Execution time in seconds. Example: >>> event = StepCompleted( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="validation", ... status="SUCCEEDED", ... result={"valid": True}, ... duration_seconds=1.5, ... ) """ step_name: str status: str result: Any = None output_data: dict[str, Any] | None = None duration_seconds: float | None = None
[docs] @dataclass class StepFailed(WorkflowEvent): """Event emitted when a step fails execution. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the step failed. step_name: Name of the step that failed. error: Error message describing the failure. error_type: Type/class of the error. retry_count: Number of retry attempts made. Example: >>> event = StepFailed( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="api_call", ... error="Connection timeout", ... error_type="TimeoutError", ... retry_count=3, ... ) """ step_name: str error: str error_type: str | None = None retry_count: int = 0
[docs] @dataclass class StepSkipped(WorkflowEvent): """Event emitted when a step is skipped due to conditions. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the step was skipped. step_name: Name of the step that was skipped. reason: Explanation for why the step was skipped. Example: >>> event = StepSkipped( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="optional_notification", ... reason="Notification disabled in settings", ... ) """ step_name: str reason: str | None = None
[docs] @dataclass class HumanTaskCreated(WorkflowEvent): """Event emitted when a human task is created. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the task was created. step_name: Name of the human step. task_id: Unique identifier for this task. assignee: User or group assigned to the task. title: Display title for the task. description: Detailed description of what needs to be done. due_at: Optional deadline for task completion. Example: >>> event = HumanTaskCreated( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="manager_approval", ... task_id=uuid4(), ... assignee="manager_group", ... title="Approve expense report", ... description="Review and approve expense report for project X", ... ) """ step_name: str task_id: UUID assignee: str | None = None title: str | None = None description: str | None = None due_at: datetime | None = None
[docs] @dataclass class HumanTaskCompleted(WorkflowEvent): """Event emitted when a human task is completed. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the task was completed. step_name: Name of the human step. task_id: Unique identifier for this task. completed_by: User who completed the task. form_data: Data submitted by the user. comment: Optional comment provided by the user. Example: >>> event = HumanTaskCompleted( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="manager_approval", ... task_id=uuid4(), ... completed_by="manager_123", ... form_data={"approved": True, "amount": 1500.00}, ... comment="Approved for payment", ... ) """ step_name: str task_id: UUID completed_by: str form_data: dict[str, Any] | None = None comment: str | None = None
[docs] @dataclass class HumanTaskReassigned(WorkflowEvent): """Event emitted when a human task is reassigned. Attributes: instance_id: Unique identifier of the workflow instance. timestamp: When the task was reassigned. step_name: Name of the human step. task_id: Unique identifier for this task. from_assignee: Previous assignee. to_assignee: New assignee. reassigned_by: User who performed the reassignment. reason: Explanation for the reassignment. Example: >>> event = HumanTaskReassigned( ... instance_id=uuid4(), ... timestamp=datetime.utcnow(), ... step_name="review", ... task_id=uuid4(), ... from_assignee="user_123", ... to_assignee="user_456", ... reassigned_by="admin_789", ... reason="Original assignee on vacation", ... ) """ step_name: str task_id: UUID from_assignee: str | None = None to_assignee: str | None = None reassigned_by: str | None = None reason: str | None = None