Files
ai/gateway/src/clients/zmq-relay-client.ts

445 lines
14 KiB
TypeScript

/**
* ZMQ Relay Client for historical data requests
*
* IMPORTANT: Implements race-condition-free notification subscription
* by subscribing to RESPONSE:{client_id} topic BEFORE sending requests.
*
* Architecture:
* - REQ socket to relay (port 5559) for SubmitHistoricalRequest
* - SUB socket to relay (port 5558) for HistoryReadyNotification
* - Notification topic: RESPONSE:{client_id} (deterministic, client-generated)
*/
import * as zmq from 'zeromq';
import type { FastifyBaseLogger } from 'fastify';
import { randomUUID } from 'crypto';
import {
encodeSubmitHistoricalRequest,
decodeSubmitResponse,
decodeHistoryReadyNotification,
decodeRealtimeBar,
OHLC_BAR_TOPIC_PATTERN,
type RealtimeBar,
} from './zmq-protocol.js';
import type {
SubmitHistoricalRequest,
HistoryReadyNotification,
} from '../types/ohlc.js';
import {
SubmitStatus,
NotificationStatus,
} from '../types/ohlc.js';
/** Callback invoked with each decoded realtime bar received for a subscribed topic. */
export type BarUpdateCallback = (bar: RealtimeBar) => void;
// Re-export the bar type imported from ./zmq-protocol.js alongside the callback type.
export type { RealtimeBar };
/**
 * Construction options for ZMQRelayClient.
 *
 * The two endpoints are required; the remaining fields are optional and are
 * filled with defaults by the ZMQRelayClient constructor.
 */
export interface ZMQRelayConfig {
relayRequestEndpoint: string; // Endpoint the REQ socket connects to, e.g., "tcp://relay:5559"
relayNotificationEndpoint: string; // Endpoint the SUB socket connects to, e.g., "tcp://relay:5558"
clientId?: string; // Optional client ID; defaults to "gateway-" plus the first 8 characters of a random UUID
requestTimeout?: number; // Request timeout in ms (default: 120000)
onMetadataUpdate?: () => Promise<void>; // Awaited whenever a METADATA_UPDATE message is received (default: no-op)
}
/**
 * Bookkeeping for one in-flight historical request, stored by request_id
 * until its notification arrives, it times out, or the client is closed.
 */
interface PendingRequest {
// Settles the caller's promise with the notification received for this request.
resolve: (notification: HistoryReadyNotification) => void;
// Fails the caller's promise (timeout, non-OK notification, or client close).
reject: (error: Error) => void;
// Timer that rejects the request after the configured timeout; cleared on settlement.
timeoutHandle: NodeJS.Timeout;
}
/**
 * ZMQ Relay Client
 *
 * Provides an async API for submitting historical data requests and waiting
 * for their completion notifications, plus ref-counted subscriptions to
 * realtime OHLC bar topics that are delivered on the same SUB socket.
 *
 * The notification topic RESPONSE:{client_id} is subscribed in connect(),
 * i.e. before any request can be sent, so a notification cannot be missed
 * because the subscription was set up too late.
 */
export class ZMQRelayClient {
  private readonly config: Required<ZMQRelayConfig>;
  private readonly logger: FastifyBaseLogger;
  private reqSocket?: zmq.Request;
  private subSocket?: zmq.Subscriber;
  private readonly notificationTopic: string;
  /** In-flight historical requests keyed by request_id. */
  private readonly pendingRequests: Map<string, PendingRequest> = new Map();
  /** Ref count per ZMQ topic (gateway-level dedup before ZMQ subscribe/unsubscribe) */
  private readonly topicRefs: Map<string, number> = new Map();
  /** Callbacks registered by WebSocket sessions for realtime bar updates */
  private readonly barCallbacks: Map<string, Set<BarUpdateCallback>> = new Map();
  private connected = false;
  private notificationListenerRunning = false;
  /** In-flight connect() attempt, shared by concurrent callers. */
  private connecting?: Promise<void>;
  /** Tail of the REQ send/receive queue. Never rejects. */
  private requestQueue: Promise<void> = Promise.resolve();

