Files
ai/backend/src/exchange_kernel/state.py
2026-03-02 18:34:38 -04:00

473 lines
15 KiB
Python

"""
State management for the Exchange Kernel.
Implements the storage and reconciliation logic for desired vs actual state.
This is the "Kubernetes for orders" concept - maintaining intent and continuously
reconciling reality to match intent.
"""
from abc import ABC, abstractmethod
from typing import Any
from collections import defaultdict
from .models import OrderIntent, OrderState, ReconciliationStatus
from ..schema.order_spec import Uint64
# ---------------------------------------------------------------------------
# Intent State Store - Desired State
# ---------------------------------------------------------------------------
class IntentStateStore(ABC):
"""
Storage for order intents (desired state).
This represents what the strategy kernel wants. Intents are durable and
persist across restarts. The reconciliation loop continuously works to
make actual state match these intents.
"""
@abstractmethod
async def create_intent(self, intent: OrderIntent) -> None:
"""
Store a new order intent.
Args:
intent: The order intent to store
Raises:
AlreadyExistsError: If intent_id already exists
"""
pass
@abstractmethod
async def get_intent(self, intent_id: str) -> OrderIntent:
"""
Retrieve an order intent.
Args:
intent_id: Intent ID to retrieve
Returns:
The order intent
Raises:
NotFoundError: If intent_id doesn't exist
"""
pass
@abstractmethod
async def update_intent(self, intent: OrderIntent) -> None:
"""
Update an existing order intent.
Args:
intent: Updated intent (intent_id must match existing)
Raises:
NotFoundError: If intent_id doesn't exist
"""
pass
@abstractmethod
async def delete_intent(self, intent_id: str) -> None:
"""
Delete an order intent.
Args:
intent_id: Intent ID to delete
Raises:
NotFoundError: If intent_id doesn't exist
"""
pass
@abstractmethod
async def list_intents(
self,
symbol_id: str | None = None,
group_id: str | None = None,
) -> list[OrderIntent]:
"""
List all order intents, optionally filtered.
Args:
symbol_id: Filter by symbol
group_id: Filter by OCO group
Returns:
List of matching intents
"""
pass
@abstractmethod
async def get_intents_by_group(self, group_id: str) -> list[OrderIntent]:
"""
Get all intents in an OCO group.
Args:
group_id: Group ID to query
Returns:
List of intents in the group
"""
pass
# ---------------------------------------------------------------------------
# Actual State Store - Current Reality
# ---------------------------------------------------------------------------
class ActualStateStore(ABC):
"""
Storage for actual order state (reality on exchange).
This represents what actually exists on the exchange right now.
Updated frequently from exchange feeds and order status queries.
"""
@abstractmethod
async def create_order_state(self, state: OrderState) -> None:
"""
Store a new order state.
Args:
state: The order state to store
Raises:
AlreadyExistsError: If order state for this intent_id already exists
"""
pass
@abstractmethod
async def get_order_state(self, intent_id: str) -> OrderState:
"""
Retrieve order state for an intent.
Args:
intent_id: Intent ID to query
Returns:
The current order state
Raises:
NotFoundError: If no state exists for this intent
"""
pass
@abstractmethod
async def get_order_state_by_exchange_id(self, exchange_order_id: str) -> OrderState:
"""
Retrieve order state by exchange order ID.
Useful for processing exchange callbacks that only provide exchange_order_id.
Args:
exchange_order_id: Exchange's order ID
Returns:
The order state
Raises:
NotFoundError: If no state exists for this exchange order ID
"""
pass
@abstractmethod
async def update_order_state(self, state: OrderState) -> None:
"""
Update an existing order state.
Args:
state: Updated state (intent_id must match existing)
Raises:
NotFoundError: If state doesn't exist
"""
pass
@abstractmethod
async def delete_order_state(self, intent_id: str) -> None:
"""
Delete an order state.
Args:
intent_id: Intent ID whose state to delete
Raises:
NotFoundError: If state doesn't exist
"""
pass
@abstractmethod
async def list_order_states(
self,
symbol_id: str | None = None,
reconciliation_status: ReconciliationStatus | None = None,
) -> list[OrderState]:
"""
List all order states, optionally filtered.
Args:
symbol_id: Filter by symbol
reconciliation_status: Filter by reconciliation status
Returns:
List of matching order states
"""
pass
@abstractmethod
async def get_stale_orders(self, max_age_seconds: int) -> list[OrderState]:
"""
Find orders that haven't been synced recently.
Used to identify orders that need status updates from exchange.
Args:
max_age_seconds: Maximum age since last sync
Returns:
List of order states that need refresh
"""
pass
# ---------------------------------------------------------------------------
# In-Memory Implementations (for testing/prototyping)
# ---------------------------------------------------------------------------
class InMemoryIntentStore(IntentStateStore):
"""Simple in-memory implementation of IntentStateStore"""
def __init__(self):
self._intents: dict[str, OrderIntent] = {}
self._by_symbol: dict[str, set[str]] = defaultdict(set)
self._by_group: dict[str, set[str]] = defaultdict(set)
async def create_intent(self, intent: OrderIntent) -> None:
if intent.intent_id in self._intents:
raise ValueError(f"Intent {intent.intent_id} already exists")
self._intents[intent.intent_id] = intent
self._by_symbol[intent.order.symbol_id].add(intent.intent_id)
if intent.group_id:
self._by_group[intent.group_id].add(intent.intent_id)
async def get_intent(self, intent_id: str) -> OrderIntent:
if intent_id not in self._intents:
raise KeyError(f"Intent {intent_id} not found")
return self._intents[intent_id]
async def update_intent(self, intent: OrderIntent) -> None:
if intent.intent_id not in self._intents:
raise KeyError(f"Intent {intent.intent_id} not found")
old_intent = self._intents[intent.intent_id]
# Update indices if symbol or group changed
if old_intent.order.symbol_id != intent.order.symbol_id:
self._by_symbol[old_intent.order.symbol_id].discard(intent.intent_id)
self._by_symbol[intent.order.symbol_id].add(intent.intent_id)
if old_intent.group_id != intent.group_id:
if old_intent.group_id:
self._by_group[old_intent.group_id].discard(intent.intent_id)
if intent.group_id:
self._by_group[intent.group_id].add(intent.intent_id)
self._intents[intent.intent_id] = intent
async def delete_intent(self, intent_id: str) -> None:
if intent_id not in self._intents:
raise KeyError(f"Intent {intent_id} not found")
intent = self._intents[intent_id]
self._by_symbol[intent.order.symbol_id].discard(intent_id)
if intent.group_id:
self._by_group[intent.group_id].discard(intent_id)
del self._intents[intent_id]
async def list_intents(
self,
symbol_id: str | None = None,
group_id: str | None = None,
) -> list[OrderIntent]:
if symbol_id and group_id:
# Intersection of both filters
symbol_ids = self._by_symbol.get(symbol_id, set())
group_ids = self._by_group.get(group_id, set())
intent_ids = symbol_ids & group_ids
elif symbol_id:
intent_ids = self._by_symbol.get(symbol_id, set())
elif group_id:
intent_ids = self._by_group.get(group_id, set())
else:
intent_ids = self._intents.keys()
return [self._intents[iid] for iid in intent_ids]
async def get_intents_by_group(self, group_id: str) -> list[OrderIntent]:
intent_ids = self._by_group.get(group_id, set())
return [self._intents[iid] for iid in intent_ids]
class InMemoryActualStateStore(ActualStateStore):
"""Simple in-memory implementation of ActualStateStore"""
def __init__(self):
self._states: dict[str, OrderState] = {}
self._by_exchange_id: dict[str, str] = {} # exchange_order_id -> intent_id
self._by_symbol: dict[str, set[str]] = defaultdict(set)
async def create_order_state(self, state: OrderState) -> None:
if state.intent_id in self._states:
raise ValueError(f"Order state for intent {state.intent_id} already exists")
self._states[state.intent_id] = state
self._by_exchange_id[state.exchange_order_id] = state.intent_id
self._by_symbol[state.status.order.symbol_id].add(state.intent_id)
async def get_order_state(self, intent_id: str) -> OrderState:
if intent_id not in self._states:
raise KeyError(f"Order state for intent {intent_id} not found")
return self._states[intent_id]
async def get_order_state_by_exchange_id(self, exchange_order_id: str) -> OrderState:
if exchange_order_id not in self._by_exchange_id:
raise KeyError(f"Order state for exchange order {exchange_order_id} not found")
intent_id = self._by_exchange_id[exchange_order_id]
return self._states[intent_id]
async def update_order_state(self, state: OrderState) -> None:
if state.intent_id not in self._states:
raise KeyError(f"Order state for intent {state.intent_id} not found")
old_state = self._states[state.intent_id]
# Update exchange_id index if it changed
if old_state.exchange_order_id != state.exchange_order_id:
del self._by_exchange_id[old_state.exchange_order_id]
self._by_exchange_id[state.exchange_order_id] = state.intent_id
# Update symbol index if it changed
old_symbol = old_state.status.order.symbol_id
new_symbol = state.status.order.symbol_id
if old_symbol != new_symbol:
self._by_symbol[old_symbol].discard(state.intent_id)
self._by_symbol[new_symbol].add(state.intent_id)
self._states[state.intent_id] = state
async def delete_order_state(self, intent_id: str) -> None:
if intent_id not in self._states:
raise KeyError(f"Order state for intent {intent_id} not found")
state = self._states[intent_id]
del self._by_exchange_id[state.exchange_order_id]
self._by_symbol[state.status.order.symbol_id].discard(intent_id)
del self._states[intent_id]
async def list_order_states(
self,
symbol_id: str | None = None,
reconciliation_status: ReconciliationStatus | None = None,
) -> list[OrderState]:
if symbol_id:
intent_ids = self._by_symbol.get(symbol_id, set())
states = [self._states[iid] for iid in intent_ids]
else:
states = list(self._states.values())
if reconciliation_status:
states = [s for s in states if s.reconciliation_status == reconciliation_status]
return states
async def get_stale_orders(self, max_age_seconds: int) -> list[OrderState]:
import time
current_time = int(time.time())
threshold = current_time - max_age_seconds
return [
state
for state in self._states.values()
if state.last_sync_at < threshold
]
# ---------------------------------------------------------------------------
# Reconciliation Engine (framework only, no implementation)
# ---------------------------------------------------------------------------
class ReconciliationEngine:
"""
Reconciliation engine that continuously works to make actual state match intent.
This is the heart of the "Kubernetes for orders" concept. It:
1. Compares desired state (intents) with actual state (exchange orders)
2. Computes necessary actions (place, modify, cancel)
3. Executes those actions via the exchange API
4. Handles retries, errors, and edge cases
This is a framework class - concrete implementations will be exchange-specific.
"""
def __init__(
self,
intent_store: IntentStateStore,
actual_store: ActualStateStore,
):
"""
Initialize the reconciliation engine.
Args:
intent_store: Store for desired state
actual_store: Store for actual state
"""
self.intent_store = intent_store
self.actual_store = actual_store
self._running = False
async def start(self) -> None:
"""Start the reconciliation loop"""
self._running = True
# Implementation would start async reconciliation loop here
pass
async def stop(self) -> None:
"""Stop the reconciliation loop"""
self._running = False
# Implementation would stop reconciliation loop here
pass
async def reconcile_intent(self, intent_id: str) -> None:
"""
Reconcile a specific intent.
Compares the intent with actual state and takes necessary actions.
Args:
intent_id: Intent to reconcile
"""
# Framework only - concrete implementation needed
pass
async def reconcile_all(self) -> None:
"""
Reconcile all intents.
Full reconciliation pass over all orders.
"""
# Framework only - concrete implementation needed
pass
def get_metrics(self) -> dict[str, Any]:
"""
Get reconciliation metrics.
Returns:
Metrics about reconciliation performance, errors, etc.
"""
return {
"running": self._running,
"reconciliation_lag_ms": 0, # Framework only
"pending_reconciliations": 0, # Framework only
"error_count": 0, # Framework only
"retry_count": 0, # Framework only
}