#!/usr/bin/env node
// Main ingestor worker process.
//
// Offers per-exchange work slots to a Flink IngestorBroker over ZMQ,
// executes dispatched historical-OHLC / realtime-tick jobs via CCXT,
// and writes results to Kafka.
import { readFileSync } from 'fs';
import { parse as parseYaml } from 'yaml';
import pino from 'pino';
import { ZmqClient } from './zmq-client.js';
import { KafkaProducer } from './kafka-producer.js';
import { CCXTFetcher, ExchangeRateLimitError } from './ccxt-fetcher.js';
import { RealtimePoller } from './realtime-poller.js';
import { SymbolMetadataGenerator } from './symbol-metadata-generator.js';
import { SlotType } from './proto/messages.js';

// Logger setup
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  transport: {
    target: 'pino-pretty',
    options: {
      colorize: true,
      translateTime: 'SYS:standard',
      ignore: 'pid,hostname'
    }
  }
});

/**
 * Load configuration and secrets from YAML files, falling back to defaults.
 *
 * Both files are optional: a missing/unreadable file is logged as a warning
 * and the built-in defaults are used. Secrets are spread last so they can
 * override any config key.
 *
 * NOTE: defaults use `??` (not `||`) so explicitly-configured falsy values
 * such as port 0 or an empty topic name are respected rather than silently
 * replaced.
 *
 * @returns {object} merged configuration object
 */
function loadConfig() {
  const configPath = process.env.CONFIG_PATH || '/config/config.yaml';
  const secretsPath = process.env.SECRETS_PATH || '/config/secrets.yaml';

  let config = {};
  let secrets = {};

  try {
    const configFile = readFileSync(configPath, 'utf8');
    config = parseYaml(configFile);
    logger.info({ path: configPath }, 'Loaded configuration');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load config, using defaults');
  }

  try {
    const secretsFile = readFileSync(secretsPath, 'utf8');
    secrets = parseYaml(secretsFile);
    logger.info({ path: secretsPath }, 'Loaded secrets');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load secrets');
  }

  return {
    // Flink ZMQ endpoints
    flink_hostname: config.flink_hostname ?? 'localhost',
    ingestor_broker_port: config.ingestor_broker_port ?? 5567,

    // Kafka configuration
    kafka_brokers: config.kafka_brokers ?? ['localhost:9092'],
    kafka_ohlc_topic: config.kafka_ohlc_topic ?? 'market-ohlc',
    kafka_tick_topic: config.kafka_tick_topic ?? 'market-tick',

    // Worker configuration
    poll_interval_ms: config.poll_interval_ms ?? 10000,

    // Symbol metadata configuration
    supported_exchanges: config.supported_exchanges ?? ['binance', 'coinbase', 'kraken'],
    symbol_metadata_interval_ms: config.symbol_metadata_interval_ms ?? 6 * 60 * 60 * 1000,

    // Per-exchange slot capacity
    exchange_capacity: config.exchange_capacity ?? {
      BINANCE: { historical_slots: 3, realtime_slots: 5 },
      KRAKEN: { historical_slots: 2, realtime_slots: 3 },
      COINBASE: { historical_slots: 2, realtime_slots: 4 }
    },

    ...secrets
  };
}

/**
 * Manages work slots per exchange per job type.
 *
 * Each slot corresponds to one WorkerReady message sent to Flink. Flink consumes
 * a slot when it dispatches a job. The slot is re-offered (via another WorkerReady)
 * once the job completes, subject to any rate-limit backoff dictated by the exchange.
 */
class SlotPool {
  constructor(exchangeCapacity, zmqClient, logger) {
    this.zmqClient = zmqClient;
    this.logger = logger;

    // Key: 'EXCHANGE|TYPE' (e.g. 'BINANCE|HISTORICAL')
    // Value: { max, active: Set, backoffUntil: ms timestamp }
    this.slots = new Map();
    for (const [exchange, cap] of Object.entries(exchangeCapacity)) {
      const ex = exchange.toUpperCase();
      this.slots.set(`${ex}|HISTORICAL`, {
        max: cap.historical_slots ?? 2,
        active: new Set(),
        backoffUntil: 0
      });
      this.slots.set(`${ex}|REALTIME`, {
        max: cap.realtime_slots ?? 3,
        active: new Set(),
        backoffUntil: 0
      });
    }

    // jobId → { exchange, type } for release tracking
    this.jobMap = new Map();
  }

  /**
   * Register the onConnected callback so slot offers are sent on every
   * TCP (re)connect rather than once at startup. Handles both the initial
   * connection race (Flink ROUTER not yet ready) and Flink restarts.
   */
  init() {
    this.zmqClient.onConnected = () => this._offerAllFreeSlots();
    this.logger.info(
      { slots: [...this.slots.entries()].map(([k, v]) => `${k}:${v.max}`) },
      'Slot pool initialized — will offer slots on connect'
    );
  }

