# User Container Event System - Implementation Guide
This document provides detailed implementation guidance for the dual-channel event system between user containers and the gateway. See `doc/protocol.md` for the protocol specification.
## Overview
User containers emit events that must reach users through various channels:
- **Active session** (WebSocket, Telegram webhook)
- **External delivery** (Telegram API, email, push notifications)
Two ZMQ patterns handle different delivery requirements:
1. **XPUB/SUB**: Fast path for active sessions (fire-and-forget)
2. **DEALER/ROUTER**: Guaranteed delivery for critical events (with ack)
## Architecture Decision: Why Two Channels?
**Problem**: A single pattern can't satisfy both requirements:
- PUB/SUB drops messages if no subscriber (unacceptable for order confirmations)
- DEALER/ROUTER adds latency and requires ack handling (overkill for chart updates)
**Solution**: Container decides routing based on:
1. Event priority (INFORMATIONAL vs CRITICAL)
2. Whether an active session exists (tracked via XPUB subscriptions)
```
                Event arrives
                      │
                      ▼
            ┌───────────────────┐
            │    Priority ==    │
            │  INFORMATIONAL?   │
            └─────────┬─────────┘
                Yes   │   No
               ┌──────┴──────┐
               │             ▼
               │   ┌───────────────────┐
               │   │   has_active_     │
               │   │   subscriber()?   │
               │   └─────────┬─────────┘
               │       Yes   │   No
               │      ┌──────┴──────┐
               ▼      ▼             ▼
            ┌─────────────┐   ┌─────────────┐
            │    XPUB     │   │   DEALER    │
            │ (fast path) │   │ (guaranteed)│
            └─────────────┘   └─────────────┘
```
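In code, the tree collapses to two branches. A minimal sketch of the routing rule (function name hypothetical; the real logic lives in `EventPublisher.publish`):

```python
from enum import IntEnum

class Priority(IntEnum):
    INFORMATIONAL = 0
    NORMAL = 1
    CRITICAL = 2

def choose_route(priority: Priority, has_subscriber: bool) -> str:
    """Map (priority, session state) to a delivery channel."""
    if priority == Priority.INFORMATIONAL:
        # Fire-and-forget: drop entirely when nobody is listening
        return "XPUB" if has_subscriber else "DROP"
    # NORMAL/CRITICAL: fast path when a session exists, else guaranteed path
    return "XPUB" if has_subscriber else "DEALER"

assert choose_route(Priority.INFORMATIONAL, False) == "DROP"
assert choose_route(Priority.CRITICAL, False) == "DEALER"
```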
## Container Implementation (Python)
### File Structure
```
client-py/dexorder/
├── events/
│   ├── __init__.py
│   ├── publisher.py      # EventPublisher class
│   ├── types.py          # UserEvent, DeliverySpec, etc.
│   └── pending_store.py  # Disk persistence for critical events
```
### Event Publisher Class
```python
# client-py/dexorder/events/publisher.py
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Optional

import zmq
import zmq.asyncio

from .types import UserEvent, Priority, EventAck, AckStatus
from .pending_store import PendingStore


@dataclass
class PendingEvent:
    event: UserEvent
    sent_at: float
    retries: int = 0


class EventPublisher:
    """
    Publishes user events via dual ZMQ channels:
    - XPUB for informational/active-session events
    - DEALER for critical/guaranteed-delivery events
    """

    def __init__(
        self,
        user_id: str,
        xpub_port: int = 5570,
        gateway_router_endpoint: str = "tcp://gateway:5571",
        ack_timeout: float = 30.0,
        max_retries: int = 3,
        pending_store_path: Optional[str] = None,
    ):
        self.user_id = user_id
        self.xpub_port = xpub_port
        self.gateway_router_endpoint = gateway_router_endpoint
        self.ack_timeout = ack_timeout
        self.max_retries = max_retries
        self.ctx = zmq.asyncio.Context()

        # XPUB socket for informational events
        self.xpub_socket = self.ctx.socket(zmq.XPUB)
        self.xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)
        self.xpub_socket.bind(f"tcp://*:{xpub_port}")

        # DEALER socket for critical events
        self.dealer_socket = self.ctx.socket(zmq.DEALER)
        self.dealer_socket.setsockopt_string(zmq.IDENTITY, f"container-{user_id}")
        self.dealer_socket.connect(gateway_router_endpoint)

        # Track active subscriptions (gateway sessions)
        self.active_subscriptions: set[str] = set()
        # Track pending critical events awaiting ack
        self.pending_events: dict[str, PendingEvent] = {}
        # Persistent store for crash recovery
        self.pending_store = PendingStore(pending_store_path) if pending_store_path else None

        # Background tasks
        self._subscription_task: Optional[asyncio.Task] = None
        self._ack_task: Optional[asyncio.Task] = None
        self._retry_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start background tasks for subscription tracking and ack handling."""
        self._running = True
        # Reload any persisted pending events
        if self.pending_store:
            for event in await self.pending_store.load_pending():
                self.pending_events[event.event_id] = PendingEvent(event, time.time())
        self._subscription_task = asyncio.create_task(self._subscription_loop())
        self._ack_task = asyncio.create_task(self._ack_loop())
        self._retry_task = asyncio.create_task(self._retry_loop())

    async def stop(self):
        """Stop background tasks and persist pending events."""
        self._running = False
        # Cancel background tasks
        for task in [self._subscription_task, self._ack_task, self._retry_task]:
            if task:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        # Persist pending critical events
        if self.pending_store and self.pending_events:
            await self.pending_store.save_pending(
                [pe.event for pe in self.pending_events.values()]
            )
        # Close sockets
        self.xpub_socket.close()
        self.dealer_socket.close()
        self.ctx.term()

    def has_active_subscriber(self) -> bool:
        """Check if any gateway is subscribed to this user's events."""
        topic = f"USER:{self.user_id}"
        return topic in self.active_subscriptions

    async def publish(self, event: UserEvent):
        """
        Publish event via the appropriate channel based on priority and session state.
        """
        # Ensure event has required fields
        if not event.event_id:
            event.event_id = str(uuid.uuid4())
        if not event.user_id:
            event.user_id = self.user_id
        if not event.timestamp:
            event.timestamp = int(time.time() * 1000)

        topic = f"USER:{self.user_id}"
        priority = event.delivery.priority if event.delivery else Priority.NORMAL

        if priority == Priority.INFORMATIONAL:
            # Fire and forget - only send if someone's listening
            if self.has_active_subscriber():
                await self._send_xpub(topic, event)
            # else: silently drop (by design)
        elif self.has_active_subscriber():
            # Active session exists - use fast path
            await self._send_xpub(topic, event)
        else:
            # No active session - use guaranteed delivery
            await self._send_dealer(event)

    async def _send_xpub(self, topic: str, event: UserEvent):
        """Send event via XPUB (fire-and-forget)."""
        payload = event.serialize()  # Protobuf serialization
        await self.xpub_socket.send_multipart([topic.encode(), payload])

    async def _send_dealer(self, event: UserEvent):
        """Send event via DEALER (with ack tracking)."""
        self.pending_events[event.event_id] = PendingEvent(
            event=event,
            sent_at=time.time(),
            retries=0,
        )
        payload = event.serialize()
        await self.dealer_socket.send(payload)

    async def _subscription_loop(self):
        """Process XPUB subscription/unsubscription messages."""
        while self._running:
            try:
                if await self.xpub_socket.poll(100):
                    msg = await self.xpub_socket.recv()
                    # First byte: 1 = subscribe, 0 = unsubscribe
                    # Remaining bytes: topic
                    is_subscribe = msg[0] == 1
                    topic = msg[1:].decode()
                    if is_subscribe:
                        self.active_subscriptions.add(topic)
                    else:
                        self.active_subscriptions.discard(topic)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Log error but continue
                print(f"Subscription loop error: {e}")

    async def _ack_loop(self):
        """Process EventAck messages from gateway."""
        while self._running:
            try:
                if await self.dealer_socket.poll(100):
                    payload = await self.dealer_socket.recv()
                    ack = EventAck.deserialize(payload)
                    if ack.event_id in self.pending_events:
                        self.pending_events.pop(ack.event_id)
                        if ack.status == AckStatus.ERROR:
                            # Gateway couldn't deliver - log for now
                            # Could implement escalation logic here
                            print(f"Event {ack.event_id} delivery failed: {ack.error_message}")
                        # else: DELIVERED or QUEUED - we're done
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Ack loop error: {e}")

    async def _retry_loop(self):
        """Retry pending events that haven't been acked."""
        while self._running:
            try:
                await asyncio.sleep(5)  # Check every 5 seconds
                now = time.time()
                for event_id, pending in list(self.pending_events.items()):
                    if now - pending.sent_at > self.ack_timeout:
                        if pending.retries >= self.max_retries:
                            # Give up - log and remove
                            print(f"Event {event_id} exceeded max retries, dropping")
                            del self.pending_events[event_id]
                        else:
                            # Retry
                            pending.retries += 1
                            pending.sent_at = now
                            payload = pending.event.serialize()
                            await self.dealer_socket.send(payload)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Retry loop error: {e}")
```
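ZMQ delivers XPUB subscription messages as a single frame: one flag byte (`\x01` subscribe, `\x00` unsubscribe) followed by the topic bytes. The parsing step inside `_subscription_loop` can be exercised in isolation (helper name hypothetical):

```python
def parse_subscription_frame(frame: bytes) -> tuple[bool, str]:
    """Split an XPUB (un)subscription frame into (is_subscribe, topic)."""
    is_subscribe = frame[0] == 1
    topic = frame[1:].decode()
    return is_subscribe, topic

# A gateway subscribing to a user's topic produces:
assert parse_subscription_frame(b"\x01USER:alice") == (True, "USER:alice")
# ...and unsubscribing:
assert parse_subscription_frame(b"\x00USER:alice") == (False, "USER:alice")
```

With `XPUB_VERBOSE` set, the container sees every subscribe message (not just the first per topic), which is what keeps `active_subscriptions` accurate across gateway reconnects.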
### Event Types
```python
# client-py/dexorder/events/types.py
import json
from dataclasses import dataclass, field
from enum import IntEnum

# Message type IDs (match protocol.md)
MSG_TYPE_USER_EVENT = 0x20
MSG_TYPE_EVENT_ACK = 0x21


class Priority(IntEnum):
    INFORMATIONAL = 0  # Drop if no active session
    NORMAL = 1         # Best effort
    CRITICAL = 2       # Must deliver


class ChannelType(IntEnum):
    ACTIVE_SESSION = 0
    WEB = 1
    TELEGRAM = 2
    EMAIL = 3
    PUSH = 4


class EventType(IntEnum):
    ORDER_PLACED = 0
    ORDER_FILLED = 1
    ORDER_CANCELLED = 2
    ALERT_TRIGGERED = 3
    POSITION_UPDATED = 4
    WORKSPACE_CHANGED = 5
    STRATEGY_LOG = 6


class AckStatus(IntEnum):
    DELIVERED = 0
    QUEUED = 1
    ERROR = 2


@dataclass
class ChannelPreference:
    channel: ChannelType
    only_if_active: bool = False


@dataclass
class DeliverySpec:
    priority: Priority = Priority.NORMAL
    channels: list[ChannelPreference] = field(default_factory=list)

    @staticmethod
    def informational() -> "DeliverySpec":
        """Drop if no active session."""
        return DeliverySpec(
            priority=Priority.INFORMATIONAL,
            channels=[ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True)],
        )

    @staticmethod
    def active_or_telegram() -> "DeliverySpec":
        """Active session preferred, fallback to Telegram."""
        return DeliverySpec(
            priority=Priority.NORMAL,
            channels=[
                ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
                ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
            ],
        )

    @staticmethod
    def critical() -> "DeliverySpec":
        """Must deliver through any available channel."""
        return DeliverySpec(
            priority=Priority.CRITICAL,
            channels=[
                ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
                ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
                ChannelPreference(ChannelType.PUSH, only_if_active=False),
                ChannelPreference(ChannelType.EMAIL, only_if_active=False),
            ],
        )


@dataclass
class UserEvent:
    user_id: str = ""
    event_id: str = ""
    timestamp: int = 0  # Unix millis
    event_type: EventType = EventType.STRATEGY_LOG
    payload: bytes = b""  # JSON or nested protobuf
    delivery: DeliverySpec = field(default_factory=DeliverySpec)

    def serialize(self) -> bytes:
        """Serialize to wire format."""
        # TODO: Replace with actual protobuf serialization.
        # For now, use a simple type-byte + JSON format for illustration.
        data = {
            "user_id": self.user_id,
            "event_id": self.event_id,
            "timestamp": self.timestamp,
            "event_type": int(self.event_type),
            "payload": self.payload.decode() if self.payload else "",
            "delivery": {
                "priority": int(self.delivery.priority),
                "channels": [
                    {"channel": int(c.channel), "only_if_active": c.only_if_active}
                    for c in self.delivery.channels
                ],
            },
        }
        return bytes([MSG_TYPE_USER_EVENT]) + json.dumps(data).encode()

    @classmethod
    def deserialize(cls, data: bytes) -> "UserEvent":
        """Deserialize from wire format."""
        msg_type = data[0]
        assert msg_type == MSG_TYPE_USER_EVENT
        obj = json.loads(data[1:])
        return cls(
            user_id=obj["user_id"],
            event_id=obj["event_id"],
            timestamp=obj["timestamp"],
            event_type=EventType(obj["event_type"]),
            payload=obj.get("payload", "").encode(),
            delivery=DeliverySpec(
                priority=Priority(obj["delivery"]["priority"]),
                channels=[
                    ChannelPreference(ChannelType(c["channel"]), c["only_if_active"])
                    for c in obj["delivery"]["channels"]
                ],
            ),
        )


@dataclass
class EventAck:
    event_id: str
    status: AckStatus
    error_message: str = ""

    def serialize(self) -> bytes:
        data = {
            "event_id": self.event_id,
            "status": int(self.status),
            "error_message": self.error_message,
        }
        return bytes([MSG_TYPE_EVENT_ACK]) + json.dumps(data).encode()

    @classmethod
    def deserialize(cls, data: bytes) -> "EventAck":
        msg_type = data[0]
        assert msg_type == MSG_TYPE_EVENT_ACK
        obj = json.loads(data[1:])
        return cls(
            event_id=obj["event_id"],
            status=AckStatus(obj["status"]),
            error_message=obj.get("error_message", ""),
        )
```
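The interim wire format above is just a one-byte message-type tag followed by UTF-8 JSON. A standalone round-trip of that framing (helper names hypothetical; the real classes wrap this in `serialize`/`deserialize`):

```python
import json

MSG_TYPE_USER_EVENT = 0x20  # matches protocol.md

def frame(msg_type: int, obj: dict) -> bytes:
    """Prefix a JSON body with its one-byte message-type tag."""
    return bytes([msg_type]) + json.dumps(obj).encode()

def unframe(data: bytes, expected_type: int) -> dict:
    """Validate the tag byte, then decode the JSON body."""
    if data[0] != expected_type:
        raise ValueError(f"unexpected message type 0x{data[0]:02x}")
    return json.loads(data[1:])

wire = frame(MSG_TYPE_USER_EVENT, {"event_id": "abc", "timestamp": 1})
assert wire[0] == 0x20
assert unframe(wire, MSG_TYPE_USER_EVENT) == {"event_id": "abc", "timestamp": 1}
```

The leading tag byte is what lets a receiver dispatch `UserEvent` vs `EventAck` frames before parsing, and it survives unchanged when the JSON body is later swapped for protobuf.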
### Pending Event Persistence
```python
# client-py/dexorder/events/pending_store.py
import json
from pathlib import Path

import aiofiles

from .types import ChannelPreference, ChannelType, DeliverySpec, EventType, Priority, UserEvent


class PendingStore:
    """
    Persists pending critical events to disk for crash recovery.
    """

    def __init__(self, path: str):
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)

    async def save_pending(self, events: list[UserEvent]):
        """Save pending events to disk."""
        data = [
            {
                "user_id": e.user_id,
                "event_id": e.event_id,
                "timestamp": e.timestamp,
                "event_type": int(e.event_type),
                "payload": e.payload.decode() if e.payload else "",
                "delivery": {
                    "priority": int(e.delivery.priority),
                    "channels": [
                        {"channel": int(c.channel), "only_if_active": c.only_if_active}
                        for c in e.delivery.channels
                    ],
                },
            }
            for e in events
        ]
        async with aiofiles.open(self.path, "w") as f:
            await f.write(json.dumps(data))

    async def load_pending(self) -> list[UserEvent]:
        """Load pending events from disk."""
        if not self.path.exists():
            return []
        try:
            async with aiofiles.open(self.path, "r") as f:
                content = await f.read()
            data = json.loads(content)
            events = []
            for obj in data:
                events.append(UserEvent(
                    user_id=obj["user_id"],
                    event_id=obj["event_id"],
                    timestamp=obj["timestamp"],
                    event_type=EventType(obj["event_type"]),
                    payload=obj.get("payload", "").encode(),
                    delivery=DeliverySpec(
                        priority=Priority(obj["delivery"]["priority"]),
                        channels=[
                            ChannelPreference(ChannelType(c["channel"]), c["only_if_active"])
                            for c in obj["delivery"]["channels"]
                        ],
                    ),
                ))
            # Clear the file after loading
            self.path.unlink()
            return events
        except Exception:
            return []
```
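`save_pending` writes the file in place, so a crash mid-write could leave truncated JSON (which `load_pending` would then silently discard along with the events). One way to harden this, sketched as a hypothetical helper, is the usual write-to-temp-then-rename pattern:

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, data: object) -> None:
    """Write JSON to a temp file in the same directory, then rename over the target."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())        # force bytes to disk before the rename
        os.replace(tmp_name, path)      # atomic within a filesystem on POSIX
    except BaseException:
        os.unlink(tmp_name)
        raise
```

Because `os.replace` is atomic, a reader always sees either the previous complete file or the new complete file, never a partial write.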
### Integration with Strategy Engine
```python
# Example usage in strategy engine
import json

from dexorder.events import EventPublisher, UserEvent, EventType, DeliverySpec


class StrategyEngine:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.event_publisher = EventPublisher(
            user_id=user_id,
            pending_store_path=f"/data/users/{user_id}/pending_events.json",
        )

    async def start(self):
        await self.event_publisher.start()

    async def stop(self):
        await self.event_publisher.stop()

    async def on_order_filled(self, order):
        """Called when an order is filled - CRITICAL delivery."""
        event = UserEvent(
            event_type=EventType.ORDER_FILLED,
            payload=json.dumps({
                "order_id": order.id,
                "symbol": order.symbol,
                "side": order.side,
                "price": str(order.fill_price),
                "quantity": str(order.quantity),
            }).encode(),
            delivery=DeliverySpec.critical(),
        )
        await self.event_publisher.publish(event)

    async def on_indicator_update(self, indicator_name: str, value: float):
        """Called on indicator update - INFORMATIONAL (only if watching)."""
        event = UserEvent(
            event_type=EventType.STRATEGY_LOG,
            payload=json.dumps({
                "indicator": indicator_name,
                "value": value,
            }).encode(),
            delivery=DeliverySpec.informational(),
        )
        await self.event_publisher.publish(event)

    async def on_alert_triggered(self, alert):
        """Called when a price alert triggers - active session or Telegram."""
        event = UserEvent(
            event_type=EventType.ALERT_TRIGGERED,
            payload=json.dumps({
                "alert_id": alert.id,
                "condition": alert.condition,
                "triggered_price": str(alert.triggered_price),
            }).encode(),
            delivery=DeliverySpec.active_or_telegram(),
        )
        await self.event_publisher.publish(event)
```
## Gateway Implementation (TypeScript)
### File Structure
```
gateway/src/
├── events/
│   ├── index.ts
│   ├── event-subscriber.ts  # SUB socket for informational events
│   ├── event-router.ts      # ROUTER socket for critical events
│   ├── delivery-service.ts  # Telegram, email, push delivery
│   └── types.ts
```
### Event Subscriber (Informational Events)
```typescript
// gateway/src/events/event-subscriber.ts
import * as zmq from 'zeromq';
import type { FastifyBaseLogger } from 'fastify';
import type { WebSocket } from 'ws';
import type { UserEvent } from './types.js';
import { deserializeUserEvent } from './types.js';

interface SessionRegistry {
  get(userId: string): WebSocket | undefined;
  has(userId: string): boolean;
}

/**
 * Subscribes to user container XPUB sockets for informational events.
 * One SUB socket connects to multiple containers (all containers for active sessions).
 */
export class EventSubscriber {
  private socket: zmq.Subscriber;
  private sessions: SessionRegistry;
  private logger: FastifyBaseLogger;
  private containerEndpoints: Map<string, string> = new Map(); // userId -> endpoint
  private running = false;

  constructor(sessions: SessionRegistry, logger: FastifyBaseLogger) {
    this.socket = new zmq.Subscriber();
    this.sessions = sessions;
    this.logger = logger;
  }

  async start(): Promise<void> {
    this.running = true;
    void this.processMessages();
  }

  async stop(): Promise<void> {
    this.running = false;
    this.socket.close();
  }

  /**
   * Called when a user session connects.
   * Connects to the user's container and subscribes to their events.
   */
  async onSessionConnect(userId: string, containerEndpoint: string): Promise<void> {
    const topic = `USER:${userId}`;
    // Connect to the container's XPUB socket if not already connected
    if (!this.containerEndpoints.has(userId)) {
      this.socket.connect(containerEndpoint);
      this.containerEndpoints.set(userId, containerEndpoint);
      this.logger.info({ userId, containerEndpoint }, 'Connected to container XPUB');
    }
    // Subscribe to the user's topic
    this.socket.subscribe(topic);
    this.logger.info({ userId, topic }, 'Subscribed to user events');
  }

  /**
   * Called when a user session disconnects.
   * Unsubscribes from their events (but keeps the connection for a potential reconnect).
   */
  async onSessionDisconnect(userId: string): Promise<void> {
    const topic = `USER:${userId}`;
    this.socket.unsubscribe(topic);
    this.logger.info({ userId, topic }, 'Unsubscribed from user events');
    // Optionally disconnect from the container after a timeout
    // (in case the user reconnects quickly)
  }

  private async processMessages(): Promise<void> {
    for await (const [topicBuf, payloadBuf] of this.socket) {
      if (!this.running) break;
      try {
        const topic = topicBuf.toString();
        const userId = topic.replace('USER:', '');
        const event = deserializeUserEvent(payloadBuf);
        // Forward to the active WebSocket session
        const ws = this.sessions.get(userId);
        if (ws) {
          ws.send(JSON.stringify({
            type: 'event',
            eventType: event.eventType,
            eventId: event.eventId,
            timestamp: event.timestamp,
            payload: JSON.parse(event.payload.toString()),
          }));
        }
      } catch (error) {
        this.logger.error({ error }, 'Error processing informational event');
      }
    }
  }
}
```
### Event Router (Critical Events)
```typescript
// gateway/src/events/event-router.ts
import * as zmq from 'zeromq';
import type { FastifyBaseLogger } from 'fastify';
import type { WebSocket } from 'ws';
import type { UserEvent, EventAck, AckStatus } from './types.js';
import { deserializeUserEvent, serializeEventAck } from './types.js';
import type { DeliveryService } from './delivery-service.js';

interface SessionRegistry {
  get(userId: string): WebSocket | undefined;
  has(userId: string): boolean;
}

/**
 * ROUTER socket that receives critical events from all containers.
 * Delivers events and sends acks back to containers.
 */
export class EventRouter {
  private socket: zmq.Router;
  private sessions: SessionRegistry;
  private delivery: DeliveryService;
  private logger: FastifyBaseLogger;
  private bindEndpoint: string;
  private running = false;
  private cleanupTimer?: NodeJS.Timeout;
  // Deduplication: track recently processed event IDs
  private processedEvents: Map<string, number> = new Map();
  private readonly DEDUP_TTL_MS = 5 * 60 * 1000; // 5 minutes

  constructor(
    sessions: SessionRegistry,
    delivery: DeliveryService,
    logger: FastifyBaseLogger,
    bindEndpoint: string = 'tcp://*:5571'
  ) {
    this.socket = new zmq.Router();
    this.sessions = sessions;
    this.delivery = delivery;
    this.logger = logger;
    this.bindEndpoint = bindEndpoint;
  }

  async start(): Promise<void> {
    // zeromq.js bind() is async - await it before consuming messages
    await this.socket.bind(this.bindEndpoint);
    this.logger.info({ bindEndpoint: this.bindEndpoint }, 'Event router bound');
    this.running = true;
    void this.processMessages();
    this.cleanupTimer = setInterval(() => this.cleanupDedup(), 60_000); // Cleanup every minute
  }

  async stop(): Promise<void> {
    this.running = false;
    if (this.cleanupTimer) clearInterval(this.cleanupTimer);
    this.socket.close();
  }

  private async processMessages(): Promise<void> {
    for await (const [identity, payload] of this.socket) {
      if (!this.running) break;
      try {
        const event = deserializeUserEvent(payload);
        // Deduplication check
        if (this.processedEvents.has(event.eventId)) {
          // Already processed - send success ack
          await this.sendAck(identity, event.eventId, 'DELIVERED');
          continue;
        }
        this.logger.info(
          { eventId: event.eventId, userId: event.userId, eventType: event.eventType },
          'Processing critical event'
        );
        // Deliver through channel preferences
        const status = await this.deliverEvent(event);
        // Mark as processed
        this.processedEvents.set(event.eventId, Date.now());
        // Send ack back to container
        await this.sendAck(identity, event.eventId, status);
      } catch (error) {
        this.logger.error({ error }, 'Error processing critical event');
        // Don't ack - let the container retry
      }
    }
  }

  private async deliverEvent(event: UserEvent): Promise<AckStatus> {
    for (const pref of event.delivery.channels) {
      // Skip if the channel requires an active session but none exists
      if (pref.onlyIfActive && !this.sessions.has(event.userId)) {
        continue;
      }
      try {
        switch (pref.channel) {
          case 'ACTIVE_SESSION': {
            const ws = this.sessions.get(event.userId);
            if (ws) {
              ws.send(JSON.stringify({
                type: 'event',
                eventType: event.eventType,
                eventId: event.eventId,
                timestamp: event.timestamp,
                payload: JSON.parse(event.payload.toString()),
              }));
              return 'DELIVERED';
            }
            break;
          }
          case 'TELEGRAM':
            await this.delivery.sendTelegram(event.userId, event);
            return 'DELIVERED';
          case 'EMAIL':
            await this.delivery.sendEmail(event.userId, event);
            return 'DELIVERED';
          case 'PUSH':
            await this.delivery.sendPush(event.userId, event);
            return 'DELIVERED';
        }
      } catch (error) {
        this.logger.warn(
          { error, channel: pref.channel, eventId: event.eventId },
          'Channel delivery failed, trying next'
        );
        // Continue to the next channel preference
      }
    }
    // All channels failed
    return 'ERROR';
  }

  private async sendAck(
    identity: Buffer,
    eventId: string,
    status: AckStatus,
    errorMessage?: string
  ): Promise<void> {
    const ack: EventAck = { eventId, status, errorMessage: errorMessage || '' };
    await this.socket.send([identity, serializeEventAck(ack)]);
  }

  private cleanupDedup(): void {
    const now = Date.now();
    for (const [eventId, timestamp] of this.processedEvents) {
      if (now - timestamp > this.DEDUP_TTL_MS) {
        this.processedEvents.delete(eventId);
      }
    }
  }
}
```
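The router's dedup map is a plain TTL cache keyed by `event_id`: retried events get acked again instead of re-delivered. The same bookkeeping, as a language-neutral Python sketch (class name hypothetical):

```python
class DedupCache:
    """Remember event IDs for a fixed TTL so retried events are acked, not re-delivered."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._seen: dict[str, float] = {}

    def seen_before(self, event_id: str, now: float) -> bool:
        """Return True if event_id is a duplicate; otherwise record it."""
        self._evict(now)
        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False

    def _evict(self, now: float) -> None:
        # Drop entries older than the TTL (the router does this on a timer instead)
        for key, stamp in list(self._seen.items()):
            if now - stamp > self.ttl:
                del self._seen[key]

cache = DedupCache(ttl_seconds=300.0)
assert cache.seen_before("evt-1", now=0.0) is False    # first delivery
assert cache.seen_before("evt-1", now=10.0) is True    # retry within TTL: duplicate
assert cache.seen_before("evt-1", now=400.0) is False  # after expiry it counts as new
```

The TTL must comfortably exceed `ack_timeout * max_retries` on the container side (30 s × 3 here), otherwise a late retry would be re-delivered.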
### Delivery Service
```typescript
// gateway/src/events/delivery-service.ts
import type { FastifyBaseLogger } from 'fastify';
import type { UserEvent } from './types.js';

interface UserChannelConfig {
  telegramChatId?: string;
  email?: string;
  pushToken?: string;
}

/**
 * Handles actual delivery to external channels.
 * Owns credentials for the Telegram bot, email service, and push notifications.
 */
export class DeliveryService {
  private telegramBotToken: string;
  private emailServiceKey: string;
  private pushServiceKey: string;
  private logger: FastifyBaseLogger;
  // User channel configs (loaded from DB)
  private userConfigs: Map<string, UserChannelConfig> = new Map();

  constructor(config: {
    telegramBotToken: string;
    emailServiceKey: string;
    pushServiceKey: string;
    logger: FastifyBaseLogger;
  }) {
    this.telegramBotToken = config.telegramBotToken;
    this.emailServiceKey = config.emailServiceKey;
    this.pushServiceKey = config.pushServiceKey;
    this.logger = config.logger;
  }

  async loadUserConfig(userId: string): Promise<UserChannelConfig> {
    // TODO: Load from database
    // For now, return cached or empty
    return this.userConfigs.get(userId) || {};
  }

  async sendTelegram(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.telegramChatId) {
      throw new Error('User has no Telegram chat ID configured');
    }
    const message = this.formatEventMessage(event);
    const response = await fetch(
      `https://api.telegram.org/bot${this.telegramBotToken}/sendMessage`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          chat_id: config.telegramChatId,
          text: message,
          parse_mode: 'Markdown',
        }),
      }
    );
    if (!response.ok) {
      const error = await response.text();
      throw new Error(`Telegram API error: ${error}`);
    }
    this.logger.info({ userId, eventId: event.eventId }, 'Sent Telegram notification');
  }

  async sendEmail(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.email) {
      throw new Error('User has no email configured');
    }
    // TODO: Implement email sending via SendGrid/SES/etc.
    this.logger.info({ userId, eventId: event.eventId }, 'Sent email notification');
  }

  async sendPush(userId: string, event: UserEvent): Promise<void> {
    const config = await this.loadUserConfig(userId);
    if (!config.pushToken) {
      throw new Error('User has no push token configured');
    }
    // TODO: Implement push notification via Firebase/APNs
    this.logger.info({ userId, eventId: event.eventId }, 'Sent push notification');
  }

  private formatEventMessage(event: UserEvent): string {
    const payload = JSON.parse(event.payload.toString());
    switch (event.eventType) {
      case 'ORDER_FILLED':
        return `🟢 *Order Filled*\n` +
          `Symbol: ${payload.symbol}\n` +
          `Side: ${payload.side}\n` +
          `Price: ${payload.price}\n` +
          `Quantity: ${payload.quantity}`;
      case 'ALERT_TRIGGERED':
        return `🔔 *Alert Triggered*\n` +
          `Condition: ${payload.condition}\n` +
          `Price: ${payload.triggered_price}`;
      case 'POSITION_UPDATED':
        return `📊 *Position Updated*\n` +
          `Symbol: ${payload.symbol}\n` +
          `Size: ${payload.size}\n` +
          `PnL: ${payload.unrealized_pnl}`;
      default:
        return `📌 *Notification*\n${JSON.stringify(payload, null, 2)}`;
    }
  }
}
```
### Integration in Gateway Main
```typescript
// gateway/src/main.ts (additions)
import { EventSubscriber } from './events/event-subscriber.js';
import { EventRouter } from './events/event-router.js';
import { DeliveryService } from './events/delivery-service.js';

// In main setup:
const sessionRegistry = new Map<string, WebSocket>();
const deliveryService = new DeliveryService({
  telegramBotToken: process.env.TELEGRAM_BOT_TOKEN!,
  emailServiceKey: process.env.EMAIL_SERVICE_KEY!,
  pushServiceKey: process.env.PUSH_SERVICE_KEY!,
  logger: app.log,
});
const eventSubscriber = new EventSubscriber(sessionRegistry, app.log);
const eventRouter = new EventRouter(sessionRegistry, deliveryService, app.log);
await eventSubscriber.start();
await eventRouter.start();

// In WebSocket handler:
websocket.on('connection', async (ws, req) => {
  const authContext = await authenticate(req);
  const userId = authContext.userId;
  const containerEndpoint = authContext.license.containerEventEndpoint;

  // Register session
  sessionRegistry.set(userId, ws);
  // Subscribe to informational events
  await eventSubscriber.onSessionConnect(userId, containerEndpoint);

  ws.on('close', async () => {
    sessionRegistry.delete(userId);
    await eventSubscriber.onSessionDisconnect(userId);
  });
});
```
## Protobuf Definitions
Add to `proto/user_events.proto`:
```protobuf
syntax = "proto3";

package dexorder.events;

message UserEvent {
  string user_id = 1;
  string event_id = 2;
  int64 timestamp = 3;
  EventType event_type = 4;
  bytes payload = 5;
  DeliverySpec delivery = 6;
}

enum EventType {
  ORDER_PLACED = 0;
  ORDER_FILLED = 1;
  ORDER_CANCELLED = 2;
  ALERT_TRIGGERED = 3;
  POSITION_UPDATED = 4;
  WORKSPACE_CHANGED = 5;
  STRATEGY_LOG = 6;
}

message DeliverySpec {
  Priority priority = 1;
  repeated ChannelPreference channels = 2;
}

enum Priority {
  INFORMATIONAL = 0;
  NORMAL = 1;
  CRITICAL = 2;
}

message ChannelPreference {
  ChannelType channel = 1;
  bool only_if_active = 2;
}

enum ChannelType {
  ACTIVE_SESSION = 0;
  WEB = 1;
  TELEGRAM = 2;
  EMAIL = 3;
  PUSH = 4;
}

message EventAck {
  string event_id = 1;
  AckStatus status = 2;
  string error_message = 3;
}

enum AckStatus {
  DELIVERED = 0;
  QUEUED = 1;
  ERROR = 2;
}
```
## Kubernetes Configuration
### Container Ports
Add to agent deployment template:
```yaml
# deploy/k8s/templates/agent-deployment.yaml
spec:
  containers:
    - name: agent
      ports:
        - containerPort: 3000  # MCP server
          name: mcp
        - containerPort: 5570  # XPUB for informational events
          name: events-xpub
```
### Gateway Configuration
```yaml
# deploy/k8s/base/gateway.yaml
spec:
  containers:
    - name: gateway
      ports:
        - containerPort: 3000  # HTTP/WebSocket
          name: http
        - containerPort: 5571  # ROUTER for critical events
          name: events-router
      env:
        - name: TELEGRAM_BOT_TOKEN
          valueFrom:
            secretKeyRef:
              name: gateway-secrets
              key: telegram-bot-token
        - name: EVENT_ROUTER_BIND
          value: "tcp://*:5571"
```
### Network Policy
Allow containers to connect to gateway ROUTER:
```yaml
# deploy/k8s/base/network-policies.yaml (additions)
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: agent-to-gateway-events
  namespace: dexorder-agents
spec:
  podSelector:
    matchLabels:
      app: agent
  policyTypes:
    - Egress
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              name: dexorder-system
          podSelector:
            matchLabels:
              app: gateway
      ports:
        - protocol: TCP
          port: 5571  # Critical events ROUTER
```
## Testing
### Unit Tests
```python
# tests/test_event_publisher.py
import pytest

from dexorder.events import EventPublisher, UserEvent, DeliverySpec, EventType


@pytest.mark.asyncio
async def test_informational_dropped_without_subscriber():
    publisher = EventPublisher(user_id="test-user", xpub_port=15570)
    await publisher.start()

    # No subscriber connected
    assert not publisher.has_active_subscriber()

    # Publish informational event - should be silently dropped
    event = UserEvent(
        event_type=EventType.STRATEGY_LOG,
        payload=b'{"test": true}',
        delivery=DeliverySpec.informational(),
    )
    await publisher.publish(event)

    # No pending events (was dropped, not queued)
    assert len(publisher.pending_events) == 0
    await publisher.stop()


@pytest.mark.asyncio
async def test_critical_queued_without_subscriber():
    publisher = EventPublisher(user_id="test-user", xpub_port=15571)
    await publisher.start()

    # No subscriber connected
    assert not publisher.has_active_subscriber()

    # Publish critical event - should go to DEALER
    event = UserEvent(
        event_type=EventType.ORDER_FILLED,
        payload=b'{"order_id": "123"}',
        delivery=DeliverySpec.critical(),
    )
    await publisher.publish(event)

    # Should be in pending queue
    assert len(publisher.pending_events) == 1
    await publisher.stop()
```
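The retry loop's timing decision can also be unit-tested without sockets by factoring it into a pure function (hypothetical helper mirroring `_retry_loop`):

```python
def retry_action(sent_at: float, now: float, ack_timeout: float,
                 retries: int, max_retries: int) -> str:
    """Decide what the retry loop should do with one pending event."""
    if now - sent_at <= ack_timeout:
        return "wait"   # ack may still arrive
    if retries >= max_retries:
        return "drop"   # give up, matching the publisher's max_retries cap
    return "retry"      # resend and bump the retry counter

assert retry_action(sent_at=0.0, now=10.0, ack_timeout=30.0, retries=0, max_retries=3) == "wait"
assert retry_action(sent_at=0.0, now=31.0, ack_timeout=30.0, retries=0, max_retries=3) == "retry"
assert retry_action(sent_at=0.0, now=31.0, ack_timeout=30.0, retries=3, max_retries=3) == "drop"
```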
### Integration Test
```bash
# Start a mock gateway ROUTER
python -c "
import zmq
ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
router.bind('tcp://*:5571')
print('Waiting for events...')
while True:
identity, payload = router.recv_multipart()
print(f'Received from {identity}: {payload}')
# Send ack
router.send_multipart([identity, b'\x21{\"event_id\":\"test\",\"status\":0}'])
"
# In another terminal, run container with test events
python -c "
import asyncio
from dexorder.events import EventPublisher, UserEvent, DeliverySpec, EventType
async def main():
pub = EventPublisher('test-user', gateway_router_endpoint='tcp://localhost:5571')
await pub.start()
event = UserEvent(
event_type=EventType.ORDER_FILLED,
payload=b'{\"test\": true}',
delivery=DeliverySpec.critical(),
)
await pub.publish(event)
await asyncio.sleep(5) # Wait for ack
print(f'Pending events: {len(pub.pending_events)}')
await pub.stop()
asyncio.run(main())
"
```
## Migration Plan
1. **Phase 1**: Add protobuf definitions, build Python/TypeScript types
2. **Phase 2**: Implement EventPublisher in container (inactive by default)
3. **Phase 3**: Implement EventSubscriber/EventRouter in gateway
4. **Phase 4**: Add DeliveryService with Telegram integration
5. **Phase 5**: Enable in container, test with single user
6. **Phase 6**: Roll out to all users, add email/push support
## Monitoring
Key metrics to track:
- `events_published_total{priority, channel}` - Events published by container
- `events_delivered_total{channel, status}` - Events delivered by gateway
- `events_pending_count` - Current pending critical events per container
- `event_delivery_latency_ms{channel}` - Time from publish to delivery
- `event_retry_total` - Retries due to ack timeout
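These map directly onto Prometheus-style labelled counters. A minimal in-process stand-in for tests (hypothetical; a real deployment would use an actual Prometheus client library):

```python
from collections import defaultdict

class MetricsRegistry:
    """Tiny labelled-counter registry for tests; not a real metrics client."""

    def __init__(self):
        self._counters: defaultdict[tuple, float] = defaultdict(float)

    def inc(self, name: str, amount: float = 1.0, **labels) -> None:
        # Sort labels so {"a": 1, "b": 2} and {"b": 2, "a": 1} hit the same series
        self._counters[(name, tuple(sorted(labels.items())))] += amount

    def get(self, name: str, **labels) -> float:
        return self._counters[(name, tuple(sorted(labels.items())))]

metrics = MetricsRegistry()
metrics.inc("events_published_total", priority="CRITICAL", channel="DEALER")
metrics.inc("events_published_total", priority="CRITICAL", channel="DEALER")
assert metrics.get("events_published_total", priority="CRITICAL", channel="DEALER") == 2.0
```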