  /**
   * @param config Relay endpoints plus optional client id, timeout and metadata callback
   * @param logger Logger used for all client diagnostics
   */
  constructor(config: ZMQRelayConfig, logger: FastifyBaseLogger) {
    this.config = {
      relayRequestEndpoint: config.relayRequestEndpoint,
      relayNotificationEndpoint: config.relayNotificationEndpoint,
      clientId: config.clientId || `gateway-${randomUUID().slice(0, 8)}`,
      requestTimeout: config.requestTimeout || 120000,
      onMetadataUpdate: config.onMetadataUpdate || (async () => {}),
    };
    this.logger = logger;
    this.notificationTopic = `RESPONSE:${this.config.clientId}`;
  }

  /**
   * Connect to relay and start notification listener
   *
   * CRITICAL: This MUST be called before making any requests.
   * The notification listener subscribes to RESPONSE:{client_id} topic
   * BEFORE any requests are sent, preventing race conditions.
   *
   * Calling connect() while already connected is a no-op. Concurrent calls
   * share a single connection attempt instead of each creating sockets.
   */
  async connect(): Promise<void> {
    if (this.connected) {
      return;
    }
    if (!this.connecting) {
      this.connecting = this.openSockets().finally(() => {
        this.connecting = undefined;
      });
    }
    await this.connecting;
  }

  /**
   * Create both sockets, subscribe to all topics and start the listener.
   * Sockets are closed again if any step of the setup throws.
   */
  private async openSockets(): Promise<void> {
    this.logger.info({
      requestEndpoint: this.config.relayRequestEndpoint,
      notificationEndpoint: this.config.relayNotificationEndpoint,
      clientId: this.config.clientId,
      notificationTopic: this.notificationTopic,
    }, 'Connecting to ZMQ relay');

    const reqSocket = new zmq.Request();
    const subSocket = new zmq.Subscriber();
    try {
      reqSocket.connect(this.config.relayRequestEndpoint);
      subSocket.connect(this.config.relayNotificationEndpoint);
      // Subscribe to our notification topic BEFORE sending any requests
      subSocket.subscribe(this.notificationTopic);
      // Subscribe to system metadata update notifications
      subSocket.subscribe('METADATA_UPDATE');
      // Restore realtime topics that were registered while no socket existed
      // (before connect(), or kept across a close()/connect() cycle).
      for (const topic of this.topicRefs.keys()) {
        subSocket.subscribe(topic);
      }
    } catch (error) {
      reqSocket.close();
      subSocket.close();
      throw error;
    }
    this.reqSocket = reqSocket;
    this.subSocket = subSocket;
    this.logger.info({
      topics: [this.notificationTopic, 'METADATA_UPDATE']
    }, 'Subscribed to notification topics');

    this.startNotificationListener();
    // Give sockets a moment to connect
    await new Promise(resolve => setTimeout(resolve, 100));
    this.connected = true;
    this.logger.info('ZMQ relay client connected');
  }