  /**
   * Re-offer all currently-free slots. Called on every TCP (re)connect.
   * Sends (max - active) WorkerReady messages per exchange+type key.
   *
   * A failed send for one slot is logged and must not abort the loop —
   * otherwise the remaining free slots would never be offered.
   */
  async _offerAllFreeSlots() {
    const summary = [];
    for (const [key, slot] of this.slots) {
      const [exchange, type] = key.split('|');
      const freeCount = slot.max - slot.active.size;
      for (let i = 0; i < freeCount; i++) {
        try {
          await this.zmqClient.sendTypedReady(exchange, SlotType[type]);
        } catch (err) {
          this.logger.error({ key, error: err.message }, 'Failed to offer slot on connect');
        }
      }
      summary.push(`${key}:${freeCount}/${slot.max}`);
    }
    this.logger.info({ offered: summary }, 'Re-offered all free slots on connect');
  }

  /**
   * Record a slot as occupied by jobId.
   * @param {string} jobId
   * @param {string} exchange - e.g. 'BINANCE'
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @returns {boolean} true if a slot was available and consumed
   */
  consumeSlot(jobId, exchange, type) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (!slot) {
      this.logger.warn({ jobId, key }, 'No slot config for this exchange+type');
      return false;
    }
    if (slot.active.size >= slot.max) {
      this.logger.warn({ jobId, key, active: slot.active.size, max: slot.max },
        'Slot capacity exceeded — rejecting job');
      return false;
    }
    slot.active.add(jobId);
    this.jobMap.set(jobId, { exchange: exchange.toUpperCase(), type });
    this.logger.debug({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot consumed');
    return true;
  }

  /**
   * Release the slot occupied by jobId and re-offer it to Flink (after any backoff).
   * Safe to call for an unknown jobId (logs a warning and returns).
   */
  async releaseSlot(jobId) {
    const info = this.jobMap.get(jobId);
    if (!info) {
      this.logger.warn({ jobId }, 'releaseSlot called for unknown jobId');
      return;
    }
    this.jobMap.delete(jobId);
    const key = `${info.exchange}|${info.type}`;
    const slot = this.slots.get(key);
    if (slot) {
      slot.active.delete(jobId);
      await this._offerSlot(info.exchange, info.type, slot);
    }
  }

  /**
   * Record a rate limit from the exchange. Delays slot re-offer by retryAfterMs.
   * @param {string} exchange
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @param {number} retryAfterMs
   */
  reportRateLimit(exchange, type, retryAfterMs) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (slot) {
      // Math.max so a shorter retry can never shrink an existing backoff window.
      slot.backoffUntil = Math.max(slot.backoffUntil, Date.now() + retryAfterMs);
      this.logger.warn({ exchange, type, retryAfterMs }, 'Rate limit backoff set for slot');
    }
  }

  /**
   * Offer a single freed slot back to Flink, deferring via setTimeout while
   * the exchange is still in a rate-limit backoff window.
   */
  async _offerSlot(exchange, type, slot) {
    const now = Date.now();
    if (now < slot.backoffUntil) {
      const delay = slot.backoffUntil - now;
      this.logger.info({ exchange, type, delayMs: delay }, 'Slot in backoff — scheduling re-offer');
      setTimeout(() => this._offerSlot(exchange, type, slot), delay);
      return;
    }
    try {
      await this.zmqClient.sendTypedReady(exchange, SlotType[type]);
      this.logger.debug({ exchange, type }, 'Slot re-offered to Flink');
    } catch (err) {
      this.logger.error({ exchange, type, error: err.message }, 'Failed to re-offer slot');
    }
  }

  // No-op for now: pending backoff timers are abandoned; the process exits anyway.
  shutdown() {}
}

/**
 * Extract exchange name from ticker string, e.g. "BTC/USDT.BINANCE" → "BINANCE".
 * Returns 'UNKNOWN' for a null/undefined ticker or a ticker without a '.'.
 */
function exchangeOf(ticker) {
  const lastDot = ticker?.lastIndexOf('.');
  // lastDot is undefined when ticker is nullish; `undefined >= 0` is false.
  return (lastDot >= 0) ? ticker.slice(lastDot + 1).toUpperCase() : 'UNKNOWN';
}

/**
 * Top-level worker: wires ZMQ (job dispatch), CCXT (exchange access),
 * Kafka (output) and the SlotPool together.
 */
