#!/usr/bin/env node
|
|
|
|
// Main ingestor worker process
|
|
import { readFileSync } from 'fs';
|
|
import { parse as parseYaml } from 'yaml';
|
|
import pino from 'pino';
|
|
import { ZmqClient } from './zmq-client.js';
|
|
import { KafkaProducer } from './kafka-producer.js';
|
|
import { CCXTFetcher, ExchangeRateLimitError } from './ccxt-fetcher.js';
|
|
import { RealtimePoller } from './realtime-poller.js';
|
|
import { SymbolMetadataGenerator } from './symbol-metadata-generator.js';
|
|
import { SlotType } from './proto/messages.js';
|
|
|
|
// Logger setup
// Pretty-printed console transport for human-readable local output.
const prettyTransport = {
  target: 'pino-pretty',
  options: {
    colorize: true,
    translateTime: 'SYS:standard',
    ignore: 'pid,hostname',
  },
};

// Root structured logger; verbosity controlled via LOG_LEVEL (default 'info').
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  transport: prettyTransport,
});
|
|
|
|
// Load configuration
/**
 * Load YAML configuration and secrets from disk and merge them with defaults.
 *
 * Paths come from CONFIG_PATH / SECRETS_PATH (defaulting to /config/*.yaml).
 * A missing or unreadable file is logged and treated as empty, so the worker
 * can always start with built-in defaults. Secrets are spread last and
 * therefore override any overlapping config keys.
 *
 * @returns {object} merged configuration object
 */
function loadConfig() {
  const configPath = process.env.CONFIG_PATH || '/config/config.yaml';
  const secretsPath = process.env.SECRETS_PATH || '/config/secrets.yaml';

  let config = {};
  let secrets = {};

  try {
    const configFile = readFileSync(configPath, 'utf8');
    // parseYaml returns null for an empty or comment-only document — fall
    // back to {} so the property reads below don't throw a TypeError.
    config = parseYaml(configFile) ?? {};
    logger.info({ path: configPath }, 'Loaded configuration');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load config, using defaults');
  }

  try {
    const secretsFile = readFileSync(secretsPath, 'utf8');
    secrets = parseYaml(secretsFile) ?? {};
    logger.info({ path: secretsPath }, 'Loaded secrets');
  } catch (error) {
    logger.warn({ error: error.message }, 'Could not load secrets');
  }

  // Use ?? (not ||) so explicitly-configured falsy values are respected and
  // only null/undefined fall back to the defaults.
  return {
    // Flink ZMQ endpoints
    flink_hostname: config.flink_hostname ?? 'localhost',
    ingestor_broker_port: config.ingestor_broker_port ?? 5567,

    // Kafka configuration
    kafka_brokers: config.kafka_brokers ?? ['localhost:9092'],
    kafka_ohlc_topic: config.kafka_ohlc_topic ?? 'market-ohlc',
    kafka_tick_topic: config.kafka_tick_topic ?? 'market-tick',

    // Worker configuration
    poll_interval_ms: config.poll_interval_ms ?? 10000,

    // Symbol metadata configuration
    supported_exchanges: config.supported_exchanges ?? ['binance', 'coinbase', 'kraken'],
    symbol_metadata_interval_ms: config.symbol_metadata_interval_ms ?? 6 * 60 * 60 * 1000,

    // Per-exchange slot capacity
    exchange_capacity: config.exchange_capacity ?? {
      BINANCE: { historical_slots: 3, realtime_slots: 5 },
      KRAKEN: { historical_slots: 2, realtime_slots: 3 },
      COINBASE: { historical_slots: 2, realtime_slots: 4 }
    },

    ...secrets
  };
}
|
|
|
|
/**
 * Manages work slots per exchange per job type.
 *
 * Each slot corresponds to one WorkerReady message sent to Flink. Flink consumes
 * a slot when it dispatches a job. The slot is re-offered (via another WorkerReady)
 * once the job completes, subject to any rate-limit backoff dictated by the exchange.
 */
