# User Container Event System - Implementation Guide
|
|
|
|
This document provides detailed implementation guidance for the dual-channel event system between user containers and the gateway. See `doc/protocol.md` for the protocol specification.
|
|
|
|
## Overview
|
|
|
|
User containers emit events that must reach users through various channels:
|
|
- **Active session** (WebSocket, Telegram webhook)
|
|
- **External delivery** (Telegram API, email, push notifications)
|
|
|
|
Two ZMQ patterns handle different delivery requirements:
|
|
1. **XPUB/SUB**: Fast path for active sessions (fire-and-forget)
|
|
2. **DEALER/ROUTER**: Guaranteed delivery for critical events (with ack)
|
|
|
|
## Architecture Decision: Why Two Channels?
|
|
|
|
**Problem**: A single pattern can't satisfy both requirements:
|
|
- PUB/SUB drops messages if no subscriber (unacceptable for order confirmations)
|
|
- DEALER/ROUTER adds latency and requires ack handling (overkill for chart updates)
|
|
|
|
**Solution**: Container decides routing based on:
|
|
1. Event priority (INFORMATIONAL vs CRITICAL)
|
|
2. Whether an active session exists (tracked via XPUB subscriptions)
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ Decision Tree │
|
|
│ │
|
|
│ Event arrives │
|
|
│ │ │
|
|
│ ▼ │
|
|
│ ┌───────────────────┐ │
|
|
│ │ Priority == │ │
|
|
│ │ INFORMATIONAL? │ │
|
|
│ └─────────┬─────────┘ │
|
|
│ Yes │ No │
|
|
│ │ │ │
|
|
│ │ ▼ │
|
|
│ │ ┌───────────────────┐ │
|
|
│ │ │ has_active_ │ │
|
|
│ │ │ subscriber()? │ │
|
|
│ │ └─────────┬─────────┘ │
|
|
│ │ Yes │ No │
|
|
│ │ │ │ │
|
|
│ ▼ ▼ ▼ │
|
|
│ ┌─────────────┐ ┌─────────────┐ │
|
|
│ │ XPUB │ │ DEALER │ │
|
|
│ │ (fast path) │ │ (guaranteed)│ │
|
|
│ └─────────────┘ └─────────────┘ │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
## Container Implementation (Python)
|
|
|
|
### File Structure
|
|
|
|
```
|
|
client-py/dexorder/
|
|
├── events/
|
|
│ ├── __init__.py
|
|
│ ├── publisher.py # EventPublisher class
|
|
│ ├── types.py # UserEvent, DeliverySpec, etc.
|
|
│ └── pending_store.py # Disk persistence for critical events
|
|
```
|
|
|
|
### Event Publisher Class
|
|
|
|
```python
|
|
# client-py/dexorder/events/publisher.py
|
|
|
|
import asyncio
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
import zmq
|
|
import zmq.asyncio
|
|
|
|
from .types import UserEvent, Priority, EventAck, AckStatus
|
|
from .pending_store import PendingStore
|
|
|
|
|
|
@dataclass
class PendingEvent:
    """Bookkeeping wrapper for a critical event awaiting an EventAck."""
    # The event that was sent via the DEALER socket.
    event: UserEvent
    # time.time() when the event was (last) sent; reset on each retry.
    sent_at: float
    # Resend attempts so far; compared against EventPublisher.max_retries.
    retries: int = 0
|
|
|
|
|
|
class EventPublisher:
    """
    Publishes user events via dual ZMQ channels:
    - XPUB for informational/active-session events (fire-and-forget)
    - DEALER for critical/guaranteed-delivery events (ack + retry)

    Call ``await start()`` before publishing and ``await stop()`` on
    shutdown.  When ``pending_store_path`` is set, unacked critical events
    survive a crash/restart.
    """

    def __init__(
        self,
        user_id: str,
        xpub_port: int = 5570,
        gateway_router_endpoint: str = "tcp://gateway:5571",
        ack_timeout: float = 30.0,
        max_retries: int = 3,
        pending_store_path: Optional[str] = None,
    ):
        """
        Args:
            user_id: Owner of the events; used for the XPUB topic and the
                DEALER identity.
            xpub_port: Local port the XPUB socket binds to.
            gateway_router_endpoint: ZMQ endpoint of the gateway ROUTER.
            ack_timeout: Seconds to wait for an EventAck before retrying.
            max_retries: Resend attempts before a critical event is dropped.
            pending_store_path: Optional path for crash-recovery persistence
                of unacked critical events.
        """
        self.user_id = user_id
        self.xpub_port = xpub_port
        self.gateway_router_endpoint = gateway_router_endpoint
        self.ack_timeout = ack_timeout
        self.max_retries = max_retries

        self.ctx = zmq.asyncio.Context()

        # XPUB socket for informational events.  XPUB_VERBOSE=1 makes the
        # socket surface *every* subscribe message, not only the first per
        # topic, so subscription tracking stays accurate.
        self.xpub_socket = self.ctx.socket(zmq.XPUB)
        self.xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)
        self.xpub_socket.bind(f"tcp://*:{xpub_port}")

        # DEALER socket for critical events; a stable identity lets the
        # gateway ROUTER address acks back to this container.
        self.dealer_socket = self.ctx.socket(zmq.DEALER)
        self.dealer_socket.setsockopt_string(zmq.IDENTITY, f"container-{user_id}")
        self.dealer_socket.connect(gateway_router_endpoint)

        # Topics with at least one gateway subscriber (active sessions).
        self.active_subscriptions: set[str] = set()

        # Critical events sent but not yet acked, keyed by event_id.
        self.pending_events: dict[str, PendingEvent] = {}

        # Optional persistent store for crash recovery.
        self.pending_store = PendingStore(pending_store_path) if pending_store_path else None

        # Background tasks
        self._subscription_task: Optional[asyncio.Task] = None
        self._ack_task: Optional[asyncio.Task] = None
        self._retry_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start background tasks for subscription tracking and ack handling."""
        self._running = True

        # Reload any persisted pending events from a previous run so the
        # retry loop re-sends them.
        if self.pending_store:
            for event in await self.pending_store.load_pending():
                self.pending_events[event.event_id] = PendingEvent(event, time.time())

        self._subscription_task = asyncio.create_task(self._subscription_loop())
        self._ack_task = asyncio.create_task(self._ack_loop())
        self._retry_task = asyncio.create_task(self._retry_loop())

    async def stop(self):
        """Stop background tasks, persist pending events, and close sockets."""
        self._running = False

        # Cancel background tasks
        for task in (self._subscription_task, self._ack_task, self._retry_task):
            if task:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        # Persist pending critical events for crash/restart recovery.
        if self.pending_store and self.pending_events:
            await self.pending_store.save_pending(
                [pe.event for pe in self.pending_events.values()]
            )

        # linger=0 so close()/ctx.term() never block waiting for
        # undelivered messages (term() honours each socket's linger).
        self.xpub_socket.close(linger=0)
        self.dealer_socket.close(linger=0)
        self.ctx.term()

    def has_active_subscriber(self) -> bool:
        """Check if any gateway is subscribed to this user's events."""
        topic = f"USER:{self.user_id}"
        return topic in self.active_subscriptions

    async def publish(self, event: UserEvent):
        """
        Publish event via appropriate channel based on priority and session state.

        Missing identity fields (event_id, user_id, timestamp) are filled
        in before sending.
        """
        if not event.event_id:
            event.event_id = str(uuid.uuid4())
        if not event.user_id:
            event.user_id = self.user_id
        if not event.timestamp:
            event.timestamp = int(time.time() * 1000)

        topic = f"USER:{self.user_id}"
        priority = event.delivery.priority if event.delivery else Priority.NORMAL

        if priority == Priority.INFORMATIONAL:
            # Fire and forget - only send if someone's listening
            if self.has_active_subscriber():
                await self._send_xpub(topic, event)
            # else: silently drop (by design)

        elif self.has_active_subscriber():
            # Active session exists - use fast path
            await self._send_xpub(topic, event)

        else:
            # No active session - use guaranteed delivery
            await self._send_dealer(event)

    async def _send_xpub(self, topic: str, event: UserEvent):
        """Send event via XPUB (fire-and-forget)."""
        payload = event.serialize()  # Protobuf serialization
        await self.xpub_socket.send_multipart([topic.encode(), payload])

    async def _send_dealer(self, event: UserEvent):
        """Send event via DEALER (with ack tracking)."""
        self.pending_events[event.event_id] = PendingEvent(
            event=event,
            sent_at=time.time(),
            retries=0,
        )

        # Persist before sending: a crash between send and ack then re-sends
        # on restart.  The gateway deduplicates by event_id, so re-sending an
        # already-acked event is harmless.
        if self.pending_store:
            await self.pending_store.save_pending(
                [pe.event for pe in self.pending_events.values()]
            )

        payload = event.serialize()
        await self.dealer_socket.send(payload)

    async def _subscription_loop(self):
        """Process XPUB subscription/unsubscription messages."""
        while self._running:
            try:
                if await self.xpub_socket.poll(100):
                    msg = await self.xpub_socket.recv()
                    # XPUB subscription frame: first byte 1 = subscribe,
                    # 0 = unsubscribe; remaining bytes are the topic.
                    is_subscribe = msg[0] == 1
                    topic = msg[1:].decode()

                    if is_subscribe:
                        self.active_subscriptions.add(topic)
                    else:
                        self.active_subscriptions.discard(topic)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Log and back off so a persistent socket error (poll raising
                # immediately) cannot spin this loop at 100% CPU.
                print(f"Subscription loop error: {e}")
                await asyncio.sleep(1)

    async def _ack_loop(self):
        """Process EventAck messages from the gateway."""
        while self._running:
            try:
                if await self.dealer_socket.poll(100):
                    payload = await self.dealer_socket.recv()
                    ack = EventAck.deserialize(payload)

                    if ack.event_id in self.pending_events:
                        self.pending_events.pop(ack.event_id)

                        if ack.status == AckStatus.ERROR:
                            # Gateway couldn't deliver - log for now.
                            # Could implement escalation logic here.
                            print(f"Event {ack.event_id} delivery failed: {ack.error_message}")
                        # else: DELIVERED or QUEUED - we're done

            except asyncio.CancelledError:
                break
            except Exception as e:
                # Back off to avoid a tight error loop (see _subscription_loop).
                print(f"Ack loop error: {e}")
                await asyncio.sleep(1)

    async def _retry_loop(self):
        """Retry pending events that haven't been acked within ack_timeout."""
        while self._running:
            try:
                await asyncio.sleep(5)  # Check every 5 seconds

                now = time.time()
                # list() snapshot: entries may be deleted while iterating.
                for event_id, pending in list(self.pending_events.items()):
                    if now - pending.sent_at > self.ack_timeout:
                        if pending.retries >= self.max_retries:
                            # Give up - log and remove
                            print(f"Event {event_id} exceeded max retries, dropping")
                            del self.pending_events[event_id]
                        else:
                            # Retry
                            pending.retries += 1
                            pending.sent_at = now
                            payload = pending.event.serialize()
                            await self.dealer_socket.send(payload)

            except asyncio.CancelledError:
                break
            except Exception as e:
                # The sleep(5) at loop top already throttles error repeats.
                print(f"Retry loop error: {e}")
|
|
```
|
|
|
|
### Event Types
|
|
|
|
```python
|
|
# client-py/dexorder/events/types.py
|
|
|
|
from dataclasses import dataclass, field
|
|
from enum import IntEnum
|
|
from typing import Optional
|
|
import struct
|
|
|
|
# Message type IDs (match protocol.md)
|
|
MSG_TYPE_USER_EVENT = 0x20
|
|
MSG_TYPE_EVENT_ACK = 0x21
|
|
|
|
|
|
class Priority(IntEnum):
|
|
INFORMATIONAL = 0 # Drop if no active session
|
|
NORMAL = 1 # Best effort
|
|
CRITICAL = 2 # Must deliver
|
|
|
|
|
|
class ChannelType(IntEnum):
|
|
ACTIVE_SESSION = 0
|
|
WEB = 1
|
|
TELEGRAM = 2
|
|
EMAIL = 3
|
|
PUSH = 4
|
|
|
|
|
|
class EventType(IntEnum):
|
|
ORDER_PLACED = 0
|
|
ORDER_FILLED = 1
|
|
ORDER_CANCELLED = 2
|
|
ALERT_TRIGGERED = 3
|
|
POSITION_UPDATED = 4
|
|
WORKSPACE_CHANGED = 5
|
|
STRATEGY_LOG = 6
|
|
|
|
|
|
class AckStatus(IntEnum):
|
|
DELIVERED = 0
|
|
QUEUED = 1
|
|
ERROR = 2
|
|
|
|
|
|
@dataclass
|
|
class ChannelPreference:
|
|
channel: ChannelType
|
|
only_if_active: bool = False
|
|
|
|
|
|
@dataclass
|
|
class DeliverySpec:
|
|
priority: Priority = Priority.NORMAL
|
|
channels: list[ChannelPreference] = field(default_factory=list)
|
|
|
|
@staticmethod
|
|
def informational() -> "DeliverySpec":
|
|
"""Drop if no active session."""
|
|
return DeliverySpec(
|
|
priority=Priority.INFORMATIONAL,
|
|
channels=[ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True)]
|
|
)
|
|
|
|
@staticmethod
|
|
def active_or_telegram() -> "DeliverySpec":
|
|
"""Active session preferred, fallback to Telegram."""
|
|
return DeliverySpec(
|
|
priority=Priority.NORMAL,
|
|
channels=[
|
|
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
|
|
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
|
|
]
|
|
)
|
|
|
|
@staticmethod
|
|
def critical() -> "DeliverySpec":
|
|
"""Must deliver through any available channel."""
|
|
return DeliverySpec(
|
|
priority=Priority.CRITICAL,
|
|
channels=[
|
|
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
|
|
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
|
|
ChannelPreference(ChannelType.PUSH, only_if_active=False),
|
|
ChannelPreference(ChannelType.EMAIL, only_if_active=False),
|
|
]
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class UserEvent:
|
|
user_id: str = ""
|
|
event_id: str = ""
|
|
timestamp: int = 0 # Unix millis
|
|
event_type: EventType = EventType.STRATEGY_LOG
|
|
payload: bytes = b"" # JSON or nested protobuf
|
|
delivery: DeliverySpec = field(default_factory=DeliverySpec)
|
|
|
|
def serialize(self) -> bytes:
|
|
"""Serialize to protobuf wire format."""
|
|
# TODO: Replace with actual protobuf serialization
|
|
# For now, use a simple format for illustration
|
|
import json
|
|
data = {
|
|
"user_id": self.user_id,
|
|
"event_id": self.event_id,
|
|
"timestamp": self.timestamp,
|
|
"event_type": self.event_type,
|
|
"payload": self.payload.decode() if self.payload else "",
|
|
"delivery": {
|
|
"priority": self.delivery.priority,
|
|
"channels": [
|
|
{"channel": c.channel, "only_if_active": c.only_if_active}
|
|
for c in self.delivery.channels
|
|
]
|
|
}
|
|
}
|
|
return bytes([MSG_TYPE_USER_EVENT]) + json.dumps(data).encode()
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: bytes) -> "UserEvent":
|
|
"""Deserialize from protobuf wire format."""
|
|
import json
|
|
msg_type = data[0]
|
|
assert msg_type == MSG_TYPE_USER_EVENT
|
|
obj = json.loads(data[1:])
|
|
return cls(
|
|
user_id=obj["user_id"],
|
|
event_id=obj["event_id"],
|
|
timestamp=obj["timestamp"],
|
|
event_type=EventType(obj["event_type"]),
|
|
payload=obj.get("payload", "").encode(),
|
|
delivery=DeliverySpec(
|
|
priority=Priority(obj["delivery"]["priority"]),
|
|
channels=[
|
|
ChannelPreference(ChannelType(c["channel"]), c["only_if_active"])
|
|
for c in obj["delivery"]["channels"]
|
|
]
|
|
)
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class EventAck:
|
|
event_id: str
|
|
status: AckStatus
|
|
error_message: str = ""
|
|
|
|
def serialize(self) -> bytes:
|
|
import json
|
|
data = {
|
|
"event_id": self.event_id,
|
|
"status": self.status,
|
|
"error_message": self.error_message,
|
|
}
|
|
return bytes([MSG_TYPE_EVENT_ACK]) + json.dumps(data).encode()
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: bytes) -> "EventAck":
|
|
import json
|
|
msg_type = data[0]
|
|
assert msg_type == MSG_TYPE_EVENT_ACK
|
|
obj = json.loads(data[1:])
|
|
return cls(
|
|
event_id=obj["event_id"],
|
|
status=AckStatus(obj["status"]),
|
|
error_message=obj.get("error_message", ""),
|
|
)
|
|
```
|
|
|
|
### Pending Event Persistence
|
|
|
|
```python
|
|
# client-py/dexorder/events/pending_store.py
|
|
|
|
import json
|
|
import aiofiles
|
|
from pathlib import Path
|
|
from .types import UserEvent
|
|
|
|
|
|
class PendingStore:
    """
    Persists pending critical events to disk for crash recovery.

    A single JSON snapshot file: save_pending() overwrites it with the
    current pending set; load_pending() reads it back and deletes it.
    """

    def __init__(self, path: str):
        # Snapshot file location; parent directories are created eagerly.
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)

    async def save_pending(self, events: list[UserEvent]):
        """Save pending events to disk, replacing any previous snapshot."""
        data = [
            {
                "user_id": e.user_id,
                "event_id": e.event_id,
                "timestamp": e.timestamp,
                "event_type": e.event_type,
                "payload": e.payload.decode() if e.payload else "",
                "delivery": {
                    "priority": e.delivery.priority,
                    "channels": [
                        {"channel": c.channel, "only_if_active": c.only_if_active}
                        for c in e.delivery.channels
                    ],
                },
            }
            for e in events
        ]
        async with aiofiles.open(self.path, "w") as f:
            await f.write(json.dumps(data))

    async def load_pending(self) -> list[UserEvent]:
        """Load pending events from disk.

        Returns an empty list when there is no snapshot or it cannot be
        parsed (best-effort recovery: a corrupt file must not prevent
        container startup).  The snapshot is deleted after a successful
        load; the caller is expected to re-persist events still pending
        at shutdown.
        """
        if not self.path.exists():
            return []

        # Local import mirrors the original lazy import; hoisted out of
        # the per-event loop so it executes once per load.
        from .types import DeliverySpec, ChannelPreference, Priority, ChannelType, EventType

        try:
            async with aiofiles.open(self.path, "r") as f:
                content = await f.read()
            data = json.loads(content)

            events = [
                UserEvent(
                    user_id=obj["user_id"],
                    event_id=obj["event_id"],
                    timestamp=obj["timestamp"],
                    event_type=EventType(obj["event_type"]),
                    payload=obj.get("payload", "").encode(),
                    delivery=DeliverySpec(
                        priority=Priority(obj["delivery"]["priority"]),
                        channels=[
                            ChannelPreference(ChannelType(c["channel"]), c["only_if_active"])
                            for c in obj["delivery"]["channels"]
                        ],
                    ),
                )
                for obj in data
            ]

            # Clear the snapshot so the same events are not re-loaded twice.
            self.path.unlink()
            return events

        except (OSError, ValueError, KeyError, TypeError) as e:
            # Narrowed from a bare `except Exception` that hid corruption
            # silently; still best-effort, but now observable in logs.
            print(f"PendingStore: failed to load {self.path}: {e}")
            return []
|
|
```
|
|
|
|
### Integration with Strategy Engine
|
|
|
|
```python
|
|
# Example usage in strategy engine
|
|
|
|
from dexorder.events import EventPublisher, UserEvent, EventType, DeliverySpec
|
|
|
|
class StrategyEngine:
    """Example: wiring an EventPublisher into a strategy engine."""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.event_publisher = EventPublisher(
            user_id=user_id,
            pending_store_path=f"/data/users/{user_id}/pending_events.json",
        )

    async def start(self):
        """Start the publisher's background tasks."""
        await self.event_publisher.start()

    async def stop(self):
        """Persist pending events and shut the publisher down."""
        await self.event_publisher.stop()

    async def on_order_filled(self, order):
        """Called when an order is filled - CRITICAL delivery."""
        import json  # was used but never imported in the original example
        event = UserEvent(
            event_type=EventType.ORDER_FILLED,
            payload=json.dumps({
                "order_id": order.id,
                "symbol": order.symbol,
                "side": order.side,
                "price": str(order.fill_price),
                "quantity": str(order.quantity),
            }).encode(),
            delivery=DeliverySpec.critical(),
        )
        await self.event_publisher.publish(event)

    async def on_indicator_update(self, indicator_name: str, value: float):
        """Called on indicator update - INFORMATIONAL (only if watching)."""
        import json
        event = UserEvent(
            event_type=EventType.STRATEGY_LOG,
            payload=json.dumps({
                "indicator": indicator_name,
                "value": value,
            }).encode(),
            delivery=DeliverySpec.informational(),
        )
        await self.event_publisher.publish(event)

    async def on_alert_triggered(self, alert):
        """Called when price alert triggers - active session or Telegram."""
        import json
        event = UserEvent(
            event_type=EventType.ALERT_TRIGGERED,
            payload=json.dumps({
                "alert_id": alert.id,
                "condition": alert.condition,
                "triggered_price": str(alert.triggered_price),
            }).encode(),
            delivery=DeliverySpec.active_or_telegram(),
        )
        await self.event_publisher.publish(event)
|
|
```
|
|
|
|
## Gateway Implementation (TypeScript)
|
|
|
|
### File Structure
|
|
|
|
```
|
|
gateway/src/
|
|
├── events/
|
|
│ ├── index.ts
|
|
│ ├── event-subscriber.ts # SUB socket for informational events
|
|
│ ├── event-router.ts # ROUTER socket for critical events
|
|
│ ├── delivery-service.ts # Telegram, email, push delivery
|
|
│ └── types.ts
|
|
```
|
|
|
|
### Event Subscriber (Informational Events)
|
|
|
|
```typescript
|
|
// gateway/src/events/event-subscriber.ts
|
|
|
|
import * as zmq from 'zeromq';
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { UserEvent } from './types.js';
|
|
import { deserializeUserEvent } from './types.js';
|
|
|
|
interface SessionRegistry {
|
|
get(userId: string): WebSocket | undefined;
|
|
has(userId: string): boolean;
|
|
}
|
|
|
|
/**
|
|
* Subscribes to user container XPUB sockets for informational events.
|
|
* One SUB socket connects to multiple containers (all containers for active sessions).
|
|
*/
|
|
export class EventSubscriber {
|
|
private socket: zmq.Subscriber;
|
|
private sessions: SessionRegistry;
|
|
private logger: FastifyBaseLogger;
|
|
private containerEndpoints: Map<string, string> = new Map(); // userId -> endpoint
|
|
private running = false;
|
|
|
|
constructor(sessions: SessionRegistry, logger: FastifyBaseLogger) {
|
|
this.socket = new zmq.Subscriber();
|
|
this.sessions = sessions;
|
|
this.logger = logger;
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
this.running = true;
|
|
this.processMessages();
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
this.running = false;
|
|
this.socket.close();
|
|
}
|
|
|
|
/**
|
|
* Called when a user session connects.
|
|
* Connects to the user's container and subscribes to their events.
|
|
*/
|
|
async onSessionConnect(userId: string, containerEndpoint: string): Promise<void> {
|
|
const topic = `USER:${userId}`;
|
|
|
|
// Connect to container's XPUB socket if not already connected
|
|
if (!this.containerEndpoints.has(userId)) {
|
|
this.socket.connect(containerEndpoint);
|
|
this.containerEndpoints.set(userId, containerEndpoint);
|
|
this.logger.info({ userId, containerEndpoint }, 'Connected to container XPUB');
|
|
}
|
|
|
|
// Subscribe to user's topic
|
|
this.socket.subscribe(topic);
|
|
this.logger.info({ userId, topic }, 'Subscribed to user events');
|
|
}
|
|
|
|
/**
|
|
* Called when a user session disconnects.
|
|
* Unsubscribes from their events (but keeps connection for potential reconnect).
|
|
*/
|
|
async onSessionDisconnect(userId: string): Promise<void> {
|
|
const topic = `USER:${userId}`;
|
|
this.socket.unsubscribe(topic);
|
|
this.logger.info({ userId, topic }, 'Unsubscribed from user events');
|
|
|
|
// Optionally disconnect from container after timeout
|
|
// (in case user reconnects quickly)
|
|
}
|
|
|
|
private async processMessages(): Promise<void> {
|
|
for await (const [topicBuf, payloadBuf] of this.socket) {
|
|
if (!this.running) break;
|
|
|
|
try {
|
|
const topic = topicBuf.toString();
|
|
const userId = topic.replace('USER:', '');
|
|
const event = deserializeUserEvent(payloadBuf);
|
|
|
|
// Forward to active WebSocket session
|
|
const ws = this.sessions.get(userId);
|
|
if (ws) {
|
|
ws.send(JSON.stringify({
|
|
type: 'event',
|
|
eventType: event.eventType,
|
|
eventId: event.eventId,
|
|
timestamp: event.timestamp,
|
|
payload: JSON.parse(event.payload.toString()),
|
|
}));
|
|
}
|
|
} catch (error) {
|
|
this.logger.error({ error }, 'Error processing informational event');
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### Event Router (Critical Events)
|
|
|
|
```typescript
|
|
// gateway/src/events/event-router.ts
|
|
|
|
import * as zmq from 'zeromq';
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { UserEvent, EventAck, AckStatus } from './types.js';
|
|
import { deserializeUserEvent, serializeEventAck } from './types.js';
|
|
import type { DeliveryService } from './delivery-service.js';
|
|
|
|
interface SessionRegistry {
|
|
get(userId: string): WebSocket | undefined;
|
|
has(userId: string): boolean;
|
|
}
|
|
|
|
/**
|
|
* ROUTER socket that receives critical events from all containers.
|
|
* Delivers events and sends acks back to containers.
|
|
*/
|
|
export class EventRouter {
|
|
private socket: zmq.Router;
|
|
private sessions: SessionRegistry;
|
|
private delivery: DeliveryService;
|
|
private logger: FastifyBaseLogger;
|
|
private running = false;
|
|
|
|
// Deduplication: track recently processed event IDs
|
|
private processedEvents: Map<string, number> = new Map();
|
|
private readonly DEDUP_TTL_MS = 5 * 60 * 1000; // 5 minutes
|
|
|
|
constructor(
|
|
sessions: SessionRegistry,
|
|
delivery: DeliveryService,
|
|
logger: FastifyBaseLogger,
|
|
bindEndpoint: string = 'tcp://*:5571'
|
|
) {
|
|
this.socket = new zmq.Router();
|
|
this.sessions = sessions;
|
|
this.delivery = delivery;
|
|
this.logger = logger;
|
|
|
|
this.socket.bind(bindEndpoint);
|
|
this.logger.info({ bindEndpoint }, 'Event router bound');
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
this.running = true;
|
|
this.processMessages();
|
|
this.cleanupDedup();
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
this.running = false;
|
|
this.socket.close();
|
|
}
|
|
|
|
private async processMessages(): Promise<void> {
|
|
for await (const [identity, payload] of this.socket) {
|
|
if (!this.running) break;
|
|
|
|
try {
|
|
const event = deserializeUserEvent(payload);
|
|
|
|
// Deduplication check
|
|
if (this.processedEvents.has(event.eventId)) {
|
|
// Already processed - send success ack
|
|
await this.sendAck(identity, event.eventId, 'DELIVERED');
|
|
continue;
|
|
}
|
|
|
|
this.logger.info(
|
|
{ eventId: event.eventId, userId: event.userId, eventType: event.eventType },
|
|
'Processing critical event'
|
|
);
|
|
|
|
// Deliver through channel preferences
|
|
const status = await this.deliverEvent(event);
|
|
|
|
// Mark as processed
|
|
this.processedEvents.set(event.eventId, Date.now());
|
|
|
|
// Send ack back to container
|
|
await this.sendAck(identity, event.eventId, status);
|
|
|
|
} catch (error) {
|
|
this.logger.error({ error }, 'Error processing critical event');
|
|
// Don't ack - let container retry
|
|
}
|
|
}
|
|
}
|
|
|
|
private async deliverEvent(event: UserEvent): Promise<AckStatus> {
|
|
for (const pref of event.delivery.channels) {
|
|
// Skip if channel requires active session but none exists
|
|
if (pref.onlyIfActive && !this.sessions.has(event.userId)) {
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
switch (pref.channel) {
|
|
case 'ACTIVE_SESSION': {
|
|
const ws = this.sessions.get(event.userId);
|
|
if (ws) {
|
|
ws.send(JSON.stringify({
|
|
type: 'event',
|
|
eventType: event.eventType,
|
|
eventId: event.eventId,
|
|
timestamp: event.timestamp,
|
|
payload: JSON.parse(event.payload.toString()),
|
|
}));
|
|
return 'DELIVERED';
|
|
}
|
|
break;
|
|
}
|
|
|
|
case 'TELEGRAM':
|
|
await this.delivery.sendTelegram(event.userId, event);
|
|
return 'DELIVERED';
|
|
|
|
case 'EMAIL':
|
|
await this.delivery.sendEmail(event.userId, event);
|
|
return 'DELIVERED';
|
|
|
|
case 'PUSH':
|
|
await this.delivery.sendPush(event.userId, event);
|
|
return 'DELIVERED';
|
|
}
|
|
} catch (error) {
|
|
this.logger.warn(
|
|
{ error, channel: pref.channel, eventId: event.eventId },
|
|
'Channel delivery failed, trying next'
|
|
);
|
|
// Continue to next channel preference
|
|
}
|
|
}
|
|
|
|
// All channels failed
|
|
return 'ERROR';
|
|
}
|
|
|
|
private async sendAck(
|
|
identity: Buffer,
|
|
eventId: string,
|
|
status: AckStatus,
|
|
errorMessage?: string
|
|
): Promise<void> {
|
|
const ack: EventAck = { eventId, status, errorMessage: errorMessage || '' };
|
|
await this.socket.send([identity, serializeEventAck(ack)]);
|
|
}
|
|
|
|
private cleanupDedup(): void {
|
|
setInterval(() => {
|
|
const now = Date.now();
|
|
for (const [eventId, timestamp] of this.processedEvents) {
|
|
if (now - timestamp > this.DEDUP_TTL_MS) {
|
|
this.processedEvents.delete(eventId);
|
|
}
|
|
}
|
|
}, 60000); // Cleanup every minute
|
|
}
|
|
}
|
|
```
|
|
|
|
### Delivery Service
|
|
|
|
```typescript
|
|
// gateway/src/events/delivery-service.ts
|
|
|
|
import type { FastifyBaseLogger } from 'fastify';
|
|
import type { UserEvent } from './types.js';
|
|
|
|
interface UserChannelConfig {
|
|
telegramChatId?: string;
|
|
email?: string;
|
|
pushToken?: string;
|
|
}
|
|
|
|
/**
|
|
* Handles actual delivery to external channels.
|
|
* Owns credentials for Telegram bot, email service, push notifications.
|
|
*/
|
|
export class DeliveryService {
  // Credentials for the external providers, injected via the constructor.
  private telegramBotToken: string;
  private emailServiceKey: string;
  private pushServiceKey: string;
  private logger: FastifyBaseLogger;