  /**
   * Request historical OHLC data
   *
   * IMPORTANT: Call connect() before using this method.
   *
   * @param ticker Market identifier (e.g., "BTC/USDT.BINANCE")
   * @param period_seconds OHLC period in seconds
   * @param start_time Start timestamp in MICROSECONDS
   * @param end_time End timestamp in MICROSECONDS
   * @param limit Optional limit on number of candles
   * @returns Promise that resolves when data is ready in Iceberg
   * @throws Error if the client is not connected, the relay rejects the
   *   request, the notification reports a non-OK status, the request times
   *   out, or the client is closed while the request is in flight
   */
  async requestHistoricalOHLC(
    ticker: string,
    period_seconds: number,
    start_time: bigint,
    end_time: bigint,
    limit?: number
  ): Promise<HistoryReadyNotification> {
    if (!this.connected || !this.reqSocket) {
      throw new Error('Client not connected. Call connect() first.');
    }
    const request_id = randomUUID();
    this.logger.debug({
      request_id,
      ticker,
      period_seconds,
      start_time: start_time.toString(),
      end_time: end_time.toString(),
    }, 'Submitting historical OHLC request');
    const request: SubmitHistoricalRequest = {
      request_id,
      ticker,
      start_time,
      end_time,
      period_seconds,
      limit,
      client_id: this.config.clientId,
    };

    // Register pending request BEFORE sending (notification listener is already running)
    const resultPromise = new Promise<HistoryReadyNotification>((resolve, reject) => {
      const timeoutHandle = setTimeout(() => {
        this.pendingRequests.delete(request_id);
        reject(new Error(`Request ${request_id} timed out after ${this.config.requestTimeout}ms`));
      }, this.config.requestTimeout);
      this.pendingRequests.set(request_id, { resolve, reject, timeoutHandle });
    });
    // The timeout or close() can reject this promise while we are still
    // waiting for the relay's acknowledgment. Mark it as handled so that
    // case is not reported as an unhandled rejection; the rejection is
    // still observed through the race/await below.
    resultPromise.catch(() => undefined);

    try {
      const frames = encodeSubmitHistoricalRequest(request);
      const acknowledged = this.submitToRelay(request_id, frames);
      // May settle after the race below has already been decided.
      acknowledged.catch(() => undefined);
      // Proceed once the relay acknowledged the request, or fail early if
      // the request is rejected, times out or the client is closed.
      await Promise.race([acknowledged, resultPromise]);
      // Wait for notification (already subscribed to topic)
      return await resultPromise;
    } catch (error) {
      // Clean up pending request on error
      this.discardPending(request_id);
      this.logger.error({
        error,
        request_id,
        ticker,
        errorMessage: error instanceof Error ? error.message : String(error),
        errorStack: error instanceof Error ? error.stack : undefined,
      }, 'Failed to submit historical OHLC request');
      throw error;
    }
  }

  /**
   * Send one request over the REQ socket and validate the relay's immediate
   * acknowledgment.
   *
   * A REQ socket requires strict send/receive alternation, so exchanges from
   * concurrent callers are chained and executed one at a time.
   *
   * @returns Promise that resolves once the relay answered with QUEUED and
   *   rejects if the exchange fails or the relay rejects the request
   */
  private submitToRelay(
    requestId: string,
    frames: ReturnType<typeof encodeSubmitHistoricalRequest>
  ): Promise<void> {
    const exchange = async (): Promise<void> => {
      // The caller may have timed out or the client may have been closed
      // while this exchange was queued; do not send a request nobody awaits.
      if (!this.pendingRequests.has(requestId)) {
        return;
      }
      const socket = this.reqSocket;
      if (!socket) {
        throw new Error('Client closed');
      }
      // Send two frames: version, then message
      await socket.send(frames);
      // Wait for immediate acknowledgment
      const responseFrames = await socket.receive();
      this.logger.debug({
        frameCount: responseFrames.length,
        frameLengths: Array.from(responseFrames).map(f => f.length),
      }, 'Received response frames from relay');
      const response = decodeSubmitResponse(Array.from(responseFrames));
      this.logger.debug({
        request_id: requestId,
        response,
      }, 'Decoded SubmitResponse');
      if (response.status !== SubmitStatus.QUEUED) {
        throw new Error(`Request rejected: ${response.error_message || 'Unknown error'}`);
      }
      this.logger.debug({ request_id: requestId }, 'Request queued, waiting for notification');
    };
    const outcome = this.requestQueue.then(exchange);
    // Keep the queue alive after a failed exchange.
    this.requestQueue = outcome.catch(() => undefined);
    return outcome;
  }