class SlotPool {
  /**
   * @param {Object<string, {historical_slots?: number, realtime_slots?: number}>} exchangeCapacity
   * @param {object} zmqClient - must expose sendTypedReady(exchange, slotType) and an onConnected hook
   * @param {object} logger - pino-style child logger
   */
  constructor(exchangeCapacity, zmqClient, logger) {
    this.zmqClient = zmqClient;
    this.logger = logger;

    // Key: 'EXCHANGE|TYPE' (e.g. 'BINANCE|HISTORICAL')
    // Value: { max, active: Set<jobId>, backoffUntil: ms timestamp }
    this.slots = new Map();

    for (const [exchange, cap] of Object.entries(exchangeCapacity)) {
      const ex = exchange.toUpperCase();
      this.slots.set(`${ex}|HISTORICAL`, {
        max: cap.historical_slots ?? 2,
        active: new Set(),
        backoffUntil: 0
      });
      this.slots.set(`${ex}|REALTIME`, {
        max: cap.realtime_slots ?? 3,
        active: new Set(),
        backoffUntil: 0
      });
    }

    // jobId → { exchange, type } for release tracking
    this.jobMap = new Map();

    // Pending backoff re-offer timers; cleared on shutdown so no stray
    // setTimeout keeps the process alive or fires after teardown.
    this.backoffTimers = new Set();
    this.isShutdown = false;
  }

  /**
   * Register the onConnected callback so slot offers are sent on every
   * TCP (re)connect rather than once at startup. Handles both the initial
   * connection race (Flink ROUTER not yet ready) and Flink restarts.
   */
  init() {
    this.zmqClient.onConnected = () => this._offerAllFreeSlots();
    this.logger.info(
      { slots: [...this.slots.entries()].map(([k, v]) => `${k}:${v.max}`) },
      'Slot pool initialized — will offer slots on connect'
    );
  }

  /**
   * Re-offer all currently-free slots. Called on every TCP (re)connect.
   * Sends (max - active) WorkerReady messages per exchange+type key.
   */
  async _offerAllFreeSlots() {
    const summary = [];
    for (const [key, slot] of this.slots) {
      const [exchange, type] = key.split('|');
      const freeCount = slot.max - slot.active.size;
      for (let i = 0; i < freeCount; i++) {
        // Route through _offerSlot so (a) an active rate-limit backoff is
        // honoured even across reconnects, and (b) send failures are logged
        // rather than escaping this un-awaited callback as an unhandled
        // rejection.
        await this._offerSlot(exchange, type, slot);
      }
      summary.push(`${key}:${freeCount}/${slot.max}`);
    }
    this.logger.info({ offered: summary }, 'Re-offered all free slots on connect');
  }

