Source code for litestar_workflows.engine.registry

"""Workflow registry for managing workflow definitions.

This module provides a registry for storing, retrieving, and managing
workflow definitions with support for versioning.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from litestar_workflows.core.definition import WorkflowDefinition
    from litestar_workflows.core.protocols import Workflow

__all__ = ["WorkflowRegistry"]


class WorkflowRegistry:
    """Registry for storing and retrieving workflow definitions.

    The registry maintains a mapping of workflow names to versions and their
    definitions, enabling workflow lookup and version management.

    The "latest" version of a workflow is the one with the highest semantic
    version (numeric components compare as integers, so ``"10.0.0"`` is newer
    than ``"9.0.0"``, and a pre-release such as ``"1.0.0-rc.1"`` is older than
    ``"1.0.0"``).

    Attributes:
        _definitions: Nested dict mapping name -> version -> WorkflowDefinition.
        _workflow_classes: Map of workflow names to their class definitions.
    """

    def __init__(self) -> None:
        """Initialize an empty workflow registry."""
        self._definitions: dict[str, dict[str, WorkflowDefinition]] = {}
        self._workflow_classes: dict[str, type[Workflow]] = {}

    @staticmethod
    def _version_sort_key(version: str) -> tuple[object, ...]:
        """Build a sort key that orders version strings semantically.

        Plain string comparison ranks ``"9.0.0"`` above ``"10.0.0"``; this key
        compares dot-separated numeric identifiers as integers instead.
        Follows the Semantic Versioning precedence rules: build metadata
        (after ``+``) is ignored, a pre-release (after ``-``) sorts below the
        corresponding final release, and numeric identifiers sort below
        alphanumeric ones.

        Args:
            version: The version string to build a key for.

        Returns:
            A tuple that is totally ordered against keys of other versions,
            including malformed ones (no ``TypeError`` on mixed identifiers).
        """

        def identifier_key(identifier: str) -> tuple[int, int, str]:
            # Tag each identifier so ints and strings never get compared
            # directly with each other.
            if identifier.isdecimal():
                return (0, int(identifier), "")
            return (1, 0, identifier)

        core = version.partition("+")[0]
        release, _, prerelease = core.partition("-")
        release_key = tuple(identifier_key(part) for part in release.split("."))
        prerelease_key = (
            tuple(identifier_key(part) for part in prerelease.split("."))
            if prerelease
            else ()
        )
        # Final releases (no pre-release tag) outrank their pre-releases.
        return (release_key, 0 if prerelease else 1, prerelease_key)

    @classmethod
    def _latest_version(cls, versions: dict[str, WorkflowDefinition]) -> str:
        """Return the highest semantic version among the keys of ``versions``."""
        return max(versions, key=cls._version_sort_key)

    def register(self, workflow_class: type[Workflow]) -> None:
        """Register a workflow class with the registry.

        Extracts the workflow definition from the class and stores it indexed
        by name and version. Registering the same name and version again
        replaces the stored definition.

        Args:
            workflow_class: The workflow class to register.

        Example:
            >>> registry = WorkflowRegistry()
            >>> registry.register(MyWorkflow)
        """
        definition = workflow_class.get_definition()

        # Create the per-name version dict on first registration.
        self._definitions.setdefault(definition.name, {})[definition.version] = definition

        # Only one class is tracked per name: the most recently registered one.
        self._workflow_classes[definition.name] = workflow_class

    def get_definition(
        self,
        name: str,
        version: str | None = None,
    ) -> WorkflowDefinition:
        """Retrieve a workflow definition by name and optional version.

        Args:
            name: The workflow name.
            version: The workflow version. If None, returns the latest
                (highest semantic) version.

        Returns:
            The WorkflowDefinition for the requested workflow.

        Raises:
            KeyError: If the workflow name or version is not found.

        Example:
            >>> definition = registry.get_definition("approval_workflow")
            >>> definition_v1 = registry.get_definition("approval_workflow", "1.0.0")
        """
        if name not in self._definitions:
            msg = f"Workflow '{name}' not found in registry"
            raise KeyError(msg)

        versions = self._definitions[name]

        if version is None:
            # Semantic comparison, not lexicographic: "10.0.0" > "9.0.0".
            version = self._latest_version(versions)

        if version not in versions:
            available = ", ".join(versions)
            msg = f"Version '{version}' not found for workflow '{name}'. Available versions: {available}"
            raise KeyError(msg)

        return versions[version]

    def get_workflow_class(self, name: str) -> type[Workflow]:
        """Retrieve the workflow class by name.

        Args:
            name: The workflow name.

        Returns:
            The Workflow class most recently registered under ``name``.

        Raises:
            KeyError: If the workflow name is not found.

        Example:
            >>> WorkflowClass = registry.get_workflow_class("approval_workflow")
            >>> instance = await engine.start_workflow(WorkflowClass)
        """
        if name not in self._workflow_classes:
            msg = f"Workflow class '{name}' not found in registry"
            raise KeyError(msg)

        return self._workflow_classes[name]

    def list_definitions(self, active_only: bool = True) -> list[WorkflowDefinition]:
        """List all registered workflow definitions.

        Args:
            active_only: If True, only return the latest (highest semantic)
                version of each workflow. If False, return all versions.

        Returns:
            List of WorkflowDefinition objects.

        Example:
            >>> all_workflows = registry.list_definitions()
            >>> all_versions = registry.list_definitions(active_only=False)
        """
        if active_only:
            return [
                versions[self._latest_version(versions)]
                for versions in self._definitions.values()
            ]

        definitions: list[WorkflowDefinition] = []
        for versions in self._definitions.values():
            definitions.extend(versions.values())
        return definitions

    def unregister(self, name: str, version: str | None = None) -> None:
        """Remove a workflow from the registry.

        Unknown names and unknown versions are ignored silently.

        Args:
            name: The workflow name.
            version: The specific version to remove. If None, removes all versions.

        Example:
            >>> registry.unregister("old_workflow")
            >>> registry.unregister("approval_workflow", "1.0.0")
        """
        if name not in self._definitions:
            return

        if version is not None:
            versions = self._definitions[name]
            versions.pop(version, None)
            if versions:
                # NOTE(review): the class stored for this name is left as-is,
                # even if it was the one that supplied the removed version.
                return

        # Either every version was requested, or the last one was just removed:
        # drop the name and its workflow class entirely.
        del self._definitions[name]
        self._workflow_classes.pop(name, None)

    def has_workflow(self, name: str, version: str | None = None) -> bool:
        """Check if a workflow exists in the registry.

        Args:
            name: The workflow name.
            version: Optional specific version to check.

        Returns:
            True if the workflow (and version, when given) exists, False otherwise.

        Example:
            >>> if registry.has_workflow("approval_workflow"):
            ...     definition = registry.get_definition("approval_workflow")
        """
        if name not in self._definitions:
            return False

        if version is None:
            return True

        return version in self._definitions[name]

    def get_versions(self, name: str) -> list[str]:
        """Get all versions for a workflow.

        Args:
            name: The workflow name.

        Returns:
            List of version strings, in the order they were first registered.

        Raises:
            KeyError: If the workflow name is not found.

        Example:
            >>> versions = registry.get_versions("approval_workflow")
            >>> print(versions)  # ['1.0.0', '1.1.0', '2.0.0']
        """
        if name not in self._definitions:
            msg = f"Workflow '{name}' not found in registry"
            raise KeyError(msg)

        return list(self._definitions[name])