  /** Remove a pending request and cancel its timeout, if it is still registered. */
  private discardPending(requestId: string): void {
    const pending = this.pendingRequests.get(requestId);
    if (pending) {
      clearTimeout(pending.timeoutHandle);
      this.pendingRequests.delete(requestId);
    }
  }

  /**
   * Start notification listener
   *
   * CRITICAL: This runs BEFORE any requests are submitted to prevent race condition.
   * We're already subscribed to RESPONSE:{client_id} and METADATA_UPDATE, so we'll receive all notifications.
   *
   * The loop is bound to the SUB socket that is current when it starts. A
   * loop whose socket has since been replaced does not touch the shared
   * running flag and does not report its shutdown as an error.
   */
  private startNotificationListener(): void {
    const socket = this.subSocket;
    if (this.notificationListenerRunning || !socket) {
      return;
    }
    this.notificationListenerRunning = true;
    // Listen for notifications asynchronously
    void (async () => {
      try {
        for await (const frames of socket) {
          try {
            // First frame is the topic
            const topicFrame = frames[0];
            if (!topicFrame) {
              this.logger.warn('Received empty message on notification socket');
              continue;
            }
            const topic = topicFrame.toString();
            if (topic === 'METADATA_UPDATE') {
              await this.handleMetadataUpdate();
              continue;
            }
            // Handle realtime OHLC bar updates (topic pattern: "{ticker}|ohlc:{period}")
            if (OHLC_BAR_TOPIC_PATTERN.test(topic)) {
              const bar = decodeRealtimeBar(Array.from(frames));
              if (bar) {
                this.dispatchBar(topic, bar);
              }
              continue;
            }
            // Everything else is treated as a history ready notification
            this.settlePendingRequest(decodeHistoryReadyNotification(Array.from(frames)));
          } catch (error) {
            this.logger.error({ error }, 'Failed to process notification');
          }
        }
      } catch (error) {
        // Only an error on the socket that is still current is unexpected;
        // after close() the socket reference has been cleared or replaced.
        if (this.subSocket === socket) {
          this.logger.error({ error }, 'Notification listener error');
        }
      } finally {
        // Do not clear the flag on behalf of a newer listener.
        if (this.subSocket === socket || !this.subSocket) {
          this.notificationListenerRunning = false;
        }
      }
    })();
    this.logger.debug('Notification listener started');
  }

  /** Run the configured metadata callback; failures are logged, not propagated. */
  private async handleMetadataUpdate(): Promise<void> {
    this.logger.info('Received METADATA_UPDATE notification');
    try {
      await this.config.onMetadataUpdate();
    } catch (error) {
      this.logger.error({ error }, 'Failed to handle metadata update');
    }
  }

  /** Deliver a bar to every callback registered for its topic. */
  private dispatchBar(topic: string, bar: RealtimeBar): void {
    const callbacks = this.barCallbacks.get(topic);
    if (!callbacks) {
      return;
    }
    for (const cb of callbacks) {
      try {
        cb(bar);
      } catch (error) {
        // One failing subscriber must not affect the others or the listener.
        this.logger.warn({ error, topic }, 'Realtime bar callback threw');
      }
    }
  }

  /** Resolve or reject the pending request that a notification refers to. */
  private settlePendingRequest(notification: HistoryReadyNotification): void {
    this.logger.debug({
      request_id: notification.request_id,
      status: NotificationStatus[notification.status],
      row_count: notification.row_count,
    }, 'Received history ready notification');
    // Check if we're waiting for this request
    const pending = this.pendingRequests.get(notification.request_id);
    if (!pending) {
      this.logger.warn({
        request_id: notification.request_id,
      }, 'Received notification for unknown request');
      return;
    }
    clearTimeout(pending.timeoutHandle);
    this.pendingRequests.delete(notification.request_id);
    if (notification.status === NotificationStatus.OK) {
      pending.resolve(notification);
    } else {
      pending.reject(new Error(
        `Historical data request failed: ${notification.error_message || NotificationStatus[notification.status]}`
      ));
    }
  }

