# User Event Protocol

User containers emit events (order executions, alerts, workspace changes) that must be delivered to users via their active session or external channels (Telegram, email, push). This requires two ZMQ patterns with different delivery guarantees.

## Event Flow Overview

```
┌─────────────────────────────────────────────────────────────┐
│ User Container                                              │
│                                                             │
│   Strategy/Indicator Engine                                 │
│              │                                              │
│              ▼                                              │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Event Publisher                                       │  │
│  │                                                       │  │
│  │ 1. Check delivery spec                                │  │
│  │ 2. If INFORMATIONAL or has_active_subscriber():       │  │
│  │      → XPUB (fast path)                               │  │
│  │ 3. Else (NORMAL/CRITICAL, no active session):         │  │
│  │      → DEALER (guaranteed delivery)                   │  │
│  └──────────┬─────────────────────────────┬──────────────┘  │
│             │                             │                 │
│        XPUB socket                  DEALER socket           │
│        (port 5570)                  (port 5571)             │
└─────────────┼─────────────────────────────┼─────────────────┘
              │                             │
              ▼                             ▼
┌─────────────────────────────────────────────────────────────┐
│ Gateway Pool                                                │
│                                                             │
│  ┌──────────────────┐        ┌───────────────────────────┐  │
│  │ SUB socket       │        │ ROUTER socket             │  │
│  │ (per-session)    │        │ (shared, any gateway)     │  │
│  │                  │        │                           │  │
│  │ Subscribe to     │        │ Pull event, deliver,      │  │
│  │ USER:{user_id}   │        │ send EventAck back        │  │
│  │ on connect       │        │                           │  │
│  └────────┬─────────┘        └─────────────┬─────────────┘  │
│           │                                │                │
│           ▼                                ▼                │
│    ┌─────────────┐             ┌─────────────────────────┐  │
│    │ Active WS/  │             │ Telegram API / Email /  │  │
│    │ Telegram    │             │ Push Notification       │  │
│    └─────────────┘             └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
```

## ZMQ Channels

### Channel 5 — Informational Events (Container → Gateway)

**Pattern**: XPUB/SUB with subscription tracking

- **Socket Type**: Container uses XPUB (bind), Gateway uses SUB (connect)
- **Endpoint**: `tcp://*:5570` (Container binds)
- **Message Types**: `UserEvent`
- **Topic Format**: `USER:{user_id}` (e.g., `USER:user-abc123`)
- **Behavior**:
  - Gateway subscribes to `USER:{user_id}` when the user's
    WebSocket/Telegram session connects
  - Gateway unsubscribes when the session disconnects
  - Container uses XPUB with `ZMQ_XPUB_VERBOSE` to track active subscriptions
  - Container checks the subscription set before publishing
  - If there is no subscriber, the message is either dropped (INFORMATIONAL) or routed to the critical channel (NORMAL, CRITICAL)
  - Zero coordination: fire-and-forget for active sessions

### Channel 6 — Critical Events (Container → Gateway)

**Pattern**: DEALER/ROUTER with acknowledgment

- **Socket Type**: Container uses DEALER (connect), Gateway uses ROUTER (bind)
- **Endpoint**: `tcp://gateway:5571` (Gateway binds, containers connect)
- **Message Types**: `UserEvent` → `EventAck`
- **Behavior**:
  - Carries NORMAL and CRITICAL events for users with no active session
  - Container sends `UserEvent` with `event_id` via DEALER
  - DEALER round-robins across the gateway ROUTER sockets it is connected to
  - Gateway processes the event (sends to Telegram, email, etc.)
  - Gateway sends `EventAck` back to the container
  - Container tracks pending events with a timeout (30s default)
  - On timeout without ack: resend (DEALER routes to the next gateway)
  - On container shutdown: persist pending events to disk, reload on startup
  - Provides an at-least-once delivery guarantee

## Message Envelope

User event channels use the standard two-frame envelope (see `protocol.md`). The XPUB/SUB channel prepends a topic frame so that SUB-side prefix filtering works:

```
Frame 1 (XPUB/SUB only): [topic: "USER:{user_id}"]
Frame 2:                 [1 byte: protocol version]
Frame 3:                 [1 byte: 0x20 UserEvent | 0x21 EventAck][protobuf payload]
```

On the DEALER/ROUTER channel there is no topic frame. Instead, the gateway's ROUTER socket receives the sender's identity frame ahead of the two envelope frames and must put that identity first when replying with an `EventAck`.

## Protobuf Schemas

### UserEvent (0x20)

```protobuf
message UserEvent {
  string user_id = 1;
  string event_id = 2;          // UUID for dedup/ack
  int64 timestamp = 3;          // Unix millis
  EventType event_type = 4;
  bytes payload = 5;            // JSON or nested protobuf
  DeliverySpec delivery = 6;
}

enum EventType {
  ORDER_PLACED = 0;
  ORDER_FILLED = 1;
  ORDER_CANCELLED = 2;
  ALERT_TRIGGERED = 3;
  POSITION_UPDATED = 4;
  WORKSPACE_CHANGED = 5;
  STRATEGY_LOG = 6;
}

message DeliverySpec {
  Priority priority = 1;
  repeated ChannelPreference channels = 2;  // Ordered preference list
}

enum Priority {
  INFORMATIONAL = 0;  // Drop if no active session
  NORMAL = 1;         // Best effort, short queue
  CRITICAL = 2;       // Must deliver, retry, escalate
}

message ChannelPreference {
  ChannelType channel = 1;
  bool only_if_active = 2;  // true = skip if not connected
}

enum ChannelType {
  ACTIVE_SESSION = 0;  // Whatever's currently connected
  WEB = 1;
  TELEGRAM = 2;
  EMAIL = 3;
  PUSH = 4;            // Mobile push notification
}
```

### EventAck (0x21)

```protobuf
message EventAck {
  string event_id = 1;
  AckStatus status = 2;
  string error_message = 3;  // If status is ERROR
}

enum AckStatus {
  DELIVERED = 0;  // Successfully sent to at least one channel
  QUEUED = 1;     // Accepted, will retry (e.g., Telegram rate limit)
  ERROR = 2;      // Permanent failure
}
```

**Language note**: JavaScript protobuf libraries (e.g., protobufjs) expose field names in camelCase (`eventId`, `onlyIfActive`); Python keeps snake_case (`event_id`, `only_if_active`).

## Subscription Tracking (Container Side)

The container reads the subscription messages its XPUB socket receives to detect active sessions:

```python
import zmq

ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
# Pass every subscribe message upstream, including duplicates; unsubscribes still
# arrive only when the last subscriber for a topic leaves (that needs XPUB_VERBOSER)
xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)
xpub_socket.bind("tcp://*:5570")

active_subscriptions: set[str] = set()


def process_subscriptions() -> None:
    """Drain pending (un)subscription messages without blocking."""
    while xpub_socket.poll(0):
        msg = xpub_socket.recv()
        # First byte is the flag (1 = subscribe, 0 = unsubscribe); the rest is the topic
        topic = msg[1:].decode()
        if msg[0] == 1:
            active_subscriptions.add(topic)
        elif msg[0] == 0:
            active_subscriptions.discard(topic)


def has_active_subscriber(user_id: str) -> bool:
    return f"USER:{user_id}" in active_subscriptions
```

## Event Routing Logic (Container Side)

```python
import time
from dataclasses import dataclass

# UserEvent, Priority, serialize() and dealer_socket come from the generated
# protobuf module and the container's socket setup (not shown).


@dataclass
class PendingEvent:
    event: "UserEvent"
    sent_at: float
    retries: int = 0


# event_id -> PendingEvent still waiting for an EventAck
pending_events: dict[str, PendingEvent] = {}


def publish_event(event: "UserEvent") -> None:
    process_subscriptions()  # refresh subscription state before deciding
    topic = f"USER:{event.user_id}"
    # serialize() returns the two envelope frames: [version] and [type byte + payload]
    if event.delivery.priority == Priority.INFORMATIONAL:
        if has_active_subscriber(event.user_id):
            xpub_socket.send_multipart([topic.encode(), *serialize(event)])
        # else: silently drop
    elif has_active_subscriber(event.user_id):
        # Active session exists — use fast path
        xpub_socket.send_multipart([topic.encode(), *serialize(event)])
    else:
        # No active session — use guaranteed delivery
        send_via_dealer(event)


def send_via_dealer(event: "UserEvent") -> None:
    pending_events[event.event_id] = PendingEvent(
        event=event, sent_at=time.time(), retries=0
    )
    dealer_socket.send_multipart(serialize(event))
```

## Delivery Examples

```python
# Only the delivery spec is shown; user_id, event_id, etc. are omitted for brevity.
# Python protobuf constructors accept keyword arguments only.

# "Show on screen if they're watching, otherwise don't bother"
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.INFORMATIONAL,
        channels=[
            ChannelPreference(channel=ChannelType.ACTIVE_SESSION, only_if_active=True),
        ],
    )
)

# "Active session preferred, fallback to Telegram"
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.NORMAL,
        channels=[
            ChannelPreference(channel=ChannelType.ACTIVE_SESSION, only_if_active=True),
            ChannelPreference(channel=ChannelType.TELEGRAM, only_if_active=False),
        ],
    )
)

# "Order executed - MUST get through"
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.CRITICAL,
        channels=[
            ChannelPreference(channel=ChannelType.ACTIVE_SESSION, only_if_active=True),
            ChannelPreference(channel=ChannelType.TELEGRAM, only_if_active=False),
            ChannelPreference(channel=ChannelType.PUSH, only_if_active=False),
            ChannelPreference(channel=ChannelType.EMAIL, only_if_active=False),
        ],
    )
)
```

## Gateway Event Processing

The gateway maintains:

1. **Session registry**: Maps user_id → active WebSocket/channel connections
2. **Channel credentials**: Telegram bot token, email service keys, push certificates
3. **SUB socket per user session**: Subscribes to `USER:{user_id}` on the container's XPUB
4. **Shared ROUTER socket**: Receives critical events from any container

```typescript
// Assumed to exist elsewhere: sessions (Map<string, WebSocket>), subSocket (SUB connected
// to the user containers' XPUB endpoints), routerSocket (bound ROUTER), serialize/deserialize
// (envelope frames <-> protobuf), formatEvent, telegramBot, emailService.

async function onSessionConnect(userId: string, ws: WebSocket): Promise<void> {
  subSocket.subscribe(`USER:${userId}`);
  sessions.set(userId, ws);
}

async function onSessionDisconnect(userId: string): Promise<void> {
  subSocket.unsubscribe(`USER:${userId}`);
  sessions.delete(userId);
}

// Fast-path events (from SUB socket): frames are [topic, version, type + payload]
subSocket.on('message', (topic, version, payload) => {
  const event = deserialize(payload);
  const ws = sessions.get(event.userId);
  if (ws) {
    ws.send(JSON.stringify({ type: 'event', ...event }));
  }
});

// Guaranteed-delivery events (from ROUTER socket): frames are [identity, version, type + payload]
routerSocket.on('message', (identity, version, payload) => {
  const event = deserialize(payload);
  deliverEvent(event).then(status => {
    const ack: EventAck = { eventId: event.eventId, status, errorMessage: '' };
    // The identity frame must come first so ROUTER routes the ack back to the sender
    routerSocket.send([identity, ...serialize(ack)]);
  });
});

async function deliverEvent(event: UserEvent): Promise<AckStatus> {
  for (const pref of event.delivery.channels) {
    if (pref.onlyIfActive && !sessions.has(event.userId)) continue;
    try {
      switch (pref.channel) {
        case ChannelType.ACTIVE_SESSION: {
          const ws = sessions.get(event.userId);
          if (ws) {
            ws.send(JSON.stringify({ type: 'event', ...event }));
            return AckStatus.DELIVERED;
          }
          break;
        }
        case ChannelType.TELEGRAM:
          await telegramBot.sendMessage(event.userId, formatEvent(event));
          return AckStatus.DELIVERED;
        case ChannelType.EMAIL:
          await emailService.send(event.userId, formatEvent(event));
          return AckStatus.DELIVERED;
        // WEB and PUSH would follow the same pattern (omitted)
      }
    } catch {
      // This channel failed; fall through to the next preference
    }
  }
  return AckStatus.ERROR;
}
```

## Error Handling

- **Informational events**: dropped silently if no active session (by design)
- **Critical-channel events**: container retries on ack timeout (30s default)
- **Deduplication**: gateway tracks `event_id` for 5 minutes to suppress retransmits
- **All channels fail**: return ERROR ack; container may escalate or log
- **Container shutdown**: persist pending critical events to disk, reload on startup
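The frame layout described under *Message Envelope* can be made concrete with a pair of pure helpers that build and split the frames. This is a minimal sketch, not the `protocol.md` implementation: it assumes a protocol version byte of `1` and an already-encoded protobuf payload, and `build_frames`, `parse_frames`, and the constants are hypothetical names.

```python
from __future__ import annotations

PROTOCOL_VERSION = 1  # assumption: the real value is defined in protocol.md
MSG_USER_EVENT = 0x20
MSG_EVENT_ACK = 0x21


def build_frames(msg_type: int, payload: bytes, topic: str | None = None) -> list[bytes]:
    """Return multipart frames: [topic]? + [version] + [type byte + payload].

    The topic frame is only prepended on the XPUB/SUB channel.
    """
    frames = [bytes([PROTOCOL_VERSION]), bytes([msg_type]) + payload]
    if topic is not None:
        frames.insert(0, topic.encode())
    return frames


def parse_frames(frames: list[bytes], has_topic: bool) -> tuple[str | None, int, int, bytes]:
    """Split received frames into (topic, version, msg_type, payload)."""
    topic = None
    if has_topic:
        topic = frames[0].decode()
        frames = frames[1:]
    # Expect exactly one 1-byte version frame and a non-empty type+payload frame
    if len(frames) != 2 or len(frames[0]) != 1 or not frames[1]:
        raise ValueError("malformed envelope")
    return topic, frames[0][0], frames[1][0], frames[1][1:]
```

On the XPUB side the result of `build_frames(..., topic=...)` can go straight to `send_multipart`; on the ROUTER side the identity frame would be stripped first and the rest parsed with `has_topic=False`.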
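The container-side bookkeeping behind Channel 6's at-least-once guarantee (30s ack timeout, resend, persist on shutdown, reload on startup) can be sketched without any sockets. This is a minimal sketch under stated assumptions: events are kept as already-serialized bytes, persistence uses JSON with base64-encoded payloads, and `PendingTracker` and its methods are hypothetical names rather than part of the protocol.

```python
from __future__ import annotations

import base64
import json
import time
from dataclasses import dataclass
from pathlib import Path

ACK_TIMEOUT_S = 30.0  # default ack timeout from the Channel 6 description


@dataclass
class Pending:
    data: bytes      # serialized UserEvent, ready to resend as-is
    sent_at: float   # time of the most recent send
    retries: int = 0


class PendingTracker:
    """Hypothetical container-side state for DEALER sends awaiting an EventAck."""

    def __init__(self, timeout: float = ACK_TIMEOUT_S) -> None:
        self.timeout = timeout
        self.pending: dict[str, Pending] = {}

    def track(self, event_id: str, data: bytes, now: float | None = None) -> None:
        self.pending[event_id] = Pending(data, time.time() if now is None else now)

    def ack(self, event_id: str) -> bool:
        """Forget an event once its EventAck arrives; False for unknown or duplicate acks."""
        return self.pending.pop(event_id, None) is not None

    def due_for_retry(self, now: float | None = None) -> list[tuple[str, bytes]]:
        """Return (event_id, data) for each event whose ack timed out, restarting its timer."""
        now = time.time() if now is None else now
        due = []
        for event_id, p in self.pending.items():
            if now - p.sent_at >= self.timeout:
                p.sent_at = now
                p.retries += 1
                due.append((event_id, p.data))
        return due

    def save(self, path: Path) -> None:
        """Persist pending events on shutdown."""
        blob = {
            eid: {"data": base64.b64encode(p.data).decode("ascii"), "retries": p.retries}
            for eid, p in self.pending.items()
        }
        path.write_text(json.dumps(blob))

    def load(self, path: Path, now: float | None = None) -> None:
        """Reload on startup; reloaded events are immediately due for resend."""
        now = time.time() if now is None else now
        for eid, rec in json.loads(path.read_text()).items():
            self.pending[eid] = Pending(
                base64.b64decode(rec["data"]), now - self.timeout, rec["retries"]
            )
```

In a container event loop, each item returned by `due_for_retry()` would be resent with `dealer_socket.send_multipart(...)`; because the DEALER round-robins, the retry may reach a different gateway than the original send.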
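The deduplication rule under *Error Handling* (remember each `event_id` for 5 minutes) amounts to a TTL set. The gateway itself is TypeScript; this is a language-neutral sketch in Python, assuming a single gateway process. Since DEALER retries may land on a different gateway, deduplicating across the whole pool would presumably need a shared store. `DedupCache` and `seen_recently` are hypothetical names.

```python
from __future__ import annotations

import time
from collections import OrderedDict

DEDUP_WINDOW_S = 300.0  # 5 minutes, per the Error Handling section


class DedupCache:
    """Hypothetical gateway-side TTL set of recently seen event_ids.

    Assumes timestamps never decrease, so insertion order is also expiry order.
    """

    def __init__(self, window: float = DEDUP_WINDOW_S) -> None:
        self.window = window
        self._seen: OrderedDict[str, float] = OrderedDict()

    def _evict(self, now: float) -> None:
        # Oldest entries sit at the front; stop at the first one still inside the window
        while self._seen:
            first_seen = next(iter(self._seen.values()))
            if now - first_seen < self.window:
                break
            self._seen.popitem(last=False)

    def seen_recently(self, event_id: str, now: float | None = None) -> bool:
        """True if event_id is a retransmit; otherwise record it and return False."""
        now = time.monotonic() if now is None else now
        self._evict(now)
        if event_id in self._seen:
            return True
        self._seen[event_id] = now
        return False
```

On a duplicate, one reasonable choice is to skip redelivery but still send an `EventAck`, so the container stops retrying.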