  // User channel configs (loaded from DB)
  private userConfigs: Map<string, UserChannelConfig> = new Map();

  constructor(config: {
    telegramBotToken: string;
    emailServiceKey: string;
    pushServiceKey: string;
    logger: FastifyBaseLogger;
  }) {
    this.telegramBotToken = config.telegramBotToken;
    this.emailServiceKey = config.emailServiceKey;
    this.pushServiceKey = config.pushServiceKey;
    this.logger = config.logger;
  }

  /**
   * Resolve a user's configured delivery channels.
   * Currently returns the in-memory cache entry (or an empty config);
   * database-backed lookup is still TODO.
   */
  async loadUserConfig(userId: string): Promise<UserChannelConfig> {
    // TODO: Load from database
    // For now, return cached or empty
    return this.userConfigs.get(userId) || {};
  }

  /**
   * Send the event as a Telegram message via the Bot API.
   * Throws when the user has no chat ID or the API call fails, so the
   * caller (EventRouter.deliverEvent) can fall through to the next channel.
   */
  async sendTelegram(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.telegramChatId) {
      throw new Error('User has no Telegram chat ID configured');
    }

    const message = this.formatEventMessage(event);

    // NOTE(review): no request timeout here; a hung Telegram API call
    // blocks this delivery attempt indefinitely — consider an AbortSignal.
    const response = await fetch(
      `https://api.telegram.org/bot${this.telegramBotToken}/sendMessage`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        // NOTE(review): parse_mode 'Markdown' renders payload fields
        // verbatim; payload text containing Markdown control characters
        // can break or inject formatting — confirm payload provenance or
        // escape before sending.
        body: JSON.stringify({
          chat_id: config.telegramChatId,
          text: message,
          parse_mode: 'Markdown',
        }),
      }
    );

    if (!response.ok) {
      const error = await response.text();
      throw new Error(`Telegram API error: ${error}`);
    }

    this.logger.info({ userId, eventId: event.eventId }, 'Sent Telegram notification');
  }

  /** Send the event by email; throws if the user has no address configured. */
  async sendEmail(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.email) {
      throw new Error('User has no email configured');
    }

    // TODO: Implement email sending via SendGrid/SES/etc.
    this.logger.info({ userId, eventId: event.eventId }, 'Sent email notification');
  }

  /** Send the event as a push notification; throws if no push token. */
  async sendPush(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.pushToken) {
      throw new Error('User has no push token configured');
    }

    // TODO: Implement push notification via Firebase/APNs
    this.logger.info({ userId, eventId: event.eventId }, 'Sent push notification');
  }

  /**
   * Render a human-readable Markdown message for an event.
   * Assumes event.payload is UTF-8 JSON, and that eventType compares
   * against string names — presumably matching types.ts; verify there.
   */
  private formatEventMessage(event: UserEvent): string {
    const payload = JSON.parse(event.payload.toString());

    switch (event.eventType) {
      case 'ORDER_FILLED':
        return `🟢 *Order Filled*\n` +
          `Symbol: ${payload.symbol}\n` +
          `Side: ${payload.side}\n` +
          `Price: ${payload.price}\n` +
          `Quantity: ${payload.quantity}`;

      case 'ALERT_TRIGGERED':
        return `🔔 *Alert Triggered*\n` +
          `Condition: ${payload.condition}\n` +
          `Price: ${payload.triggered_price}`;

      case 'POSITION_UPDATED':
        return `📊 *Position Updated*\n` +
          `Symbol: ${payload.symbol}\n` +
          `Size: ${payload.size}\n` +
          `PnL: ${payload.unrealized_pnl}`;

      default:
        return `📌 *Notification*\n${JSON.stringify(payload, null, 2)}`;
    }
  }
}
|
|
```
|
|
|
|
### Integration in Gateway Main
|
|
|
|
```typescript
|
|
// gateway/src/main.ts (additions)
|
|
|
|
import { EventSubscriber } from './events/event-subscriber.js';
|
|
import { EventRouter } from './events/event-router.js';
|
|
import { DeliveryService } from './events/delivery-service.js';
|
|
|
|
// In main setup:
|
|
|
|
const sessionRegistry = new Map<string, WebSocket>();
|
|
|
|
const deliveryService = new DeliveryService({
|
|
telegramBotToken: process.env.TELEGRAM_BOT_TOKEN!,
|
|
emailServiceKey: process.env.EMAIL_SERVICE_KEY!,
|
|
pushServiceKey: process.env.PUSH_SERVICE_KEY!,
|
|
logger: app.log,
|
|
});
|
|
|
|
const eventSubscriber = new EventSubscriber(sessionRegistry, app.log);
|
|
const eventRouter = new EventRouter(sessionRegistry, deliveryService, app.log);
|
|
|
|
await eventSubscriber.start();
|
|
await eventRouter.start();
|
|
|
|
// In WebSocket handler:
|
|
|
|
websocket.on('connection', async (ws, req) => {
|
|
const authContext = await authenticate(req);
|
|
const userId = authContext.userId;
|
|
const containerEndpoint = authContext.license.containerEventEndpoint;
|
|
|
|
// Register session
|
|
sessionRegistry.set(userId, ws);
|
|
|
|
// Subscribe to informational events
|
|
await eventSubscriber.onSessionConnect(userId, containerEndpoint);
|
|
|
|
ws.on('close', async () => {
|
|
sessionRegistry.delete(userId);
|
|
await eventSubscriber.onSessionDisconnect(userId);
|
|
});
|
|
});
|
|
```
|
|
|
|
## Protobuf Definitions
|
|
|
|
Add to `proto/user_events.proto`:
|
|
|
|
```protobuf
|
|
syntax = "proto3";

