// Source listing header (paste residue) — preserved as a comment so the file parses:
// Files: ai/ingestor/src/index.js — 534 lines, 19 KiB, JavaScript
#!/usr/bin/env node
// Main ingestor worker process
import { readFileSync } from 'fs';
import { parse as parseYaml } from 'yaml';
import pino from 'pino';
import { ZmqClient } from './zmq-client.js';
import { KafkaProducer } from './kafka-producer.js';
import { CCXTFetcher, ExchangeRateLimitError } from './ccxt-fetcher.js';
import { RealtimePoller } from './realtime-poller.js';
import { SymbolMetadataGenerator } from './symbol-metadata-generator.js';
import { SlotType } from './proto/messages.js';
// Logger setup — human-readable, colorized output via pino-pretty.
const loggerOptions = {
  level: process.env.LOG_LEVEL || 'info',
  transport: {
    target: 'pino-pretty',
    options: {
      colorize: true,
      translateTime: 'SYS:standard',
      ignore: 'pid,hostname'
    }
  }
};
const logger = pino(loggerOptions);
// Load configuration
/**
 * Load YAML config and secrets from disk and merge with defaults.
 *
 * Missing or unreadable files are tolerated (logged as warnings) so the
 * worker can start with pure defaults, e.g. in local development.
 *
 * @returns {object} merged configuration; secret keys override nothing
 *   explicitly — they are spread on top of the defaults object.
 */
function loadConfig() {
  const configPath = process.env.CONFIG_PATH || '/config/config.yaml';
  const secretsPath = process.env.SECRETS_PATH || '/config/secrets.yaml';
  let config = {};
  let secrets = {};
  try {
    const configFile = readFileSync(configPath, 'utf8');
    // yaml.parse returns null for an empty/blank document — keep the {} default
    // so the property reads below cannot throw on a null config.
    config = parseYaml(configFile) ?? {};
    logger.info({ path: configPath }, 'Loaded configuration');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load config, using defaults');
  }
  try {
    const secretsFile = readFileSync(secretsPath, 'utf8');
    secrets = parseYaml(secretsFile) ?? {};
    logger.info({ path: secretsPath }, 'Loaded secrets');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load secrets');
  }
  // Use ?? (not ||) so an explicitly-configured falsy value is respected.
  return {
    // Flink ZMQ endpoints
    flink_hostname: config.flink_hostname ?? 'localhost',
    ingestor_broker_port: config.ingestor_broker_port ?? 5567,
    // Kafka configuration
    kafka_brokers: config.kafka_brokers ?? ['localhost:9092'],
    kafka_ohlc_topic: config.kafka_ohlc_topic ?? 'market-ohlc',
    kafka_tick_topic: config.kafka_tick_topic ?? 'market-tick',
    kafka_ticker_topic: config.kafka_ticker_topic ?? 'market-ticker',
    // Worker configuration
    poll_interval_ms: config.poll_interval_ms ?? 10000,
    // Symbol metadata configuration
    supported_exchanges: config.supported_exchanges ?? ['binance', 'coinbase', 'kraken'],
    symbol_metadata_interval_ms: config.symbol_metadata_interval_ms ?? 6 * 60 * 60 * 1000,
    // Per-exchange slot capacity
    exchange_capacity: config.exchange_capacity ?? {
      BINANCE: { historical_slots: 3, realtime_slots: 5 },
      KRAKEN: { historical_slots: 2, realtime_slots: 3 },
      COINBASE: { historical_slots: 2, realtime_slots: 4 }
    },
    ...secrets
  };
}
/**
 * Manages work slots per exchange per job type.
 *
 * Each slot corresponds to one WorkerReady message sent to Flink. Flink consumes
 * a slot when it dispatches a job. The slot is re-offered (via another WorkerReady)
 * once the job completes, subject to any rate-limit backoff dictated by the exchange.
 */