  /**
   * Record a slot as occupied by jobId.
   * @param {string} jobId
   * @param {string} exchange - e.g. 'BINANCE'
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @returns {boolean} true if a slot was available and consumed
   */
  consumeSlot(jobId, exchange, type) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (slot) {
      if (slot.active.size >= slot.max) {
        this.logger.warn({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot capacity exceeded — rejecting job');
        return false;
      }
      slot.active.add(jobId);
      this.jobMap.set(jobId, { exchange: exchange.toUpperCase(), type });
      this.logger.debug({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot consumed');
      return true;
    }
    this.logger.warn({ jobId, key }, 'No slot config for this exchange+type');
    return false;
  }

  /**
   * Release the slot occupied by jobId and re-offer it to Flink (after any backoff).
   * No-op (with a warning) for unknown jobIds.
   */
  async releaseSlot(jobId) {
    const info = this.jobMap.get(jobId);
    if (!info) {
      this.logger.warn({ jobId }, 'releaseSlot called for unknown jobId');
      return;
    }
    this.jobMap.delete(jobId);
    const key = `${info.exchange}|${info.type}`;
    const slot = this.slots.get(key);
    if (slot) {
      slot.active.delete(jobId);
      await this._offerSlot(info.exchange, info.type, slot);
    }
  }

  /**
   * Record a rate limit from the exchange. Delays slot re-offer by retryAfterMs.
   * @param {string} exchange
   * @param {string} type - 'HISTORICAL' | 'REALTIME'
   * @param {number} retryAfterMs
   */
  reportRateLimit(exchange, type, retryAfterMs) {
    const key = `${exchange.toUpperCase()}|${type}`;
    const slot = this.slots.get(key);
    if (slot) {
      // Math.max: never shorten an already-scheduled backoff window.
      slot.backoffUntil = Math.max(slot.backoffUntil, Date.now() + retryAfterMs);
      this.logger.warn({ exchange, type, retryAfterMs }, 'Rate limit backoff set for slot');
    }
  }

  /**
   * Send one WorkerReady for this slot, deferring via setTimeout while the
   * slot is inside its rate-limit backoff window. Send errors are logged,
   * never thrown.
   */
  async _offerSlot(exchange, type, slot) {
    if (this.isShutdown) return; // don't offer or re-schedule after shutdown
    const now = Date.now();
    if (now < slot.backoffUntil) {
      const delay = slot.backoffUntil - now;
      this.logger.info({ exchange, type, delayMs: delay }, 'Slot in backoff — scheduling re-offer');
      const timer = setTimeout(() => {
        this.backoffTimers.delete(timer);
        // Fire-and-forget: _offerSlot handles its own errors.
        this._offerSlot(exchange, type, slot);
      }, delay);
      this.backoffTimers.add(timer);
      return;
    }
    try {
      await this.zmqClient.sendTypedReady(exchange, SlotType[type]);
      this.logger.debug({ exchange, type }, 'Slot re-offered to Flink');
    } catch (err) {
      this.logger.error({ exchange, type, error: err.message }, 'Failed to re-offer slot');
    }
  }

  /** Cancel all pending backoff re-offer timers and stop further offers. */
  shutdown() {
    this.isShutdown = true;
    for (const timer of this.backoffTimers) clearTimeout(timer);
    this.backoffTimers.clear();
  }
}
|
|
|
|
/**
 * Extract the exchange suffix from a ticker string.
 * Example: "BTC/USDT.BINANCE" → "BINANCE". Returns 'UNKNOWN' when the
 * ticker is nullish or contains no '.' separator.
 */
function exchangeOf(ticker) {
  const dotPos = ticker?.lastIndexOf('.');
  if (dotPos === undefined || dotPos < 0) {
    return 'UNKNOWN';
  }
  return ticker.slice(dotPos + 1).toUpperCase();
}
|
|
|
/**
 * Top-level worker: wires together the Flink ZMQ client (job dispatch),
 * Kafka producer (data output), CCXT fetcher/poller (exchange access) and
 * the per-exchange SlotPool (capacity management).
 */
class IngestorWorker {
  constructor(config, logger) {
    this.config = config;
    this.logger = logger;

    this.zmqClient = new ZmqClient(config, logger.child({ component: 'zmq' }));
    this.kafkaProducer = new KafkaProducer(config, logger.child({ component: 'kafka' }));
    this.metadataGenerator = new SymbolMetadataGenerator(
      config,
      this.kafkaProducer,
      logger.child({ component: 'metadata' })
    );
    this.ccxtFetcher = new CCXTFetcher(
      config,
      logger.child({ component: 'ccxt' }),
      this.metadataGenerator
    );
    this.realtimePoller = new RealtimePoller(
      this.ccxtFetcher,
      this.kafkaProducer,
      this.zmqClient,
      logger.child({ component: 'poller' })
    );

    this.pool = new SlotPool(
      config.exchange_capacity,
      this.zmqClient,
      logger.child({ component: 'pool' })
    );

    // When the realtime poller terminates a subscription due to repeated
    // errors, record any rate-limit backoff, drop the job from the active
    // set (otherwise activeRealtime leaks entries — only WorkStop cleaned
    // it before), and release its slot.
    this.realtimePoller.onJobComplete = (jobId, error) => {
      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(error.exchange, 'REALTIME', error.retryAfterMs);
      }
      this.activeRealtime.delete(jobId);
      this.pool.releaseSlot(jobId).catch(err =>
        this.logger.error({ jobId, error: err.message }, 'Failed to release slot after realtime error'));
    };

    // jobId set for active realtime subscriptions
    this.activeRealtime = new Set();

    this.isShutdown = false;
    this.metadataInterval = null;
  }

  /**
   * Connect Kafka and ZMQ, wire dispatch callbacks, generate initial symbol
   * metadata and schedule its periodic regeneration.
   */
  async start() {
    this.logger.info('Starting CCXT ingestor worker');

    await this.kafkaProducer.connect();

    // Wire event callbacks before connecting so we don't miss early messages
    this.zmqClient.onWorkAssign = req => this.handleWorkAssign(req);
    this.zmqClient.onWorkStop = jobId => this.handleWorkStop(jobId);

    // Register slot offer callback before connecting so we don't miss the event
    this.pool.init();

    await this.zmqClient.connect();

    // Generate symbol metadata on startup
    this.logger.info('Generating initial symbol metadata');
    try {
      const results = await this.metadataGenerator.generateAll();
      this.logger.info({ results }, 'Initial symbol metadata generated');
    } catch (error) {
      this.logger.error({ error: error.message }, 'Failed to generate initial symbol metadata');
    }

    // Schedule periodic metadata generation
    this.metadataInterval = setInterval(async () => {
      this.logger.info('Periodic symbol metadata generation');
      try {
        const results = await this.metadataGenerator.generateAll();
        this.logger.info({ results }, 'Periodic symbol metadata generated');
      } catch (error) {
        this.logger.error({ error: error.message }, 'Failed to generate periodic symbol metadata');
      }
    }, this.config.symbol_metadata_interval_ms);

    this.logger.info('Ingestor worker started successfully');
  }

  /**
   * Handle a WorkAssign message dispatched by Flink IngestorBroker.
   * Called from the ZmqClient receive loop — do not block.
   */
  handleWorkAssign(request) {
    const { jobId, requestId, type, ticker } = request;
    const exchange = exchangeOf(ticker);

    this.logger.info({ jobId, requestId, type, ticker, exchange }, 'Received WorkAssign');

    // Accept both string and numeric enum encodings; absent type defaults
    // to historical for backward compatibility.
    const isHistorical = !type || type === 'HISTORICAL_OHLC' || type === 0;
    const isRealtime = type === 'REALTIME_TICKS' || type === 1;

    if (isHistorical) {
      if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      // Fire-and-forget: the handler reports its own completion over ZMQ.
      this.handleHistoricalRequest(request).catch(err => {
        this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in historical handler');
      });
    } else if (isRealtime) {
      if (!this.pool.consumeSlot(jobId, exchange, 'REALTIME')) {
        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
        return;
      }
      this.handleRealtimeRequest(request);
    } else {
      this.logger.warn({ jobId, type }, 'Unknown request type — rejecting');
      this.zmqClient.sendReject(jobId, `Unknown request type: ${type}`).catch(() => {});
    }
  }

  /**
   * Handle WorkStop sent by Flink (e.g., all subscribers left).
   */
  handleWorkStop(jobId) {
    this.logger.info({ jobId }, 'Received WorkStop — cancelling realtime subscription');
    this.realtimePoller.cancelSubscription(jobId);
    this.activeRealtime.delete(jobId);
    this.pool.releaseSlot(jobId).catch(err =>
      this.logger.warn({ jobId, error: err.message }, 'Failed to release slot after WorkStop'));
    // No WorkComplete needed — Flink sent the stop, it already knows.
  }

  /**
   * Fetch historical OHLC data and write it to Kafka in pages.
   * Sends WorkComplete when done (success or error), and always releases
   * the slot in a finally block — even if the heartbeat or the completion
   * message itself throws (previously those paths leaked the slot).
   */
  async handleHistoricalRequest(request) {
    const { jobId, requestId, ticker, historical, clientId: client_id } = request;
    const exchange = exchangeOf(ticker);
    const { startTime: start_time, endTime: end_time, periodSeconds: period_seconds, limit } = historical || {};

    this.logger.info({ jobId, requestId, ticker, period_seconds }, 'Processing historical OHLC request');

    try {
      // Immediately ack to reset Flink's dispatch-time timeout clock.
      await this.zmqClient.sendHeartbeat(jobId);

      const candles = await this.ccxtFetcher.fetchHistoricalOHLC(
        ticker, start_time, end_time, period_seconds, limit
      );

      this.logger.info({ jobId, requestId, ticker, count: candles.length }, 'Fetched from exchange');

      if (candles.length > 0) {
        const metadata = { request_id: requestId, client_id, ticker, period_seconds, start_time, end_time };
        // 8000 rows/page: each OHLC row is ~77 bytes typical (9 populated fields as
        // protobuf varints + ticker string). Worst-case is ~124 bytes, so 8000 rows
        // stays safely under Kafka's 1MB message limit in all realistic scenarios.
        const PAGE_SIZE = 8000;
        for (let i = 0; i < candles.length; i += PAGE_SIZE) {
          const page = candles.slice(i, i + PAGE_SIZE);
          const isLastPage = (i + PAGE_SIZE) >= candles.length;
          await this.kafkaProducer.writeOHLCs(this.config.kafka_ohlc_topic, page, metadata, isLastPage);
        }
        this.logger.info(
          { jobId, requestId, ticker, count: candles.length, pages: Math.ceil(candles.length / PAGE_SIZE) },
          'Wrote all pages to Kafka'
        );
      } else {
        // Explicit NOT_FOUND marker so downstream consumers don't wait forever.
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'NOT_FOUND', message: 'No data available for requested period'
        });
      }

      this.logger.info({ jobId, requestId, ticker }, 'Historical request complete — sending WorkComplete');
      await this.zmqClient.sendComplete(jobId, true);

    } catch (error) {
      this.logger.error({ jobId, requestId, ticker, error: error.message }, 'Historical request failed');

      if (error instanceof ExchangeRateLimitError) {
        this.pool.reportRateLimit(exchange, 'HISTORICAL', error.retryAfterMs);
      }

      try {
        await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
          request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
          status: 'ERROR', error_message: error.message
        });
      } catch (kafkaErr) {
        this.logger.error({ jobId, error: kafkaErr.message }, 'Failed to write error marker to Kafka');
      }