package dexorder.events;

// A single event emitted by a user container, addressed to one user.
message UserEvent {
  string user_id = 1;        // Owning user; travels with the event so the gateway can route it.
  string event_id = 2;       // Unique per event; correlates EventAck replies and dedupes retries.
  int64 timestamp = 3;       // Emission time; epoch units assumed milliseconds — TODO confirm against publisher.
  EventType event_type = 4;
  bytes payload = 5;         // Event-type-specific body (JSON in the examples in this document).
  DeliverySpec delivery = 6; // Routing instructions; see the decision tree above.
}

enum EventType {
  // Zero value reserved so an unset field is distinguishable from a real
  // type (proto3 defaults scalar enum fields to 0). Previously ORDER_PLACED
  // was 0, making an unset event_type read as a placed order.
  EVENT_TYPE_UNSPECIFIED = 0;
  ORDER_PLACED = 1;
  ORDER_FILLED = 2;
  ORDER_CANCELLED = 3;
  ALERT_TRIGGERED = 4;
  POSITION_UPDATED = 5;
  WORKSPACE_CHANGED = 6;
  STRATEGY_LOG = 7;
}

// How the container should route the event.
message DeliverySpec {
  Priority priority = 1;
  repeated ChannelPreference channels = 2; // Ordered preference list consulted by the gateway.
}

