""" Versioned store with pluggable backends. Provides optimistic concurrency control via sequence numbers with support for different storage backends (Pydantic models, files, databases, etc.). """ import logging from abc import ABC, abstractmethod from copy import deepcopy from typing import Any, Generic, TypeVar from .context import get_execution_context from .types import CommitIntent logger = logging.getLogger(__name__) T = TypeVar("T") class StoreBackend(ABC, Generic[T]): """ Abstract backend for versioned stores. Allows different storage mechanisms (Pydantic models, files, databases) to be used with the same versioned store infrastructure. """ @abstractmethod def read(self) -> T: """ Read the current data. Returns: Current data in backend-specific format """ pass @abstractmethod def write(self, data: T) -> None: """ Write new data (replaces existing). Args: data: New data to write """ pass @abstractmethod def snapshot(self) -> T: """ Create an immutable snapshot of current data. Must return a deep copy or immutable version to prevent modifications from affecting the committed state. Returns: Immutable snapshot of data """ pass @abstractmethod def validate(self, data: T) -> bool: """ Validate that data is in correct format for this backend. Args: data: Data to validate Returns: True if valid Raises: ValueError: If invalid with explanation """ pass class PydanticStoreBackend(StoreBackend[T]): """ Backend for Pydantic BaseModel stores. Supports the existing OrderStore, ChartStore, etc. pattern. """ def __init__(self, model_instance: T): """ Initialize with a Pydantic model instance. Args: model_instance: Instance of a Pydantic BaseModel """ self._model = model_instance def read(self) -> T: return self._model def write(self, data: T) -> None: # Replace the internal model self._model = data def snapshot(self) -> T: # Use Pydantic's model_copy for deep copy if hasattr(self._model, "model_copy"): return self._model.model_copy(deep=True) # Fallback for older Pydantic or non-model types return deepcopy(self._model) def validate(self, data: T) -> bool: # Pydantic models validate themselves on construction # If we got here with a model instance, it's valid return True class FileStoreBackend(StoreBackend[str]): """ Backend for file-based storage. Future implementation for versioning files (e.g., Python scripts, configs). """ def __init__(self, file_path: str): self.file_path = file_path raise NotImplementedError("FileStoreBackend not yet implemented") def read(self) -> str: raise NotImplementedError() def write(self, data: str) -> None: raise NotImplementedError() def snapshot(self) -> str: raise NotImplementedError() def validate(self, data: str) -> bool: raise NotImplementedError() class DatabaseStoreBackend(StoreBackend[dict]): """ Backend for database table storage. Future implementation for versioning database interactions. """ def __init__(self, table_name: str, connection): self.table_name = table_name self.connection = connection raise NotImplementedError("DatabaseStoreBackend not yet implemented") def read(self) -> dict: raise NotImplementedError() def write(self, data: dict) -> None: raise NotImplementedError() def snapshot(self) -> dict: raise NotImplementedError() def validate(self, data: dict) -> bool: raise NotImplementedError() class VersionedStore(Generic[T]): """ Store with optimistic concurrency control via sequence numbers. Wraps any StoreBackend and provides: - Lock-free snapshot reads - Conflict detection on commit - Version tracking for debugging """ def __init__(self, name: str, backend: StoreBackend[T]): """ Initialize versioned store. Args: name: Unique name for this store (e.g., "OrderStore") backend: Backend implementation for storage """ self.name = name self._backend = backend self._committed_seq = 0 # Highest committed seq self._version = 0 # Increments on each commit (for debugging) @property def committed_seq(self) -> int: """Get the current committed sequence number""" return self._committed_seq @property def version(self) -> int: """Get the current version (increments on each commit)""" return self._version def read_snapshot(self) -> tuple[int, T]: """ Read an immutable snapshot of the store. This is lock-free and can be called concurrently. The snapshot captures the current committed seq and a deep copy of the data. Automatically records the snapshot seq in the execution context for conflict detection during commit. Returns: Tuple of (seq, snapshot_data) """ snapshot_seq = self._committed_seq snapshot_data = self._backend.snapshot() # Record in execution context for conflict detection ctx = get_execution_context() if ctx: ctx.record_snapshot(self.name, snapshot_seq) logger.debug( f"Store '{self.name}': read_snapshot() -> seq={snapshot_seq}, version={self._version}" ) return (snapshot_seq, snapshot_data) def read_current(self) -> T: """ Read the current data without snapshot tracking. Use this for read-only operations that don't need conflict detection. Returns: Current data (not a snapshot, modifications visible) """ return self._backend.read() def prepare_commit(self, expected_seq: int, new_data: T) -> CommitIntent: """ Create a commit intent for later sequential commit. Does NOT modify the store - that happens during the commit phase. Args: expected_seq: The seq of the snapshot that was read new_data: The new data to commit Returns: CommitIntent to be submitted to CommitCoordinator """ # Validate data before creating intent self._backend.validate(new_data) intent = CommitIntent( store_name=self.name, expected_seq=expected_seq, new_data=new_data, ) logger.debug( f"Store '{self.name}': prepare_commit(expected_seq={expected_seq}, current_seq={self._committed_seq})" ) return intent def commit(self, new_data: T, commit_seq: int) -> None: """ Commit new data at a specific seq. Called by CommitCoordinator during sequential commit phase. NOT for direct use by triggers. Args: new_data: Data to commit commit_seq: Seq number of this commit """ self._backend.write(new_data) self._committed_seq = commit_seq self._version += 1 logger.info( f"Store '{self.name}': committed seq={commit_seq}, version={self._version}" ) def check_conflict(self, expected_seq: int) -> bool: """ Check if committing at expected_seq would conflict. Args: expected_seq: The seq that was expected during execution Returns: True if conflict (committed_seq has advanced beyond expected_seq) """ has_conflict = self._committed_seq != expected_seq if has_conflict: logger.warning( f"Store '{self.name}': conflict detected - " f"expected_seq={expected_seq}, committed_seq={self._committed_seq}" ) return has_conflict def __repr__(self) -> str: return f"VersionedStore(name='{self.name}', committed_seq={self._committed_seq}, version={self._version})"