class IngestorWorker {
  constructor(config, logger) {
    this.config = config;
    this.logger = logger;

    this.zmqClient = new ZmqClient(config, logger.child({ component: 'zmq' }));
    this.kafkaProducer = new KafkaProducer(config, logger.child({ component: 'kafka' }));
    this.metadataGenerator = new SymbolMetadataGenerator(
      config, this.kafkaProducer, logger.child({ component: 'metadata' })
    );
    this.ccxtFetcher = new CCXTFetcher(
      config, logger.child({ component: 'ccxt' }), this.metadataGenerator
    );
    this.realtimePoller = new RealtimePoller(
      this.ccxtFetcher, this.kafkaProducer, this.zmqClient,
      logger.child({ component: 'poller' })
    );
    this.pool = new SlotPool(
      config.exchange_capacity, this.zmqClient, logger.child({ component: 'pool' })
    );

    // When realtime poller terminates a subscription due to repeated errors, release its slot.
    this.realtimePoller.onJobComplete = (jobId, error) => {
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(error.exchange, 'REALTIME', error.retryAfterMs);
      }
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release slot after realtime error'));
    };

    // jobId set for active realtime subscriptions
    this.activeRealtime = new Set();
    this.isShutdown = false;
    this.metadataInterval = null;
  }

  /**
   * Connect Kafka and ZMQ, generate initial symbol metadata and schedule
   * the periodic metadata refresh. Callbacks are wired before connecting
   * so no early dispatch or connect event is missed.
   */
  async start() {
    this.logger.info('Starting CCXT ingestor worker');

    await this.kafkaProducer.connect();

    // Wire event callbacks before connecting so we don't miss early messages
    this.zmqClient.onWorkAssign = req => this.handleWorkAssign(req);
    this.zmqClient.onWorkStop = jobId => this.handleWorkStop(jobId);

    // Register slot offer callback before connecting so we don't miss the event
    this.pool.init();

    await this.zmqClient.connect();

    // Generate symbol metadata on startup
    this.logger.info('Generating initial symbol metadata');
    try {
      const results = await this.metadataGenerator.generateAll();
      this.logger.info({ results }, 'Initial symbol metadata generated');
    } catch (error) {
      this.logger.error({ error: error.message }, 'Failed to generate initial symbol metadata');
    }

    // Schedule periodic metadata generation
    this.metadataInterval = setInterval(async () => {
      this.logger.info('Periodic symbol metadata generation');
      try {
        const results = await this.metadataGenerator.generateAll();
        this.logger.info({ results }, 'Periodic symbol metadata generated');
      } catch (error) {
        this.logger.error({ error: error.message }, 'Failed to generate periodic symbol metadata');
      }
    }, this.config.symbol_metadata_interval_ms);

    this.logger.info('Ingestor worker started successfully');
  }

  /**
   * Handle a WorkAssign message dispatched by Flink IngestorBroker.
   * Called from the ZmqClient receive loop — do not block.
   */
  handleWorkAssign(request) {
    const { jobId, requestId, type, ticker } = request;
    const exchange = exchangeOf(ticker);
    this.logger.info({ jobId, requestId, type, ticker, exchange }, 'Received WorkAssign');

    // Type may arrive as enum string or proto ordinal; absent type defaults to historical.
    const isHistorical = !type || type === 'HISTORICAL_OHLC' || type === 0;
    const isRealtime = type === 'REALTIME_TICKS' || type === 1;

    if (isHistorical) {
      if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleHistoricalRequest(request).catch(err => {
        this.logger.error({ jobId, requestId, error: err.message },
          'Unexpected error in historical handler');
      });
    } else if (isRealtime) {
      if (!this.pool.consumeSlot(jobId, exchange, 'REALTIME')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleRealtimeRequest(request);
    } else {
      this.logger.warn({ jobId, type }, 'Unknown request type — rejecting');
      this.zmqClient.sendReject(jobId, `Unknown request type: ${type}`).catch(() => {});
    }
  }

  /**
   * Handle WorkStop sent by Flink (e.g., all subscribers left).
   */
  handleWorkStop(jobId) {
    this.logger.info({ jobId }, 'Received WorkStop — cancelling realtime subscription');
    this.realtimePoller.cancelSubscription(jobId);
    this.activeRealtime.delete(jobId);
    this.pool.releaseSlot(jobId).catch(err =>
      this.logger.warn({ jobId, error: err.message }, 'Failed to release slot after WorkStop'));
    // No WorkComplete needed — Flink sent the stop, it already knows.
  }

  /**
   * Fetch historical OHLC data and write to Kafka.
   * Sends WorkComplete when done (success or error).
   *
   * The slot release lives in a `finally` so a throwing sendHeartbeat or
   * sendComplete can never leak the slot (previously the slot was released
   * only after the try/catch, so those failure paths stranded it forever).
   */
  async handleHistoricalRequest(request) {
    const { jobId, requestId, ticker, historical, clientId: client_id } = request;
    const exchange = exchangeOf(ticker);
    const { startTime: start_time, endTime: end_time,
            periodSeconds: period_seconds, limit } = historical || {};
    this.logger.info({ jobId, requestId, ticker, period_seconds },
      'Processing historical OHLC request');

    try {
      // Immediately ack to reset Flink's dispatch-time timeout clock.
      await this.zmqClient.sendHeartbeat(jobId);

      const candles = await this.ccxtFetcher.fetchHistoricalOHLC(
        ticker, start_time, end_time, period_seconds, limit
      );
      this.logger.info({ jobId, requestId, ticker, count: candles.length }, 'Fetched from exchange');

      if (candles.length > 0) {
        const metadata = { request_id: requestId, client_id, ticker, period_seconds, start_time, end_time };
        // Page writes so a huge backfill doesn't produce one oversized Kafka batch.
        const PAGE_SIZE = 1000;
        for (let i = 0; i < candles.length; i += PAGE_SIZE) {
          const page = candles.slice(i, i + PAGE_SIZE);
          const isLastPage = (i + PAGE_SIZE) >= candles.length;
          await this.kafkaProducer.writeOHLCs(this.config.kafka_ohlc_topic, page, metadata, isLastPage);
        }
        this.logger.info(
          { jobId, requestId, ticker, count: candles.length, pages: Math.ceil(candles.length / PAGE_SIZE) },
          'Wrote all pages to Kafka'
        );
      } else {
        // Emit an explicit marker so downstream consumers can distinguish
        // "no data" from "request lost".
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'NOT_FOUND',
          message: 'No data available for requested period'
        });
      }

      this.logger.info({ jobId, requestId, ticker },
        'Historical request complete — sending WorkComplete');
      await this.zmqClient.sendComplete(jobId, true);
    } catch (error) {
      this.logger.error({ jobId, requestId, ticker, error: error.message }, 'Historical request failed');
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(exchange, 'HISTORICAL', error.retryAfterMs);
      }
      try {
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'ERROR',
          error_message: error.message
        });
      } catch (kafkaErr) {
        this.logger.error({ jobId, error: kafkaErr.message }, 'Failed to write error marker to Kafka');
      }
      await this.zmqClient.sendComplete(jobId, false, error.message);
    } finally {
      // Release slot regardless of success, failure, or a throwing send above.
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release historical slot'));
    }
  }

  /**
   * Start realtime tick polling for a job dispatched by Flink.
   * Slot release happens later via onJobComplete or WorkStop.
   */
  handleRealtimeRequest(request) {
    const { jobId, requestId, ticker } = request;
    this.logger.info({ jobId, requestId, ticker }, 'Processing realtime subscription request');
    this.activeRealtime.add(jobId);
    this.realtimePoller.startSubscription(jobId, requestId, ticker, this.config.kafka_tick_topic);
  }

  /** Snapshot of worker state for the periodic status log. */
  getStatus() {
    return {
      activeRealtime: this.activeRealtime.size,
      pollerStats: this.realtimePoller.getStats(),
      metadataStatus: this.metadataGenerator.getStatus()
    };
  }

  /**
   * Graceful shutdown. Idempotent. Always terminates the process —
   * a failing close() is logged but no longer leaves the process hanging.
   */
  async shutdown() {
    if (this.isShutdown) return;
    this.isShutdown = true;
    this.logger.info('Shutting down ingestor worker');

    if (this.metadataInterval) clearInterval(this.metadataInterval);
    try {
      this.pool.shutdown();
      this.realtimePoller.shutdown();
      await this.ccxtFetcher.close();
      await this.metadataGenerator.close();
      await this.kafkaProducer.disconnect();
      await this.zmqClient.shutdown();
      this.logger.info('Ingestor worker shutdown complete');
    } catch (error) {
      this.logger.error({ error: error.message }, 'Error during shutdown');
    } finally {
      process.exit(0);
    }
  }
}

// Main entry point
async function main() {
  const config = loadConfig();
  const worker = new IngestorWorker(config, logger);

  // shutdown() is async and self-terminating; .catch guards the floating promise.
  process.on('SIGINT', () => { worker.shutdown().catch(() => process.exit(1)); });
  process.on('SIGTERM', () => { worker.shutdown().catch(() => process.exit(1)); });
  process.on('uncaughtException', error => {
    logger.error({ error }, 'Uncaught exception');
    worker.shutdown().catch(() => process.exit(1));
  });
  process.on('unhandledRejection', reason => {
    logger.error({ reason }, 'Unhandled rejection');
  });

  await worker.start();

  setInterval(() => {
    logger.info({ status: worker.getStatus() }, 'Worker status');
  }, 60000);
}

main().catch(error => {
  logger.error({ error }, 'Fatal error');
  process.exit(1);
});