enum Priority {
  // Zero value reserved: INFORMATIONAL was previously 0, so an *unset*
  // priority silently took the droppable XPUB fast path. Routers should
  // treat PRIORITY_UNSPECIFIED conservatively (as NORMAL, not INFORMATIONAL).
  PRIORITY_UNSPECIFIED = 0;
  INFORMATIONAL = 1; // Fast path (XPUB); may be dropped when no active session.
  NORMAL = 2;
  CRITICAL = 3;      // Guaranteed path (DEALER/ROUTER) with ack and retry.
}

message ChannelPreference {
  ChannelType channel = 1;
  bool only_if_active = 2; // Skip this channel unless the user has a live session.
}

enum ChannelType {
  CHANNEL_TYPE_UNSPECIFIED = 0; // Unset channel is no longer a valid ACTIVE_SESSION.
  ACTIVE_SESSION = 1;           // Deliver over the currently connected WebSocket/webhook.
  WEB = 2;
  TELEGRAM = 3;
  EMAIL = 4;
  PUSH = 5;
}

// Gateway -> container reply on the DEALER/ROUTER channel.
message EventAck {
  string event_id = 1;
  AckStatus status = 2;
  string error_message = 3; // Populated only when status == ERROR.
}

enum AckStatus {
  // NOTE: DELIVERED intentionally remains 0 — the integration example in
  // this document acks with a literal {"status":0}; renumbering would
  // break that wire example.
  DELIVERED = 0; // Pushed to an active session.
  QUEUED = 1;    // Accepted for external delivery (Telegram/email/push).
  ERROR = 2;     // Delivery failed; see error_message.
}
|
|
```
|
|
|
|
## Kubernetes Configuration
|
|
|
|
### Container Ports
|
|
|
|
Add to agent deployment template:
|
|
|
|
```yaml
|
|
# deploy/k8s/templates/agent-deployment.yaml
|
|
|
|
spec:
|
|
containers:
|
|
- name: agent
|
|
ports:
|
|
- containerPort: 3000 # MCP server
|
|
name: mcp
|
|
- containerPort: 5570 # XPUB for informational events
|
|
name: events-xpub
|
|
```
|
|
|
|
### Gateway Configuration
|
|
|
|
```yaml
|
|
# deploy/k8s/base/gateway.yaml
|
|
|
|
spec:
|
|
containers:
|
|
- name: gateway
|
|
ports:
|
|
- containerPort: 3000 # HTTP/WebSocket
|
|
name: http
|
|
- containerPort: 5571 # ROUTER for critical events
|
|
name: events-router
|
|
env:
|
|
- name: TELEGRAM_BOT_TOKEN
|
|
valueFrom:
|
|
secretKeyRef:
|
|
name: gateway-secrets
|
|
key: telegram-bot-token
|
|
- name: EVENT_ROUTER_BIND
|
|
value: "tcp://*:5571"
|
|
```
|
|
|
|
### Network Policy
|
|
|
|
Allow containers to connect to gateway ROUTER:
|
|
|
|
```yaml
|
|
# deploy/k8s/base/network-policies.yaml (additions)
|
|
|
|
apiVersion: networking.k8s.io/v1
|
|
kind: NetworkPolicy
|
|
metadata:
|
|
name: agent-to-gateway-events
|
|
namespace: dexorder-agents
|
|
spec:
|
|
podSelector:
|
|
matchLabels:
|
|
app: agent
|
|
policyTypes:
|
|
- Egress
|
|
egress:
|
|
- to:
|
|
- namespaceSelector:
|
|
matchLabels:
|
|
name: dexorder-system
|
|
podSelector:
|
|
matchLabels:
|
|
app: gateway
|
|
ports:
|
|
- protocol: TCP
|
|
port: 5571 # Critical events ROUTER
|
|
```
|
|
|
|
## Testing
|
|
|
|
### Unit Tests
|
|
|
|
```python
|
|
# tests/test_event_publisher.py
|
|
|
|
import pytest
|
|
import asyncio
|
|
from dexorder.events import EventPublisher, UserEvent, DeliverySpec, EventType
|
|
|
|
@pytest.mark.asyncio
async def test_informational_dropped_without_subscriber():
    """An informational event published with no subscriber is dropped, not queued."""
    pub = EventPublisher(user_id="test-user", xpub_port=15570)
    await pub.start()

    # Nothing has subscribed to the XPUB socket yet.
    assert not pub.has_active_subscriber()

    # Informational events take the fire-and-forget path and vanish silently.
    evt = UserEvent(
        event_type=EventType.STRATEGY_LOG,
        payload=b'{"test": true}',
        delivery=DeliverySpec.informational(),
    )
    await pub.publish(evt)

    # Dropped outright: the guaranteed-delivery queue stays empty.
    assert len(pub.pending_events) == 0

    await pub.stop()
