/**
 * ZMQ Relay Client for historical data requests
 *
 * IMPORTANT: Implements race-condition-free notification subscription
 * by subscribing to the RESPONSE:{client_id} topic BEFORE sending requests.
 *
 * Architecture:
 * - REQ socket to relay (port 5559) for SubmitHistoricalRequest
 * - SUB socket to relay (port 5558) for HistoryReadyNotification
 * - Notification topic: RESPONSE:{client_id} (deterministic, client-generated)
 */

import * as zmq from 'zeromq';
import type { FastifyBaseLogger } from 'fastify';
import { randomUUID } from 'crypto';
import {
  encodeSubmitHistoricalRequest,
  decodeSubmitResponse,
  decodeHistoryReadyNotification,
} from './zmq-protocol.js';
import type {
  SubmitHistoricalRequest,
  HistoryReadyNotification,
} from '../types/ohlc.js';
import {
  SubmitStatus,
  NotificationStatus,
} from '../types/ohlc.js';

/** Connection settings for {@link ZMQRelayClient}. */
export interface ZMQRelayConfig {
  /** Endpoint of the relay's request socket, e.g. "tcp://relay:5559". */
  relayRequestEndpoint: string;
  /** Endpoint of the relay's notification socket, e.g. "tcp://relay:5558". */
  relayNotificationEndpoint: string;
  /** Optional client ID; a `gateway-<8 hex chars>` ID is generated if omitted. */
  clientId?: string;
  /** Time in ms to wait for a request to complete (default: 60000). */
  requestTimeout?: number;
  /** Callback invoked when a METADATA_UPDATE notification is received. */
  onMetadataUpdate?: () => Promise<void>;
}

/** Bookkeeping for a submitted request whose notification has not arrived yet. */
interface PendingRequest {
  resolve: (notification: HistoryReadyNotification) => void;
  reject: (error: Error) => void;
  /** Timer that rejects the request after `requestTimeout` ms. */
  timeoutHandle: NodeJS.Timeout;
}

/**
 * ZMQ Relay Client
 *
 * Provides an async API for submitting historical data requests and waiting
 * for completion notifications.
 */
export class ZMQRelayClient {
  private config: Required<ZMQRelayConfig>;
  private logger: FastifyBaseLogger;

  private reqSocket?: zmq.Request;
  private subSocket?: zmq.Subscriber;

  private notificationTopic: string;
  /** In-flight requests keyed by request_id. */
  private pendingRequests: Map<string, PendingRequest> = new Map();

  private connected = false;
  private notificationListenerRunning = false;

  constructor(config: ZMQRelayConfig, logger: FastifyBaseLogger) {
    this.config = {
      relayRequestEndpoint: config.relayRequestEndpoint,
      relayNotificationEndpoint: config.relayNotificationEndpoint,
      clientId: config.clientId || `gateway-${randomUUID().slice(0, 8)}`,
      requestTimeout: config.requestTimeout || 60000,
      onMetadataUpdate: config.onMetadataUpdate || (async () => {}),
    };
    this.logger = logger;
    this.notificationTopic = `RESPONSE:${this.config.clientId}`;
  }

  /**
   * Connect to the relay and start the notification listener.
   *
   * CRITICAL: This MUST be called before making any requests.
   * The notification listener subscribes to the RESPONSE:{client_id} topic
   * BEFORE any requests are sent, preventing race conditions.
   *
   * Calling it again once connected is a no-op.
   */
  async connect(): Promise<void> {
    if (this.connected) {
      return;
    }

    this.logger.info({
      requestEndpoint: this.config.relayRequestEndpoint,
      notificationEndpoint: this.config.relayNotificationEndpoint,
      clientId: this.config.clientId,
      notificationTopic: this.notificationTopic,
    }, 'Connecting to ZMQ relay');

    // Create REQ socket for requests
    this.reqSocket = new zmq.Request();
    this.reqSocket.connect(this.config.relayRequestEndpoint);

    // Create SUB socket for notifications
    this.subSocket = new zmq.Subscriber();
    this.subSocket.connect(this.config.relayNotificationEndpoint);

    // Subscribe to our notification topic BEFORE sending any requests
    this.subSocket.subscribe(this.notificationTopic);

    // Subscribe to system metadata update notifications
    this.subSocket.subscribe('METADATA_UPDATE');

    this.logger.info({
      topics: [this.notificationTopic, 'METADATA_UPDATE'],
    }, 'Subscribed to notification topics');

    // Start notification listener
    this.startNotificationListener();

    // Give sockets a moment to connect
    await new Promise<void>((resolve) => setTimeout(resolve, 100));

    this.connected = true;
    this.logger.info('ZMQ relay client connected');
  }