      await this.zmqClient.sendComplete(jobId, false, error.message);

    } finally {
      // Release the slot no matter what — success, failure, or a throwing
      // sendHeartbeat/sendComplete — otherwise capacity shrinks permanently.
      try {
        await this.pool.releaseSlot(jobId);
      } catch (err) {
        this.logger.error({ jobId, error: err.message }, 'Failed to release historical slot');
      }
    }
  }

  /**
   * Start realtime tick polling for a job dispatched by Flink.
   */
  handleRealtimeRequest(request) {
    const { jobId, requestId, ticker } = request;
    this.logger.info({ jobId, requestId, ticker }, 'Processing realtime subscription request');

    this.activeRealtime.add(jobId);
    this.realtimePoller.startSubscription(jobId, requestId, ticker, this.config.kafka_tick_topic);
  }

  /** Snapshot of worker health for the periodic status log. */
  getStatus() {
    return {
      activeRealtime: this.activeRealtime.size,
      pollerStats: this.realtimePoller.getStats(),
      metadataStatus: this.metadataGenerator.getStatus()
    };
  }

  /**
   * Idempotent graceful shutdown: stop timers, drain components in
   * dependency order, then exit the process.
   */
  async shutdown() {
    if (this.isShutdown) return;
    this.isShutdown = true;
    this.logger.info('Shutting down ingestor worker');

    if (this.metadataInterval) clearInterval(this.metadataInterval);

    this.pool.shutdown();
    this.realtimePoller.shutdown();
    await this.ccxtFetcher.close();
    await this.metadataGenerator.close();
    await this.kafkaProducer.disconnect();
    await this.zmqClient.shutdown();

    this.logger.info('Ingestor worker shutdown complete');
    process.exit(0);
  }
}
|
|
|
|
// Main entry point
async function main() {
  const config = loadConfig();
  const worker = new IngestorWorker(config, logger);

  // Graceful termination on signals; crash-style events trigger the same path.
  const requestShutdown = () => worker.shutdown();
  process.on('SIGINT', requestShutdown);
  process.on('SIGTERM', requestShutdown);
  process.on('uncaughtException', (error) => {
    logger.error({ error }, 'Uncaught exception');
    worker.shutdown();
  });
  process.on('unhandledRejection', (reason) => {
    logger.error({ reason }, 'Unhandled rejection');
  });

  await worker.start();

  // Periodic status heartbeat for observability.
  const STATUS_PERIOD_MS = 60000;
  setInterval(() => {
    logger.info({ status: worker.getStatus() }, 'Worker status');
  }, STATUS_PERIOD_MS);
}

main().catch((error) => {
  logger.error({ error }, 'Fatal error');
  process.exit(1);
});
|