class SlotPool {
  /**
   * @param {object} exchangeCapacity - e.g. { BINANCE: { historical_slots, realtime_slots }, ... }
   * @param {object} zmqClient - client with sendTypedReady(exchange, slotType) and onConnected hook
   * @param {object} logger - component-scoped pino logger
   */
  constructor(exchangeCapacity, zmqClient, logger) {
    this.zmqClient = zmqClient;
    this.logger = logger;
    // Key: 'EXCHANGE|TYPE' (e.g. 'BINANCE|HISTORICAL')
    // Value: { max, active: Set<jobId>, backoffUntil: ms timestamp }
    this.slots = new Map();
    for (const [exchange, cap] of Object.entries(exchangeCapacity)) {
      const ex = exchange.toUpperCase();
      this.slots.set(`${ex}|HISTORICAL`, {
        max: cap.historical_slots ?? 2,
        active: new Set(),
        backoffUntil: 0
      });
      this.slots.set(`${ex}|REALTIME`, {
        max: cap.realtime_slots ?? 3,
        active: new Set(),
        backoffUntil: 0
      });
    }
    // jobId → { exchange, type } for release tracking
    this.jobMap = new Map();
    // Pending backoff re-offer timers; cleared on shutdown so the event loop can drain.
    this.pendingTimers = new Set();
    this.isShutdown = false;
  }
  /**
   * Register the onConnected callback so slot offers are sent on every
   * TCP (re)connect rather than once at startup. Handles both the initial
   * connection race (Flink ROUTER not yet ready) and Flink restarts.
   */
  init() {
    this.zmqClient.onConnected = () => this._offerAllFreeSlots();
    this.logger.info(
      { slots: [...this.slots.entries()].map(([k, v]) => `${k}:${v.max}`) },
      'Slot pool initialized — will offer slots on connect'
    );
  }
  /**
   * Re-offer all currently-free slots. Called on every TCP (re)connect.
   * Sends (max - active) WorkerReady messages per exchange+type key.
   */
  async _offerAllFreeSlots() {
    const summary = [];
    for (const [key, slot] of this.slots) {
      const [exchange, type] = key.split('|');
      const freeCount = slot.max - slot.active.size;
      for (let i = 0; i < freeCount; i++) {
        await this.zmqClient.sendTypedReady(exchange, SlotType[type]);
      }
      summary.push(`${key}:${freeCount}/${slot.max}`);
    }
    this.logger.info({ offered: summary }, 'Re-offered all free slots on connect');
  }
  /**
   * Record a slot as occupied by jobId.
   * @param {string} jobId
   * @param {string} exchange - e.g. 'BINANCE'
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @returns {boolean} true if the slot was consumed; false if capacity is
   *   exceeded or no slot config exists for this exchange+type.
   */
  consumeSlot(jobId, exchange, type) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (slot) {
      if (slot.active.size >= slot.max) {
        this.logger.warn({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot capacity exceeded — rejecting job');
        return false;
      }
      slot.active.add(jobId);
      this.jobMap.set(jobId, { exchange: exchange.toUpperCase(), type });
      this.logger.debug({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot consumed');
      return true;
    }
    this.logger.warn({ jobId, key }, 'No slot config for this exchange+type');
    return false;
  }
  /**
   * Release the slot occupied by jobId and re-offer it to Flink (after any backoff).
   */
  async releaseSlot(jobId) {
    const info = this.jobMap.get(jobId);
    if (!info) {
      this.logger.warn({ jobId }, 'releaseSlot called for unknown jobId');
      return;
    }
    this.jobMap.delete(jobId);
    const key = `${info.exchange}|${info.type}`;
    const slot = this.slots.get(key);
    if (slot) {
      slot.active.delete(jobId);
      await this._offerSlot(info.exchange, info.type, slot);
    }
  }
  /**
   * Record a rate limit from the exchange. Delays slot re-offer by retryAfterMs.
   * @param {string} exchange
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @param {number} retryAfterMs
   */
  reportRateLimit(exchange, type, retryAfterMs) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (slot) {
      // Math.max so a shorter new backoff never truncates an existing longer one.
      slot.backoffUntil = Math.max(slot.backoffUntil, Date.now() + retryAfterMs);
      this.logger.warn({ exchange, type, retryAfterMs }, 'Rate limit backoff set for slot');
    }
  }
  /**
   * Offer one slot to Flink, deferring via a tracked timer while in backoff.
   * No-op once shutdown() has been called.
   */
  async _offerSlot(exchange, type, slot) {
    if (this.isShutdown) return;
    const now = Date.now();
    if (now < slot.backoffUntil) {
      const delay = slot.backoffUntil - now;
      this.logger.info({ exchange, type, delayMs: delay }, 'Slot in backoff — scheduling re-offer');
      // Track the timer so shutdown() can cancel it; otherwise pending re-offers
      // keep the process alive and fire after shutdown.
      const timer = setTimeout(() => {
        this.pendingTimers.delete(timer);
        // _offerSlot never rejects (it catches send errors), so fire-and-forget is safe.
        void this._offerSlot(exchange, type, slot);
      }, delay);
      this.pendingTimers.add(timer);
      return;
    }
    try {
      await this.zmqClient.sendTypedReady(exchange, SlotType[type]);
      this.logger.debug({ exchange, type }, 'Slot re-offered to Flink');
    } catch (err) {
      this.logger.error({ exchange, type, error: err.message }, 'Failed to re-offer slot');
    }
  }
  /** Cancel all scheduled re-offers and stop offering new slots. */
  shutdown() {
    this.isShutdown = true;
    for (const timer of this.pendingTimers) clearTimeout(timer);
    this.pendingTimers.clear();
  }
}
/** Extract exchange name from ticker string, e.g. "BTC/USDT.BINANCE" → "BINANCE" */
function exchangeOf(ticker) {
  if (ticker == null) return 'UNKNOWN';
  const dotIndex = ticker.lastIndexOf('.');
  if (dotIndex < 0) return 'UNKNOWN';
  return ticker.slice(dotIndex + 1).toUpperCase();
}
/**
 * Top-level worker: receives WorkAssign/WorkStop from Flink over ZMQ,
 * fetches market data via CCXT, and writes results to Kafka.
 */
class IngestorWorker {
  /**
   * @param {object} config - merged configuration from loadConfig()
   * @param {object} logger - root pino logger; per-component children derived here
   */
  constructor(config, logger) {
    this.config = config;
    this.logger = logger;
    this.zmqClient = new ZmqClient(config, logger.child({ component: 'zmq' }));
    this.kafkaProducer = new KafkaProducer(config, logger.child({ component: 'kafka' }));
    this.metadataGenerator = new SymbolMetadataGenerator(
      config,
      this.kafkaProducer,
      logger.child({ component: 'metadata' })
    );
    this.ccxtFetcher = new CCXTFetcher(
      config,
      logger.child({ component: 'ccxt' }),
      this.metadataGenerator
    );
    this.realtimePoller = new RealtimePoller(
      this.ccxtFetcher,
      this.kafkaProducer,
      this.zmqClient,
      logger.child({ component: 'poller' })
    );
    this.pool = new SlotPool(
      config.exchange_capacity,
      this.zmqClient,
      logger.child({ component: 'pool' })
    );
    // When realtime poller terminates a subscription due to repeated errors, release its slot.
    this.realtimePoller.onJobComplete = (jobId, error) => {
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(error.exchange, 'REALTIME', error.retryAfterMs);
      }
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release slot after realtime error'));
    };
    // jobId set for active realtime subscriptions
    this.activeRealtime = new Set();
    this.isShutdown = false;
    this.metadataInterval = null;
  }
  /**
   * Connect Kafka and ZMQ, wire callbacks, and schedule periodic symbol
   * metadata generation. Callbacks are registered before connect so no
   * early message from Flink is lost.
   */
  async start() {
    this.logger.info('Starting CCXT ingestor worker');
    await this.kafkaProducer.connect();
    // Wire event callbacks before connecting so we don't miss early messages
    this.zmqClient.onWorkAssign = req => this.handleWorkAssign(req);
    this.zmqClient.onWorkStop = jobId => this.handleWorkStop(jobId);
    // Register slot offer callback before connecting so we don't miss the event
    this.pool.init();
    await this.zmqClient.connect();
    // Generate symbol metadata on startup
    this.logger.info('Generating initial symbol metadata');
    try {
      const results = await this.metadataGenerator.generateAll();
      this.logger.info({ results }, 'Initial symbol metadata generated');
    } catch (error) {
      this.logger.error({ error: error.message }, 'Failed to generate initial symbol metadata');
    }
    // Schedule periodic metadata generation
    this.metadataInterval = setInterval(async () => {
      this.logger.info('Periodic symbol metadata generation');
      try {
        const results = await this.metadataGenerator.generateAll();
        this.logger.info({ results }, 'Periodic symbol metadata generated');
      } catch (error) {
        this.logger.error({ error: error.message }, 'Failed to generate periodic symbol metadata');
      }
    }, this.config.symbol_metadata_interval_ms);
    this.logger.info('Ingestor worker started successfully');
  }
  /**
   * Handle a WorkAssign message dispatched by Flink IngestorBroker.
   * Called from the ZmqClient receive loop — do not block.
   */
  handleWorkAssign(request) {
    const { jobId, requestId, type, ticker } = request;
    const exchange = exchangeOf(ticker);
    this.logger.info({ jobId, requestId, type, ticker, exchange }, 'Received WorkAssign');
    // Type may arrive as a proto enum number or its string name; accept both.
    const isHistorical = !type || type === 'HISTORICAL_OHLC' || type === 0;
    const isRealtime = type === 'REALTIME_TICKS' || type === 1;
    const isTickerSnapshot = type === 'TICKER_SNAPSHOT' || type === 2;
    if (isHistorical) {
      if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleHistoricalRequest(request).catch(err => {
        this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in historical handler');
      });
    } else if (isRealtime) {
      if (!this.pool.consumeSlot(jobId, exchange, 'REALTIME')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleRealtimeRequest(request);
    } else if (isTickerSnapshot) {
      // Ticker snapshots are one-shot fetches, so they share the HISTORICAL pool.
      if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleTicker24hRequest(request).catch(err => {
        this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in ticker24h handler');
      });
    } else {
      this.logger.warn({ jobId, type }, 'Unknown request type — rejecting');
      this.zmqClient.sendReject(jobId, `Unknown request type: ${type}`).catch(() => {});
    }
  }
  /**
   * Handle WorkStop sent by Flink (e.g., all subscribers left).
   */
  handleWorkStop(jobId) {
    this.logger.info({ jobId }, 'Received WorkStop — cancelling realtime subscription');
    this.realtimePoller.cancelSubscription(jobId);
    this.activeRealtime.delete(jobId);
    this.pool.releaseSlot(jobId).catch(err =>
      this.logger.warn({ jobId, error: err.message }, 'Failed to release slot after WorkStop'));
    // No WorkComplete needed — Flink sent the stop, it already knows.
  }
  /**
   * Fetch historical OHLC data and write to Kafka.
   * Sends WorkComplete when done (success or error). The slot is released in
   * a finally block so no failure path (heartbeat, fetch, Kafka, or the
   * WorkComplete send itself) can leak a consumed slot.
   */
  async handleHistoricalRequest(request) {
    const { jobId, requestId, ticker, historical, clientId: client_id } = request;
    const exchange = exchangeOf(ticker);
    const { startTime: start_time, endTime: end_time, periodSeconds: period_seconds, limit } = historical || {};
    this.logger.info({ jobId, requestId, ticker, period_seconds }, 'Processing historical OHLC request');
    try {
      // Immediately ack to reset Flink's dispatch-time timeout clock.
      await this.zmqClient.sendHeartbeat(jobId);
      const candles = await this.ccxtFetcher.fetchHistoricalOHLC(
        ticker, start_time, end_time, period_seconds, limit
      );
      this.logger.info({ jobId, requestId, ticker, count: candles.length }, 'Fetched from exchange');
      if (candles.length > 0) {
        const metadata = { request_id: requestId, client_id, ticker, period_seconds, start_time, end_time };
        // 8000 rows/page: each OHLC row is ~77 bytes typical (9 populated fields as
        // protobuf varints + ticker string). Worst-case is ~124 bytes, so 8000 rows
        // stays safely under Kafka's 1MB message limit in all realistic scenarios.
        const PAGE_SIZE = 8000;
        for (let i = 0; i < candles.length; i += PAGE_SIZE) {
          const page = candles.slice(i, i + PAGE_SIZE);
          const isLastPage = (i + PAGE_SIZE) >= candles.length;
          await this.kafkaProducer.writeOHLCs(this.config.kafka_ohlc_topic, page, metadata, isLastPage);
        }
        this.logger.info(
          { jobId, requestId, ticker, count: candles.length, pages: Math.ceil(candles.length / PAGE_SIZE) },
          'Wrote all pages to Kafka'
        );
      } else {
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'NOT_FOUND', message: 'No data available for requested period'
        });
      }
      this.logger.info({ jobId, requestId, ticker }, 'Historical request complete — sending WorkComplete');
      await this.zmqClient.sendComplete(jobId, true);
    } catch (error) {
      this.logger.error({ jobId, requestId, ticker, error: error.message }, 'Historical request failed');
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(exchange, 'HISTORICAL', error.retryAfterMs);
      }
      try {
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'ERROR', error_message: error.message
        });
      } catch (kafkaErr) {
        this.logger.error({ jobId, error: kafkaErr.message }, 'Failed to write error marker to Kafka');
      }
      try {
        await this.zmqClient.sendComplete(jobId, false, error.message);
      } catch (zmqErr) {
        this.logger.error({ jobId, error: zmqErr.message }, 'Failed to send WorkComplete failure');
      }
    } finally {
      // Release slot regardless of success or failure
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release historical slot'));
    }
  }
  /**
   * Start realtime tick polling for a job dispatched by Flink.
   * Releases the slot if the poller refuses the subscription synchronously.
   */
  handleRealtimeRequest(request) {
    const { jobId, requestId, ticker } = request;
    this.logger.info({ jobId, requestId, ticker }, 'Processing realtime subscription request');
    this.activeRealtime.add(jobId);
    try {
      this.realtimePoller.startSubscription(jobId, requestId, ticker, this.config.kafka_tick_topic);
    } catch (error) {
      this.logger.error({ jobId, requestId, error: error.message }, 'Failed to start realtime subscription');
      this.activeRealtime.delete(jobId);
      this.pool.releaseSlot(jobId).catch(() => {});
    }
  }
  /**
   * Fetch all tickers (24h stats) for an exchange and write to Kafka.
   * Triggered by TICKER_SNAPSHOT request with sentinel ticker @TICKER24H.{EXCHANGE}.
   * The slot is released in a finally block so no failure path can leak it.
   */
  async handleTicker24hRequest(request) {
    const { jobId, requestId, ticker, clientId } = request;
    const exchangeId = exchangeOf(ticker); // e.g. "BINANCE" from "@TICKER24H.BINANCE"
    const exchangeName = exchangeId.toLowerCase();
    this.logger.info({ jobId, requestId, ticker, exchangeId }, 'Processing TICKER_SNAPSHOT request');
    try {
      // Immediately ack to reset Flink's dispatch-time timeout clock.
      await this.zmqClient.sendHeartbeat(jobId);
      const tickers = await this.ccxtFetcher.fetchAllTickers(exchangeName);
      this.logger.info({ jobId, requestId, exchangeId, count: tickers.length }, 'Fetched tickers from exchange');
      await this.kafkaProducer.writeTickerBatch(this.config.kafka_ticker_topic, exchangeId, tickers, clientId, requestId);
      this.logger.info({ jobId, requestId, exchangeId }, 'Ticker24h request complete — sending WorkComplete');
      await this.zmqClient.sendComplete(jobId, true);
    } catch (error) {
      this.logger.error({ jobId, requestId, exchangeId, error: error.message }, 'Ticker24h request failed');
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(exchangeId, 'HISTORICAL', error.retryAfterMs);
      }
      try {
        await this.zmqClient.sendComplete(jobId, false, error.message);
      } catch (zmqErr) {
        this.logger.error({ jobId, error: zmqErr.message }, 'Failed to send WorkComplete failure');
      }
    } finally {
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release ticker24h slot'));
    }
  }
  /** Snapshot of worker activity for the periodic status log. */
  getStatus() {
    return {
      activeRealtime: this.activeRealtime.size,
      pollerStats: this.realtimePoller.getStats(),
      metadataStatus: this.metadataGenerator.getStatus()
    };
  }
  /**
   * Tear down all components and exit. Each step is best-effort: one failing
   * component must not prevent the remaining teardown or the final exit.
   * Idempotent — repeated calls are no-ops.
   */
  async shutdown() {
    if (this.isShutdown) return;
    this.isShutdown = true;
    this.logger.info('Shutting down ingestor worker');
    if (this.metadataInterval) clearInterval(this.metadataInterval);
    this.pool.shutdown();
    this.realtimePoller.shutdown();
    const steps = [
      ['ccxt', () => this.ccxtFetcher.close()],
      ['metadata', () => this.metadataGenerator.close()],
      ['kafka', () => this.kafkaProducer.disconnect()],
      ['zmq', () => this.zmqClient.shutdown()]
    ];
    for (const [component, step] of steps) {
      try {
        await step();
      } catch (error) {
        this.logger.error({ component, error: error.message }, 'Error during shutdown step');
      }
    }
    this.logger.info('Ingestor worker shutdown complete');
    process.exit(0);
  }
}
// Main entry point
async function main() {
  const config = loadConfig();
  const worker = new IngestorWorker(config, logger);
  // worker.shutdown() is async; if it rejects, the process would otherwise hang
  // with no exit path — force-exit non-zero on teardown failure.
  const gracefulShutdown = () => {
    worker.shutdown().catch(error => {
      logger.error({ error }, 'Shutdown failed — forcing exit');
      process.exit(1);
    });
  };
  process.on('SIGINT', gracefulShutdown);
  process.on('SIGTERM', gracefulShutdown);
  process.on('uncaughtException', error => {
    logger.error({ error }, 'Uncaught exception');
    gracefulShutdown();
  });
  process.on('unhandledRejection', reason => {
    logger.error({ reason }, 'Unhandled rejection');
  });
  await worker.start();
  // Periodic status log; also keeps the event loop alive between ZMQ events.
  setInterval(() => {
    logger.info({ status: worker.getStatus() }, 'Worker status');
  }, 60000);
}
main().catch(error => {
  logger.error({ error }, 'Fatal error');
  process.exit(1);
});