Source code for litestar_workflows.core.context

"""Workflow execution context.

This module provides the WorkflowContext dataclass which carries state and metadata
throughout workflow execution.
"""

from __future__ import annotations

from dataclasses import dataclass, replace
from datetime import datetime
from typing import Any
from uuid import UUID

# Public names of this module; these are what ``from ... import *`` exports.
__all__ = ["StepExecution", "WorkflowContext"]


@dataclass
class StepExecution:
    """History entry describing one run of a single workflow step.

    Only ``step_name``, ``status`` and ``started_at`` are required when
    constructing a record; every other field defaults to ``None``.

    Attributes:
        step_name: Name of the step that was executed.
        status: Final status of the step execution.
        started_at: Timestamp at which the step began executing.
        completed_at: Timestamp at which the step finished, or ``None`` if it
            has not completed.
        result: Value returned by a successful execution.
        error: Error message recorded when the execution failed.
        input_data: Input data that was passed to the step.
        output_data: Output data that the step produced.
    """

    # Required fields (no defaults) must come first in a dataclass.
    step_name: str
    status: str
    started_at: datetime

    # Optional fields, filled in as the execution progresses or finishes.
    completed_at: datetime | None = None
    result: Any = None
    error: str | None = None
    input_data: dict[str, Any] | None = None
    output_data: dict[str, Any] | None = None
@dataclass
class WorkflowContext:
    """Execution context passed between workflow steps.

    The WorkflowContext carries state, metadata, and execution history
    throughout a workflow's lifecycle. ``data`` is the mutable dictionary steps
    use to communicate. ``metadata`` is meant for audit and tracking
    information and should be treated as read-only; this is a convention only,
    since it is a plain ``dict`` and nothing here enforces it.

    Attributes:
        workflow_id: Unique identifier for the workflow definition.
        instance_id: Unique identifier for this workflow instance.
        data: Mutable state dictionary for inter-step communication.
        metadata: Metadata about the workflow execution (read-only by
            convention, not enforced).
        current_step: Name of the currently executing step.
        step_history: Chronological record of all step executions.
        started_at: Timestamp when the workflow instance was created.
        user_id: Optional user identifier for human task contexts.
        tenant_id: Optional tenant identifier for multi-tenancy support.

    Example:
        >>> from uuid import uuid4
        >>> from datetime import datetime, timezone
        >>> context = WorkflowContext(
        ...     workflow_id=uuid4(),
        ...     instance_id=uuid4(),
        ...     data={"input": "value"},
        ...     metadata={"creator": "user123"},
        ...     current_step="initial_step",
        ...     step_history=[],
        ...     started_at=datetime.now(timezone.utc),
        ... )
        >>> context.set("result", 42)
        >>> context.get("result")
        42
    """

    workflow_id: UUID
    instance_id: UUID
    data: dict[str, Any]
    metadata: dict[str, Any]
    current_step: str
    step_history: list[StepExecution]
    started_at: datetime
    user_id: str | None = None
    tenant_id: str | None = None

    def get(self, key: str, default: Any = None) -> Any:
        """Retrieve a value from the workflow data dictionary.

        Args:
            key: The key to look up in the data dictionary.
            default: Value to return if ``key`` is not present.

        Returns:
            The value stored under ``key``, or ``default`` if it is missing.

        Example:
            >>> context.get("some_key", "default_value")
            'default_value'
        """
        return self.data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Set a value in the workflow data dictionary.

        The dictionary is mutated in place, so the change is visible to every
        context that shares the same ``data`` object (see :meth:`with_step`).

        Args:
            key: The key to set in the data dictionary.
            value: The value to associate with the key.

        Example:
            >>> context.set("status", "approved")
            >>> context.get("status")
            'approved'
        """
        self.data[key] = value

    def with_step(self, step_name: str) -> WorkflowContext:
        """Create a new context for the given step execution.

        Returns a shallow copy of this context with ``current_step`` replaced.
        The ``data``, ``metadata`` and ``step_history`` objects are shared by
        reference with the original context, not copied.

        Args:
            step_name: Name of the step to execute next.

        Returns:
            A new context of the same class as ``self`` with the updated
            ``current_step``.

        Example:
            >>> new_context = context.with_step("next_step")
            >>> new_context.current_step
            'next_step'
        """
        # dataclasses.replace rebuilds the instance from its own fields, so a
        # subclass keeps its type and extra fields, and fields added to this
        # class later are carried over without editing this method.
        return replace(self, current_step=step_name)

    def get_last_execution(self, step_name: str | None = None) -> StepExecution | None:
        """Get the most recent execution record for a step.

        Args:
            step_name: Optional step name to filter by. If ``None``, the last
                execution is returned regardless of which step it belongs to.

        Returns:
            The most recent matching StepExecution, or ``None`` if the history
            is empty or contains no record for ``step_name``.

        Example:
            >>> last_exec = context.get_last_execution("approval_step")
            >>> if last_exec and last_exec.status == "SUCCEEDED":
            ...     print("Step succeeded")
        """
        if step_name is None:
            return self.step_history[-1] if self.step_history else None

        # History is chronological, so scan from the end to find the newest match.
        return next(
            (entry for entry in reversed(self.step_history) if entry.step_name == step_name),
            None,
        )

    def has_step_executed(self, step_name: str) -> bool:
        """Check if a step has been executed in this workflow instance.

        Args:
            step_name: Name of the step to check.

        Returns:
            True if the step appears in the execution history, False otherwise.

        Example:
            >>> if context.has_step_executed("validation"):
            ...     print("Validation already completed")
        """
        return any(entry.step_name == step_name for entry in self.step_history)