  /**
   * Request historical OHLC data.
   *
   * IMPORTANT: Call connect() before using this method.
   *
   * NOTE(review): the request/ack exchange uses a single REQ socket; ZeroMQ
   * REQ sockets presumably require strict send/receive alternation, so
   * overlapping calls may need to be serialized by the caller - confirm.
   *
   * @param ticker Market identifier (e.g., "BTC/USDT.BINANCE")
   * @param period_seconds OHLC period in seconds
   * @param start_time Start timestamp in MICROSECONDS
   * @param end_time End timestamp in MICROSECONDS
   * @param limit Optional limit on number of candles
   * @returns Promise that resolves with the notification once the data is ready in Iceberg
   * @throws Error if the client is not connected, the relay does not queue the
   *   request, the notification reports a non-OK status, the request times
   *   out, or the client is closed while the request is pending
   */
  async requestHistoricalOHLC(
    ticker: string,
    period_seconds: number,
    start_time: bigint,
    end_time: bigint,
    limit?: number
  ): Promise<HistoryReadyNotification> {
    const reqSocket = this.reqSocket;
    if (!this.connected || !reqSocket) {
      throw new Error('Client not connected. Call connect() first.');
    }

    const request_id = randomUUID();

    this.logger.debug({
      request_id,
      ticker,
      period_seconds,
      start_time: start_time.toString(),
      end_time: end_time.toString(),
    }, 'Submitting historical OHLC request');

    const request: SubmitHistoricalRequest = {
      request_id,
      ticker,
      start_time,
      end_time,
      period_seconds,
      limit,
      client_id: this.config.clientId,
    };

    // Register pending request BEFORE sending (notification listener is already running)
    const resultPromise = new Promise<HistoryReadyNotification>((resolve, reject) => {
      const timeoutHandle = setTimeout(() => {
        this.pendingRequests.delete(request_id);
        reject(new Error(`Request ${request_id} timed out after ${this.config.requestTimeout}ms`));
      }, this.config.requestTimeout);

      this.pendingRequests.set(request_id, { resolve, reject, timeoutHandle });
    });

    // The timeout timer or close() may reject this promise while we are still
    // awaiting the relay's acknowledgment, i.e. before `await resultPromise`
    // below attaches a handler. Mark it as handled so that case does not
    // surface as an unhandled rejection; the later await still sees the error.
    void resultPromise.catch(() => undefined);

    // Encode and send request
    const frames = encodeSubmitHistoricalRequest(request);

    try {
      // Send two frames: version, then message
      await reqSocket.send(frames);

      // Wait for immediate acknowledgment
      const responseFrames = await reqSocket.receive();

      this.logger.debug({
        frameCount: responseFrames.length,
        frameLengths: Array.from(responseFrames).map(f => f.length),
      }, 'Received response frames from relay');

      const response = decodeSubmitResponse(Array.from(responseFrames));

      this.logger.debug({
        request_id,
        response,
      }, 'Decoded SubmitResponse');

      if (response.status !== SubmitStatus.QUEUED) {
        // Request was rejected; the catch block below drops the pending entry.
        throw new Error(`Request rejected: ${response.error_message || 'Unknown error'}`);
      }

      this.logger.debug({ request_id }, 'Request queued, waiting for notification');

      // Wait for notification (already subscribed to topic)
      return await resultPromise;
    } catch (error) {
      // Clean up pending request on error (no-op if it was already removed)
      this.takePending(request_id);

      this.logger.error({
        error,
        request_id,
        ticker,
        errorMessage: error instanceof Error ? error.message : String(error),
        errorStack: error instanceof Error ? error.stack : undefined,
      }, 'Failed to submit historical OHLC request');
      throw error;
    }
  }

