# ZeroMQ Protocol Architecture

Our data transfer protocol uses ZeroMQ with Protobufs. We send a small envelope: the first frame carries a protocol version byte; the second frame carries a type ID byte followed by the protobuf payload. OHLC periods are represented in seconds.

## Data Flow Overview

**Relay as Gateway**: The Relay is a well-known bind point that all components connect to. It routes messages between clients, ingestors, and Flink.

### Historical Data Query Flow (Async Event-Driven Architecture)

* Client generates request_id and/or client_id (both are client-generated)
* Client computes notification topic: `RESPONSE:{client_id}` or `HISTORY_READY:{request_id}`
* **Client subscribes to notification topic BEFORE sending request (prevents race condition)**
* Client sends SubmitHistoricalRequest to Relay (REQ/REP)
* Relay returns immediate SubmitResponse with request_id and notification_topic (for confirmation)
* Relay publishes DataRequest to ingestor work queue with exchange prefix (PUB/SUB)
* Ingestor receives request, fetches data from exchange
* Ingestor writes OHLC data to Kafka with __metadata in first record
* Flink reads from Kafka, processes data, writes to Iceberg
* Flink publishes HistoryReadyNotification to ZMQ PUB socket (port 5557) with deterministic topic
* Relay proxies notification via XSUB → XPUB to clients
* Client receives notification (already subscribed) and queries Iceberg for data

**Key Architectural Change**: Relay is completely stateless. No request/response correlation is needed; all notification routing is topic-based (e.g., `RESPONSE:{client_id}`).

**Race Condition Prevention**: Notification topics are deterministic, derived from client-generated values (request_id or client_id). Clients MUST subscribe to the notification topic BEFORE submitting the request to avoid missing notifications.

**Two Notification Patterns**:

1. **Per-client topic** (`RESPONSE:{client_id}`): Subscribe once during connection, reuse for all requests from this client. Recommended for most clients.
2. **Per-request topic** (`HISTORY_READY:{request_id}`): Subscribe immediately before each request. Use when you need per-request isolation or don't have a persistent client_id.

### Realtime Data Flow (Flink → Relay → Clients)

* Ingestors write realtime ticks to Kafka
* Flink reads from Kafka, processes OHLC aggregations and CEP triggers
* Flink publishes market data via ZMQ PUB
* Relay subscribes to Flink (XSUB) and fans out to clients (XPUB)
* Clients subscribe to specific tickers

### Symbol Metadata Update Flow (Flink → Gateways)

* Ingestors write symbol metadata to Kafka
* Flink reads from Kafka, writes to Iceberg symbol_metadata table
* After committing to Iceberg, Flink publishes SymbolMetadataUpdated notification on MARKET_DATA_PUB
* Gateways subscribe to METADATA_UPDATE topic on startup
* Upon receiving notification, gateways reload symbol metadata from Iceberg
* This prevents race conditions where gateways start before symbol metadata is available

### Data Processing (Kafka → Flink → Iceberg)

* All market data flows through Kafka (durable event log)
* Flink processes streams for aggregations and CEP
* Flink writes historical data to Apache Iceberg tables
* Clients can query Iceberg for historical data (alternative to ingestor backfill)

**Key Design Principles**:

* Relay is the well-known bind point - all other components connect to it
* Relay is completely stateless - no request tracking, only topic-based routing
* Exchange prefix filtering allows ingestor specialization (e.g., only BINANCE ingestors)
* Historical data flows through Kafka (durable processing) only - no direct response
* Async event-driven notifications via pub/sub (Flink → Relay → Clients)
* Protobufs over ZMQ for all inter-service communication
* Kafka for durability and Flink stream processing
* Iceberg for long-term historical storage and
client queries

## ZeroMQ Channels and Patterns

All sockets bind on **Relay** (well-known endpoint). Components connect to the relay.

### 1. Client Request Channel (Clients → Relay)

**Pattern**: ROUTER (Relay binds, Clients use REQ)

- **Socket Type**: Relay uses ROUTER (bind), Clients use REQ (connect)
- **Endpoint**: `tcp://*:5559` (Relay binds)
- **Message Types**: `SubmitHistoricalRequest` → `SubmitResponse`
- **Behavior**:
  - Client generates request_id and/or client_id
  - Client computes notification topic deterministically
  - **Client subscribes to notification topic FIRST (prevents race)**
  - Client sends REQ for historical OHLC data
  - Relay validates request and returns immediate acknowledgment
  - Response includes notification_topic for client confirmation
  - Relay publishes DataRequest to ingestor work queue
  - No request tracking - relay is stateless

### 2. Ingestor Work Queue (Relay → Ingestors)

**Pattern**: PUB/SUB with exchange prefix filtering

- **Socket Type**: Relay uses PUB (bind), Ingestors use SUB (connect)
- **Endpoint**: `tcp://*:5555` (Relay binds)
- **Message Types**: `DataRequest` (historical or realtime)
- **Topic Prefix**: Exchange name (e.g., `BINANCE:`, `COINBASE:`)
- **Behavior**:
  - Relay publishes work with exchange prefix from ticker
  - Ingestors subscribe only to exchanges they support
  - Multiple ingestors can compete for the same exchange
  - Ingestors write data to Kafka only (no direct response)
  - Flink processes Kafka → Iceberg → notification

### 3. Market Data Fanout (Relay ↔ Flink ↔ Clients)

**Pattern**: XPUB/XSUB proxy

- **Socket Type**:
  - Relay XPUB (bind) ← Clients SUB (connect) - Port 5558
  - Relay XSUB (connect) → Flink PUB (bind) - Port 5557
- **Message Types**: `Tick`, `OHLC`, `HistoryReadyNotification`, `SymbolMetadataUpdated`
- **Topic Formats**:
  - Market data: `{ticker}|{data_type}` (e.g., `BINANCE:BTC/USDT|tick`)
  - Notifications: `RESPONSE:{client_id}` or `HISTORY_READY:{request_id}`
  - System notifications: `METADATA_UPDATE` (for symbol metadata updates)
- **Behavior**:
  - Clients subscribe to ticker topics and notification topics via Relay XPUB
  - Relay forwards subscriptions to Flink via XSUB
  - Flink publishes processed market data and notifications
  - Relay proxies data to subscribed clients (stateless forwarding)
  - Dynamic subscription management (no pre-registration)

### 4. Ingestor Control Channel (Optional - Future Use)

**Pattern**: PUB/SUB (broadcast control)

- **Socket Type**: Relay uses PUB, Ingestors use SUB
- **Endpoint**: `tcp://*:5557` (Relay binds)
- **Message Types**: `IngestorControl` (cancel, config updates)
- **Behavior**:
  - Broadcast control messages to all ingestors
  - Used for realtime subscription cancellation
  - Configuration updates

## Message Envelope Format

The core protocol uses two ZeroMQ frames:

```
Frame 1: [1 byte: protocol version]
Frame 2: [1 byte: message type ID][N bytes: protobuf message]
```

This two-frame approach allows receivers to check the protocol version before parsing the message type and protobuf payload.

**Important**: Some ZeroMQ socket patterns (PUB/SUB, XPUB/XSUB) may prepend additional frames for routing purposes.
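The logical envelope is simple to pack and parse in plain Python. A minimal sketch follows (frame handling only, no sockets; the function names are illustrative, not part of the protocol; type ID `0x05` for `OHLC` comes from the Message Type IDs table):

```python
PROTOCOL_VERSION = 1

def pack_envelope(type_id: int, payload: bytes) -> list[bytes]:
    """Build the two logical frames: [version] and [type_id + payload]."""
    return [bytes([PROTOCOL_VERSION]), bytes([type_id]) + payload]

def parse_envelope(frames: list[bytes]) -> tuple[int, bytes]:
    """Check the version frame first, then split type ID from payload."""
    if frames[0][0] != PROTOCOL_VERSION:
        raise ValueError(f"unsupported protocol version: {frames[0][0]}")
    return frames[1][0], frames[1][1:]

def parse_sub_message(frames: list[bytes]) -> tuple[bytes, int, bytes]:
    """SUB sockets receive an extra topic frame first; skip it, then parse."""
    type_id, payload = parse_envelope(frames[1:])
    return frames[0], type_id, payload
```

For example, `pack_envelope(0x05, serialized_ohlc)` yields the two frames to send, and a SUB-side receiver feeds all received frames to `parse_sub_message` to recover topic, type ID, and payload.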
Examples of the extra frames:

- **PUB/SUB with topic filtering**: SUB sockets receive `[topic frame][version frame][message frame]`
- **ROUTER sockets**: Prepend identity frames before the message

Components must handle these additional frames appropriately:

- SUB sockets: Skip the first frame (topic), then parse the remaining frames as the standard 2-frame envelope
- ROUTER sockets: Extract identity frames, then parse the standard 2-frame envelope

The two-frame envelope is the **logical protocol format**, but physical transmission may include additional ZeroMQ transport frames.

## Message Type IDs

| Type ID | Message Type              | Description                                    |
|---------|---------------------------|------------------------------------------------|
| 0x01    | DataRequest               | Request for historical or realtime data        |
| 0x02    | DataResponse (deprecated) | Historical data response (no longer used)      |
| 0x03    | IngestorControl           | Control messages for ingestors                 |
| 0x04    | Tick                      | Individual trade tick data                     |
| 0x05    | OHLC                      | Single OHLC candle with volume                 |
| 0x06    | Market                    | Market metadata                                |
| 0x07    | OHLCRequest (deprecated)  | Client request (replaced by SubmitHistorical)  |
| 0x08    | Response (deprecated)     | Generic response (replaced by SubmitResponse)  |
| 0x09    | CEPTriggerRequest         | Register CEP trigger                           |
| 0x0A    | CEPTriggerAck             | CEP trigger acknowledgment                     |
| 0x0B    | CEPTriggerEvent           | CEP trigger fired callback                     |
| 0x0C    | OHLCBatch                 | Batch of OHLC rows with metadata (Kafka)       |
| 0x10    | SubmitHistoricalRequest   | Client request for historical data (async)     |
| 0x11    | SubmitResponse            | Immediate ack with notification topic          |
| 0x12    | HistoryReadyNotification  | Notification that data is ready in Iceberg     |
| 0x13    | SymbolMetadataUpdated     | Notification that symbol metadata refreshed    |

## User Container Event System

User containers emit events (order executions, alerts, workspace changes) that must be delivered to users via their active session or external channels (Telegram, email, push).
This requires two ZMQ patterns with different delivery guarantees.

### Event Flow Overview

```
┌─────────────────────────────────────────────────────────┐
│                      User Container                     │
│                                                         │
│  Strategy/Indicator Engine                              │
│              │                                          │
│              ▼                                          │
│  ┌─────────────────────────────────────────────────┐    │
│  │               Event Publisher                   │    │
│  │                                                 │    │
│  │  1. Check delivery spec                         │    │
│  │  2. If INFORMATIONAL or has_active_subscriber():│    │
│  │     → XPUB (fast path)                          │    │
│  │  3. Else (CRITICAL or no active session):       │    │
│  │     → DEALER (guaranteed delivery)              │    │
│  └─────────────────────────────────────────────────┘    │
│        │                          │                     │
│   XPUB socket               DEALER socket               │
│   (port 5570)               (port 5571)                 │
└────────┼──────────────────────────┼─────────────────────┘
         │                          │
         ▼                          ▼
┌─────────────────────────────────────────────────────────┐
│                       Gateway Pool                      │
│                                                         │
│  ┌──────────────────┐    ┌──────────────────────────┐   │
│  │ SUB socket       │    │ ROUTER socket            │   │
│  │ (per-session)    │    │ (shared, any gateway)    │   │
│  │                  │    │                          │   │
│  │ Subscribe to     │    │ Pull event, deliver,     │   │
│  │ USER:{user_id}   │    │ send EventAck back       │   │
│  │ on connect       │    │                          │   │
│  └────────┬─────────┘    └────────────┬─────────────┘   │
│           │                           │                 │
│           ▼                           ▼                 │
│  ┌─────────────┐         ┌─────────────────────────┐    │
│  │ Active WS/  │         │ Telegram API / Email /  │    │
│  │ Telegram    │         │ Push Notification       │    │
│  └─────────────┘         └─────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
```

### 5. User Event Channel - Informational (Container → Gateway)

**Pattern**: XPUB/SUB with subscription tracking

- **Socket Type**: Container uses XPUB (bind), Gateway uses SUB (connect)
- **Endpoint**: `tcp://*:5570` (Container binds)
- **Message Types**: `UserEvent`
- **Topic Format**: `USER:{user_id}` (e.g., `USER:user-abc123`)
- **Behavior**:
  - Gateway subscribes to `USER:{user_id}` when user's WebSocket/Telegram session connects
  - Gateway unsubscribes when session disconnects
  - Container uses XPUB with `ZMQ_XPUB_VERBOSE` to track active subscriptions
  - Container checks subscription set before publishing
  - If no subscriber, message is either dropped (INFORMATIONAL) or routed to critical channel
  - Zero coordination, fire-and-forget for active sessions

### 6. User Event Channel - Critical (Container → Gateway)

**Pattern**: DEALER/ROUTER with acknowledgment

- **Socket Type**: Container uses DEALER (connect), Gateway uses ROUTER (bind)
- **Endpoint**: `tcp://gateway:5571` (Gateway binds, containers connect)
- **Message Types**: `UserEvent` → `EventAck`
- **Behavior**:
  - Container sends `UserEvent` with `event_id` via DEALER
  - DEALER round-robins to available gateway ROUTER sockets
  - Gateway processes event (sends to Telegram, email, etc.)
  - Gateway sends `EventAck` back to container
  - Container tracks pending events with timeout (30s default)
  - On timeout without ack: resend (DEALER routes to next gateway)
  - On container shutdown: persist pending to disk, reload on startup
  - Provides at-least-once delivery guarantee

### Subscription Tracking (Container Side)

Container uses XPUB to detect active sessions:

```python
import zmq

# Container event publisher initialization
ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1)  # Receive all sub/unsub messages
xpub_socket.bind("tcp://*:5570")

active_subscriptions: set[str] = set()

# In the event loop, handle subscription messages
def process_subscriptions():
    while xpub_socket.poll(0):
        msg = xpub_socket.recv()
        topic = msg[1:].decode()  # Skip first byte (sub/unsub flag)
        if msg[0] == 1:  # Subscribe
            active_subscriptions.add(topic)
        elif msg[0] == 0:  # Unsubscribe
            active_subscriptions.discard(topic)

def has_active_subscriber(user_id: str) -> bool:
    return f"USER:{user_id}" in active_subscriptions
```

### Event Routing Logic (Container Side)

```python
import time

def publish_event(event: UserEvent):
    topic = f"USER:{event.user_id}"

    if event.delivery.priority == Priority.INFORMATIONAL:
        # Fire and forget - drop if nobody's listening
        if has_active_subscriber(event.user_id):
            xpub_socket.send_multipart([topic.encode(), serialize(event)])
        # else: silently drop
    elif has_active_subscriber(event.user_id):
        # Active session exists - use fast path
        xpub_socket.send_multipart([topic.encode(), serialize(event)])
    else:
        # No active session - use guaranteed delivery
        send_via_dealer(event)

def send_via_dealer(event: UserEvent):
    pending_events[event.event_id] = PendingEvent(
        event=event,
        sent_at=time.time(),
        retries=0
    )
    dealer_socket.send(serialize(event))
```

### Message Type IDs (User Events)

| Type ID | Message Type | Description                        |
|---------|--------------|------------------------------------|
| 0x20    | UserEvent    | Container → Gateway event          |
| 0x21    | EventAck     | Gateway → Container acknowledgment |

### UserEvent Message

```protobuf
message UserEvent {
  string user_id = 1;
  string event_id = 2;       // UUID for dedup/ack
  int64 timestamp = 3;       // Unix millis
  EventType event_type = 4;
  bytes payload = 5;         // JSON or nested protobuf
  DeliverySpec delivery = 6;
}

enum EventType {
  ORDER_PLACED = 0;
  ORDER_FILLED = 1;
  ORDER_CANCELLED = 2;
  ALERT_TRIGGERED = 3;
  POSITION_UPDATED = 4;
  WORKSPACE_CHANGED = 5;
  STRATEGY_LOG = 6;
}

message DeliverySpec {
  Priority priority = 1;
  repeated ChannelPreference channels = 2;  // Ordered preference list
}

enum Priority {
  INFORMATIONAL = 0;  // Drop if no active session
  NORMAL = 1;         // Best effort, short queue
  CRITICAL = 2;       // Must deliver, retry, escalate
}

message ChannelPreference {
  ChannelType channel = 1;
  bool only_if_active = 2;  // true = skip if not connected
}

enum ChannelType {
  ACTIVE_SESSION = 0;  // Whatever's currently connected
  WEB = 1;
  TELEGRAM = 2;
  EMAIL = 3;
  PUSH = 4;            // Mobile push notification
}
```

### EventAck Message

```protobuf
message EventAck {
  string event_id = 1;
  AckStatus status = 2;
  string error_message = 3;  // If status is ERROR
}

enum AckStatus {
  DELIVERED = 0;  // Successfully sent to at least one channel
  QUEUED = 1;     // Accepted, will retry (e.g., Telegram rate limit)
  ERROR = 2;      // Permanent failure
}
```

### Language Notes

- JavaScript protobufs will convert field names to camelCase.
- Python will retain snake_case.
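The critical channel's retry behaviour (30 s ack timeout, resend via DEALER, persist on shutdown) can be sketched container-side as follows. `PendingEvent` and `pending_events` mirror the earlier snippet; `MAX_RETRIES` and the `resend` callback are assumptions, not part of the spec:

```python
from dataclasses import dataclass

ACK_TIMEOUT_S = 30.0  # default ack timeout from the critical-channel spec
MAX_RETRIES = 5       # assumption: escalate after a handful of resends

@dataclass
class PendingEvent:
    event: object      # the UserEvent awaiting an EventAck
    sent_at: float
    retries: int = 0

pending_events: dict[str, PendingEvent] = {}

def sweep_pending(now: float, resend) -> list[str]:
    """Resend timed-out events; return event_ids that exhausted retries."""
    exhausted = []
    for event_id, p in list(pending_events.items()):
        if now - p.sent_at < ACK_TIMEOUT_S:
            continue
        if p.retries >= MAX_RETRIES:
            exhausted.append(event_id)  # caller escalates or logs these
            del pending_events[event_id]
        else:
            p.retries += 1
            p.sent_at = now
            resend(p.event)  # DEALER round-robins to the next gateway

    return exhausted

def on_ack(event_id: str) -> None:
    """EventAck received from a gateway: stop tracking the event."""
    pending_events.pop(event_id, None)
```

On shutdown, the container would serialize the remaining `pending_events` to disk and reload them on startup, preserving the at-least-once guarantee across restarts.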
### Delivery Examples

```python
# "Show on screen if they're watching, otherwise don't bother"
# → Uses XPUB path only, dropped if no subscriber
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.INFORMATIONAL,
        channels=[ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True)]
    )
)

# "Active session preferred, fallback to Telegram"
# → Tries XPUB first (if subscribed), else DEALER for Telegram delivery
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.NORMAL,
        channels=[
            ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
            ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
        ]
    )
)

# "Order executed - MUST get through"
# → Always uses DEALER path for guaranteed delivery
UserEvent(
    delivery=DeliverySpec(
        priority=Priority.CRITICAL,
        channels=[
            ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
            ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
            ChannelPreference(ChannelType.PUSH, only_if_active=False),
            ChannelPreference(ChannelType.EMAIL, only_if_active=False),
        ]
    )
)
```

### Gateway Event Processing

Gateway maintains:

1. **Session registry**: Maps user_id → active WebSocket/channel connections
2. **Channel credentials**: Telegram bot token, email service keys, push certificates
3. **SUB socket per user session**: Subscribes to `USER:{user_id}` on container's XPUB
4. **Shared ROUTER socket**: Receives critical events from any container

```typescript
// On user WebSocket connect
async function onSessionConnect(userId: string, ws: WebSocket) {
  // Subscribe to user's informational events
  subSocket.subscribe(`USER:${userId}`);
  sessions.set(userId, ws);
}

// On user WebSocket disconnect
async function onSessionDisconnect(userId: string) {
  subSocket.unsubscribe(`USER:${userId}`);
  sessions.delete(userId);
}

// Handle informational events (from SUB socket)
subSocket.on('message', (topic, payload) => {
  const event = deserialize(payload);
  const ws = sessions.get(event.userId);
  if (ws) {
    ws.send(JSON.stringify({ type: 'event', ...event }));
  }
});

// Handle critical events (from ROUTER socket)
routerSocket.on('message', (identity, payload) => {
  const event = deserialize(payload);
  deliverEvent(event).then(status => {
    routerSocket.send([identity, serialize(EventAck(event.eventId, status))]);
  });
});

async function deliverEvent(event: UserEvent): Promise<AckStatus> {
  for (const pref of event.delivery.channels) {
    if (pref.onlyIfActive && !sessions.has(event.userId)) continue;
    switch (pref.channel) {
      case ChannelType.ACTIVE_SESSION: {
        const ws = sessions.get(event.userId);
        if (ws) {
          ws.send(JSON.stringify({ type: 'event', ...event }));
          return AckStatus.DELIVERED;
        }
        break;
      }
      case ChannelType.TELEGRAM:
        await telegramBot.sendMessage(event.userId, formatEvent(event));
        return AckStatus.DELIVERED;
      case ChannelType.EMAIL:
        await emailService.send(event.userId, formatEvent(event));
        return AckStatus.DELIVERED;
      // ... etc
    }
  }
  return AckStatus.ERROR;
}
```

## Error Handling

**Async Architecture Error Handling**:

- Failed historical requests: ingestor writes error marker to Kafka
- Flink reads error marker and publishes HistoryReadyNotification with ERROR status
- Client timeout: if no notification received within timeout, assume failure
- Realtime requests cancelled via control channel if ingestor fails
- REQ/REP timeouts: 30 seconds default for client request submission
- PUB/SUB has no delivery guarantees (Kafka provides durability)
- No response routing needed - all notifications via topic-based pub/sub

**User Event Error Handling**:

- Informational events: dropped silently if no active session (by design)
- Critical events: container retries on ack timeout (30s default)
- Gateway tracks event_id for deduplication (5 minute window)
- If all channels fail: return ERROR ack, container may escalate or log
- Container persists pending critical events to disk on shutdown

**Durability**:

- All data flows through Kafka for durability
- Flink checkpointing ensures exactly-once processing
- Client can retry request with new request_id if notification not received
- Critical user events use DEALER/ROUTER with ack for at-least-once delivery

## Scaling

### TODO: Flink-to-Relay ZMQ Discovery

Currently the Relay connects to Flink via XSUB on a single endpoint. With multiple Flink instances behind a K8s service, we need many-to-many connectivity.

**Problem**: K8s service load balancing doesn't help ZMQ since connections are persistent. The Relay needs to connect to ALL Flink instances to receive all published messages.

**Proposed Solution**: Use a K8s headless service for Flink workers:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-workers
spec:
  clusterIP: None
  selector:
    app: flink
```

Relay implementation:

1. On startup and periodically (every N seconds), resolve `flink-workers.namespace.svc.cluster.local`
2. DNS returns A records for all Flink pod IPs
3.
Diff against current XSUB connections
4. Connect to new pods, disconnect from removed pods

**Alternative approaches considered**:

- XPUB/XSUB broker: Adds a single point of failure and latency
- Service discovery (etcd/Redis): More complex, requires additional infrastructure

**Open questions**:

- Appropriate polling interval for DNS resolution (5-10 seconds?)
- Handling of brief disconnection during pod replacement
- Whether to use the K8s Endpoints API watch instead of DNS polling for faster reaction
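The DNS-polling reconciliation in steps 1-4 might look roughly like this on the Relay side. A sketch only: the service name/namespace is a placeholder, `reconcile` would run every polling interval, and `xsub_socket` is assumed to be a pyzmq XSUB socket (whose `connect`/`disconnect` accept endpoint strings):

```python
import socket

FLINK_SERVICE = "flink-workers.default.svc.cluster.local"  # placeholder namespace
FLINK_PUB_PORT = 5557  # Flink PUB port

def resolve_pod_ips(hostname: str) -> set[str]:
    """Resolve the headless service; DNS returns one A record per pod."""
    infos = socket.getaddrinfo(hostname, FLINK_PUB_PORT,
                               socket.AF_INET, socket.SOCK_STREAM)
    return {info[4][0] for info in infos}

def diff_connections(connected: set[str],
                     resolved: set[str]) -> tuple[set[str], set[str]]:
    """Return (ips_to_connect, ips_to_disconnect) for the XSUB socket."""
    return resolved - connected, connected - resolved

def reconcile(xsub_socket, connected: set[str]) -> set[str]:
    """One polling iteration: connect to new pods, drop removed ones."""
    to_add, to_remove = diff_connections(connected, resolve_pod_ips(FLINK_SERVICE))
    for ip in to_add:
        xsub_socket.connect(f"tcp://{ip}:{FLINK_PUB_PORT}")
    for ip in to_remove:
        xsub_socket.disconnect(f"tcp://{ip}:{FLINK_PUB_PORT}")
    return (connected | to_add) - to_remove
```

Keeping the set diff in a separate pure function makes the reconnection logic trivially testable without a running cluster.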