  /**
   * Subscribe to realtime OHLC bars for a ticker+period.
   *
   * ZMQ subscribe is only called on the 0→1 transition (first subscriber).
   * This triggers the relay XPUB → Flink subscription detection → ingestor activation.
   *
   * Registering the same callback twice for one topic is a no-op, so the ref
   * count always equals the number of registered callbacks.
   *
   * @param ticker Market identifier used as the topic prefix
   * @param periodSeconds OHLC period in seconds used as the topic suffix
   * @param callback Called whenever a new bar arrives for this topic
   */
  subscribeToTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void {
    const topic = `${ticker}|ohlc:${periodSeconds}`;
    let callbacks = this.barCallbacks.get(topic);
    if (!callbacks) {
      callbacks = new Set();
      this.barCallbacks.set(topic, callbacks);
    }
    if (callbacks.has(callback)) {
      return;
    }
    callbacks.add(callback);
    // ZMQ subscribe on first ref
    const prev = this.topicRefs.get(topic) ?? 0;
    this.topicRefs.set(topic, prev + 1);
    if (prev === 0 && this.subSocket) {
      this.subSocket.subscribe(topic);
      this.logger.info({ topic }, 'ZMQ subscribed to realtime topic');
    }
  }

  /**
   * Unsubscribe a callback from realtime OHLC bars.
   * ZMQ unsubscribe is only called on the 1→0 transition (last subscriber).
   *
   * Unsubscribing a callback that is not registered for the topic does
   * nothing, so it cannot release a subscription other callbacks still use.
   */
  unsubscribeFromTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void {
    const topic = `${ticker}|ohlc:${periodSeconds}`;
    const callbacks = this.barCallbacks.get(topic);
    if (!callbacks || !callbacks.delete(callback)) {
      return;
    }
    if (callbacks.size === 0) {
      this.barCallbacks.delete(topic);
    }
    const prev = this.topicRefs.get(topic) ?? 0;
    if (prev > 1) {
      this.topicRefs.set(topic, prev - 1);
      return;
    }
    this.topicRefs.delete(topic);
    if (this.subSocket) {
      this.subSocket.unsubscribe(topic);
      this.logger.info({ topic }, 'ZMQ unsubscribed from realtime topic');
    }
  }

  /**
   * Remove all subscriptions for a set of (topic, callback) pairs.
   * Convenience method for WebSocket disconnect cleanup.
   */
  cleanupSubscriptions(subscriptions: Array<{ ticker: string; periodSeconds: number; callback: BarUpdateCallback }>): void {
    for (const { ticker, periodSeconds, callback } of subscriptions) {
      this.unsubscribeFromTicker(ticker, periodSeconds, callback);
    }
  }

  /**
   * Close the client and cleanup resources
   *
   * Every pending request is rejected with "Client closed". Registered
   * realtime callbacks are kept and re-subscribed by a later connect().
   */
  async close(): Promise<void> {
    if (!this.connected) {
      return;
    }
    this.logger.info('Closing ZMQ relay client');
    this.notificationListenerRunning = false;
    // Reject all pending requests
    for (const [, pending] of this.pendingRequests) {
      clearTimeout(pending.timeoutHandle);
      pending.reject(new Error('Client closed'));
    }
    this.pendingRequests.clear();
    // Close sockets
    if (this.subSocket) {
      this.subSocket.close();
      this.subSocket = undefined;
    }
    if (this.reqSocket) {
      this.reqSocket.close();
      this.reqSocket = undefined;
    }
    // Exchanges queued on the closed socket must not block a later connection.
    this.requestQueue = Promise.resolve();
    this.connected = false;
    this.logger.info('ZMQ relay client closed');
  }

  /**
   * Check if client is connected
   */
  isConnected(): boolean {
    return this.connected;
  }

  /**
   * Get the client ID used for notifications
   */
  getClientId(): string {
    return this.config.clientId;
  }
}