  /**
   * Remove a pending request from the registry and cancel its timeout timer.
   *
   * @returns The removed entry, or `undefined` if no request with that ID is pending.
   */
  private takePending(request_id: string): PendingRequest | undefined {
    const pending = this.pendingRequests.get(request_id);
    if (pending) {
      clearTimeout(pending.timeoutHandle);
      this.pendingRequests.delete(request_id);
    }
    return pending;
  }

  /**
   * Start the notification listener.
   *
   * CRITICAL: This runs BEFORE any requests are submitted to prevent a race condition.
   * connect() has already subscribed to RESPONSE:{client_id} and METADATA_UPDATE,
   * so the loop receives messages for both topics.
   */
  private startNotificationListener(): void {
    const subSocket = this.subSocket;
    if (this.notificationListenerRunning || !subSocket) {
      return;
    }

    this.notificationListenerRunning = true;

    // Listen for notifications asynchronously (fire-and-forget; errors are logged inside)
    void (async () => {
      try {
        for await (const frames of subSocket) {
          try {
            // First frame is the topic
            const topic = frames[0]?.toString();

            // Handle metadata update notifications
            if (topic === 'METADATA_UPDATE') {
              this.logger.info('Received METADATA_UPDATE notification');

              // A failing callback must not stop the listener loop
              try {
                await this.config.onMetadataUpdate();
              } catch (error) {
                this.logger.error({ error }, 'Failed to handle metadata update');
              }
              continue;
            }

            // Handle history ready notifications
            const notification = decodeHistoryReadyNotification(Array.from(frames));

            this.logger.debug({
              request_id: notification.request_id,
              status: NotificationStatus[notification.status],
              row_count: notification.row_count,
            }, 'Received history ready notification');

            // Check if we're waiting for this request
            const pending = this.takePending(notification.request_id);
            if (!pending) {
              this.logger.warn({
                request_id: notification.request_id,
              }, 'Received notification for unknown request');
              continue;
            }

            if (notification.status === NotificationStatus.OK) {
              pending.resolve(notification);
            } else {
              pending.reject(new Error(
                `Historical data request failed: ${notification.error_message || NotificationStatus[notification.status]}`
              ));
            }
          } catch (error) {
            // A single bad message is logged and skipped
            this.logger.error({ error }, 'Failed to process notification');
          }
        }
      } catch (error) {
        // close() clears the flag first, so errors after a close are not logged
        if (this.notificationListenerRunning) {
          this.logger.error({ error }, 'Notification listener error');
        }
      } finally {
        this.notificationListenerRunning = false;
      }
    })();

    this.logger.debug('Notification listener started');
  }

  /**
   * Close the client and clean up resources.
   *
   * Rejects every pending request with "Client closed" and closes both
   * sockets. Does nothing if the client is not connected.
   */
  async close(): Promise<void> {
    if (!this.connected) {
      return;
    }

    this.logger.info('Closing ZMQ relay client');

    this.notificationListenerRunning = false;

    // Reject all pending requests
    for (const pending of this.pendingRequests.values()) {
      clearTimeout(pending.timeoutHandle);
      pending.reject(new Error('Client closed'));
    }
    this.pendingRequests.clear();

    // Close sockets
    if (this.subSocket) {
      this.subSocket.close();
      this.subSocket = undefined;
    }
    if (this.reqSocket) {
      this.reqSocket.close();
      this.reqSocket = undefined;
    }

    this.connected = false;
    this.logger.info('ZMQ relay client closed');
  }

  /**
   * Check if the client is connected.
   */
  isConnected(): boolean {
    return this.connected;
  }

  /**
   * Get the client ID used for notifications.
   */
  getClientId(): string {
    return this.config.clientId;
  }
}