// Kafka producer for writing market data import { Kafka } from 'kafkajs'; import { encodeMessage, MessageTypeId, Tick, OHLC, OHLCBatch, Market } from './proto/messages.js'; export class KafkaProducer { constructor(config, logger) { this.config = config; this.logger = logger; this.kafka = new Kafka({ clientId: 'ccxt-ingestor', brokers: config.kafka_brokers || ['localhost:9092'], logLevel: 0 // Error only }); this.producer = this.kafka.producer(); this.isConnected = false; } /** * Connect to Kafka */ async connect() { await this.producer.connect(); this.isConnected = true; this.logger.info('Connected to Kafka'); } /** * Write a tick message to Kafka * @param {string} topic - Kafka topic name * @param {object} tickData - Tick data object */ async writeTick(topic, tickData) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } const [frame1, frame2] = encodeMessage(MessageTypeId.TICK, tickData, Tick); const message = Buffer.concat([frame1, frame2]); await this.producer.send({ topic, messages: [ { key: tickData.ticker, value: message, timestamp: tickData.timestamp.toString() } ] }); this.logger.debug({ ticker: tickData.ticker, topic }, 'Wrote tick to Kafka'); } /** * Write multiple ticks to Kafka in batch * @param {string} topic - Kafka topic name * @param {Array} ticksData - Array of tick data objects */ async writeTicks(topic, ticksData) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } if (ticksData.length === 0) { return; } const messages = ticksData.map(tickData => { const [frame1, frame2] = encodeMessage(MessageTypeId.TICK, tickData, Tick); const message = Buffer.concat([frame1, frame2]); return { key: tickData.ticker, value: message, timestamp: tickData.timestamp.toString() }; }); await this.producer.send({ topic, messages }); this.logger.debug( { count: ticksData.length, topic }, 'Wrote ticks batch to Kafka' ); } /** * Write an OHLC message to Kafka * @param {string} topic - Kafka topic name * @param {object} ohlcData - OHLC data object */ async writeOHLC(topic, ohlcData) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } const [frame1, frame2] = encodeMessage(MessageTypeId.OHLC, ohlcData, OHLC); const message = Buffer.concat([frame1, frame2]); await this.producer.send({ topic, messages: [ { key: ohlcData.ticker, value: message } ] }); this.logger.debug({ ticker: ohlcData.ticker, topic }, 'Wrote OHLC to Kafka'); } /** * Write multiple OHLC candles to Kafka as an OHLCBatch message * Uses protobuf encoding with metadata in batch wrapper * @param {string} topic - Kafka topic name * @param {Array} ohlcData - Array of OHLC data objects (may include __metadata in first record) */ async writeOHLCs(topic, ohlcData) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } if (ohlcData.length === 0) { return; } // Extract metadata from first record if present const firstCandle = ohlcData[0]; const metadata = firstCandle.__metadata; if (!metadata) { // No metadata - write individual OHLC messages (realtime mode) const messages = ohlcData.map(candle => { const protoCandle = { timestamp: candle.timestamp, ticker: candle.ticker, open: candle.open, high: candle.high, low: candle.low, close: candle.close, volume: candle.volume }; const [frame1, frame2] = encodeMessage(MessageTypeId.OHLC, protoCandle, OHLC); const value = Buffer.concat([frame1, frame2]); return { key: candle.ticker, value }; }); await this.producer.send({ topic, messages }); this.logger.debug( { count: ohlcData.length, topic, type: 'individual' }, 'Wrote OHLC messages to Kafka' ); return; } // Historical mode - write as OHLCBatch with metadata const batch = { metadata: { requestId: metadata.request_id, clientId: metadata.client_id, ticker: metadata.ticker, periodSeconds: metadata.period_seconds, startTime: metadata.start_time, endTime: metadata.end_time, status: metadata.status || 'OK', errorMessage: metadata.error_message }, rows: ohlcData.map(candle => { // null open/high/low/close signals a gap bar (no trades that period). // Omit fields from the protobuf message when null so hasOpen() etc. return false. const row = { timestamp: candle.timestamp, ticker: candle.ticker, }; if (candle.open != null) row.open = candle.open; if (candle.high != null) row.high = candle.high; if (candle.low != null) row.low = candle.low; if (candle.close != null) row.close = candle.close; if (candle.volume != null) row.volume = candle.volume; return row; }) }; // Encode as protobuf OHLCBatch with ZMQ envelope const [frame1, frame2] = encodeMessage(MessageTypeId.OHLC_BATCH, batch, OHLCBatch); const value = Buffer.concat([frame1, frame2]); await this.producer.send({ topic, messages: [ { key: metadata.ticker, value } ] }); this.logger.debug( { request_id: metadata.request_id, count: ohlcData.length, topic, type: 'batch' }, 'Wrote OHLCBatch to Kafka' ); } /** * Write a marker message to Kafka for NOT_FOUND or ERROR cases * This allows Flink to publish notifications even when no data is available * @param {string} topic - Kafka topic name * @param {object} marker - Marker object with request metadata and status */ async writeMarker(topic, marker) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } // Create an empty OHLCBatch with status in metadata const batch = { metadata: { requestId: marker.request_id, clientId: marker.client_id, ticker: marker.ticker, periodSeconds: marker.period_seconds, startTime: marker.start_time, endTime: marker.end_time, status: marker.status, // 'NOT_FOUND' or 'ERROR' errorMessage: marker.error_message || marker.message }, rows: [] // Empty rows array indicates marker message }; // Encode as protobuf OHLCBatch with ZMQ envelope const [frame1, frame2] = encodeMessage(MessageTypeId.OHLC_BATCH, batch, OHLCBatch); const value = Buffer.concat([frame1, frame2]); await this.producer.send({ topic, messages: [ { key: marker.ticker, value } ] }); this.logger.info( { request_id: marker.request_id, status: marker.status, topic }, 'Wrote marker to Kafka' ); } /** * Write market metadata messages to Kafka * @param {string} topic - Kafka topic name * @param {Array} messages - Array of {key, value} objects where value is Market metadata */ async writeMarketMetadata(topic, messages) { if (!this.isConnected) { throw new Error('Kafka producer not connected'); } if (messages.length === 0) { return; } const kafkaMessages = messages.map(({ key, value }) => { const [frame1, frame2] = encodeMessage(MessageTypeId.MARKET, value, Market); const encodedValue = Buffer.concat([frame1, frame2]); return { key, value: encodedValue }; }); await this.producer.send({ topic, messages: kafkaMessages }); this.logger.debug( { count: messages.length, topic }, 'Wrote market metadata to Kafka' ); } /** * Disconnect from Kafka */ async disconnect() { if (this.isConnected) { await this.producer.disconnect(); this.isConnected = false; this.logger.info('Disconnected from Kafka'); } } }