|
|
|
|
@pytest.mark.asyncio
async def test_critical_queued_without_subscriber():
    """A critical event published with no subscriber is queued for the DEALER path."""
    pub = EventPublisher(user_id="test-user", xpub_port=15571)
    await pub.start()

    # No active session: the XPUB socket has no subscribers.
    assert not pub.has_active_subscriber()

    # Critical events must never be dropped; they go through DEALER/ROUTER.
    evt = UserEvent(
        event_type=EventType.ORDER_FILLED,
        payload=b'{"order_id": "123"}',
        delivery=DeliverySpec.critical(),
    )
    await pub.publish(evt)

    # Held in the pending queue until the gateway acks it.
    assert len(pub.pending_events) == 1

    await pub.stop()
|
|
```
|
|
|
|
### Integration Test
|
|
|
|
```bash
|
|
# Start a mock gateway ROUTER
|
|
python -c "
|
|
import zmq
|
|
ctx = zmq.Context()
|
|
router = ctx.socket(zmq.ROUTER)
|
|
router.bind('tcp://*:5571')
|
|
print('Waiting for events...')
|
|
while True:
|
|
identity, payload = router.recv_multipart()
|
|
print(f'Received from {identity}: {payload}')
|
|
# Send ack
|
|
router.send_multipart([identity, b'\x21{\"event_id\":\"test\",\"status\":0}'])
|
|
"
|
|
|
|
# In another terminal, run container with test events
|
|
python -c "
|
|
import asyncio
|
|
from dexorder.events import EventPublisher, UserEvent, DeliverySpec, EventType
|
|
|
|
async def main():
|
|
pub = EventPublisher('test-user', gateway_router_endpoint='tcp://localhost:5571')
|
|
await pub.start()
|
|
|
|
event = UserEvent(
|
|
event_type=EventType.ORDER_FILLED,
|
|
payload=b'{\"test\": true}',
|
|
delivery=DeliverySpec.critical(),
|
|
)
|
|
await pub.publish(event)
|
|
|
|
await asyncio.sleep(5) # Wait for ack
|
|
print(f'Pending events: {len(pub.pending_events)}')
|
|
|
|
await pub.stop()
|
|
|
|
asyncio.run(main())
|
|
"
|
|
```
|
|
|
|
## Migration Plan
|
|
|
|
1. **Phase 1**: Add protobuf definitions, build Python/TypeScript types
|
|
2. **Phase 2**: Implement EventPublisher in container (inactive by default)
|
|
3. **Phase 3**: Implement EventSubscriber/EventRouter in gateway
|
|
4. **Phase 4**: Add DeliveryService with Telegram integration
|
|
5. **Phase 5**: Enable in container, test with single user
|
|
6. **Phase 6**: Roll out to all users, add email/push support
|
|
|
|
## Monitoring
|
|
|
|
Key metrics to track:
|
|
- `events_published_total{priority, channel}` - Events published by container
|
|
- `events_delivered_total{channel, status}` - Events delivered by gateway
|
|
- `events_pending_count` - Current pending critical events per container
|
|
- `event_delivery_latency_ms{channel}` - Time from publish to delivery
|
|
- `event_retry_total` - Retries due to ack timeout
|