// Realtime tick data poller — polls the exchange every 10s and writes ticks to the
// market-tick Kafka topic. Heartbeats every 5s so Flink IngestorBroker knows the job is alive.

/**
 * Polls an exchange for recent trades on behalf of broker-dispatched jobs,
 * writes new trades to Kafka and sends periodic heartbeats for every active job.
 */
export class RealtimePoller {
  /**
   * @param {object} ccxtFetcher - Must expose `fetchRecentTrades(ticker, sinceTimestamp)`.
   * @param {object} kafkaProducer - Must expose `writeTicks(topic, trades)`.
   * @param {object} zmqClient - Must expose `sendHeartbeat(jobId)` and
   *   `sendComplete(jobId, success, message)`.
   * @param {object} logger - Structured logger with `debug`/`info`/`warn`/`error`.
   */
  constructor(ccxtFetcher, kafkaProducer, zmqClient, logger) {
    this.ccxtFetcher = ccxtFetcher;
    this.kafkaProducer = kafkaProducer;
    this.zmqClient = zmqClient;
    this.logger = logger;

    // Active subscriptions: jobId -> subscription info
    this.subscriptions = new Map();

    // Poll interval in milliseconds (10 seconds)
    this.pollInterval = 10000;

    // Heartbeat interval in milliseconds (5 seconds)
    this.heartbeatInterval = 5000;

    // Timer handles; null while the corresponding loop is not running.
    this.pollingLoop = null;
    this.heartbeatLoop = null;

    // Called with (jobId, error) when a subscription terminates abnormally.
    // Set by IngestorWorker to release the slot in SlotPool.
    this.onJobComplete = null;
  }

  /**
   * Start a realtime subscription for a job dispatched by IngestorBroker.
   * Starts the polling and heartbeat loops if they are not already running.
   * A second call with a jobId that is already registered only logs a warning.
   *
   * @param {string} jobId - Broker-assigned job ID (for heartbeats and COMPLETE)
   * @param {string} requestId - Original request ID (for metadata)
   * @param {string} ticker - Ticker to subscribe to
   * @param {string} kafkaTopic - Kafka topic to write ticks to (market-tick)
   */
  startSubscription(jobId, requestId, ticker, kafkaTopic) {
    if (this.subscriptions.has(jobId)) {
      this.logger.warn({ jobId }, 'Subscription already exists');
      return;
    }

    const subscription = {
      jobId,
      requestId,
      ticker,
      kafkaTopic,
      lastTimestamp: null,
      isActive: true,
      errorCount: 0,
      // True while a poll for this subscription is in flight (overlap guard).
      isPolling: false,
    };
    this.subscriptions.set(jobId, subscription);

    this.logger.info({ jobId, requestId, ticker, kafkaTopic }, 'Started realtime subscription');

    if (!this.pollingLoop) {
      this.startPollingLoop();
    }
    if (!this.heartbeatLoop) {
      this.startHeartbeatLoop();
    }
  }

  /**
   * Stop a realtime subscription. Called when Flink sends WorkStop or on error.
   * Does NOT send WorkComplete — caller is responsible for that.
   * Stops both loops once the last subscription is gone.
   *
   * @param {string} jobId - Job whose subscription should be removed.
   */
  cancelSubscription(jobId) {
    const subscription = this.subscriptions.get(jobId);
    if (subscription) {
      // Flag first so an in-flight poll holding this object sees the cancellation.
      subscription.isActive = false;
      this.subscriptions.delete(jobId);
      this.logger.info({ jobId, ticker: subscription.ticker }, 'Cancelled realtime subscription');
    }

    // Only report the stop when a loop was actually running.
    if (this.subscriptions.size === 0 && this._stopLoops()) {
      this.logger.info('Stopped polling/heartbeat loops — no active subscriptions');
    }
  }

  /**
   * Start the periodic polling timer and trigger one poll immediately.
   */
  startPollingLoop() {
    this.logger.info({ interval: this.pollInterval }, 'Starting polling loop');
    this.pollingLoop = setInterval(() => this.pollAllSubscriptions(), this.pollInterval);
    // Immediate first poll. pollAllSubscriptions never rejects (it uses allSettled).
    this.pollAllSubscriptions();
  }

  /**
   * Start the periodic heartbeat timer. Each tick sends one heartbeat per
   * registered job; a failed heartbeat is logged and does not stop the others.
   */
  startHeartbeatLoop() {
    this.logger.info({ interval: this.heartbeatInterval }, 'Starting heartbeat loop');
    this.heartbeatLoop = setInterval(async () => {
      for (const { jobId } of this.subscriptions.values()) {
        try {
          await this.zmqClient.sendHeartbeat(jobId);
        } catch (err) {
          this.logger.error({ jobId, error: err.message }, 'Failed to send heartbeat');
        }
      }
    }, this.heartbeatInterval);
  }

  /**
   * Poll every registered subscription concurrently.
   * @returns {Promise<void>} Resolves once every poll has settled; never rejects.
   */
  async pollAllSubscriptions() {
    const subscriptions = Array.from(this.subscriptions.values());
    await Promise.allSettled(subscriptions.map((sub) => this.pollSubscription(sub)));
  }

  /**
   * Poll a single subscription once. Skips inactive subscriptions and
   * subscriptions that already have a poll in flight, so a slow exchange
   * call cannot cause the same trades to be written twice.
   *
   * @param {object} subscription - Subscription record created by startSubscription.
   * @returns {Promise<void>}
   */
  async pollSubscription(subscription) {
    if (!subscription.isActive || subscription.isPolling) return;

    subscription.isPolling = true;
    try {
      await this._pollOnce(subscription);
    } finally {
      subscription.isPolling = false;
    }
  }

  /**
   * Fetch, de-duplicate and publish trades for one subscription, and handle
   * the error accounting. Internal helper for pollSubscription.
   *
   * @param {object} subscription - Subscription record to poll.
   * @returns {Promise<void>}
   */
  async _pollOnce(subscription) {
    const { jobId, ticker, kafkaTopic, lastTimestamp } = subscription;

    try {
      const trades = await this.ccxtFetcher.fetchRecentTrades(ticker, lastTimestamp);

      // The subscription may have been cancelled while the fetch was in flight.
      if (!subscription.isActive) {
        this.logger.debug({ jobId, ticker }, 'Subscription cancelled during poll; discarding result');
        return;
      }

      if (trades.length === 0) {
        this.logger.debug({ jobId, ticker }, 'No new trades');
        // An empty result is still a successful poll: it breaks an error streak.
        subscription.errorCount = 0;
        return;
      }

      // Skip trades we've already seen (timestamp-based dedup).
      // Nullish check rather than truthiness so a watermark of 0 still dedups.
      let newTrades = trades;
      if (lastTimestamp != null) {
        const lastTs = BigInt(lastTimestamp);
        newTrades = trades.filter((t) => BigInt(t.timestamp) > lastTs);
      }

      if (newTrades.length > 0) {
        // Use the maximum timestamp, not the last element, so an out-of-order
        // batch cannot move the watermark backwards. `>=` keeps the last of
        // equal timestamps, matching the previous behaviour on sorted input.
        // Computed before the write so a malformed timestamp fails before
        // anything is published.
        const newest = newTrades.reduce(
          (best, t) => (BigInt(t.timestamp) >= BigInt(best.timestamp) ? t : best),
          newTrades[0],
        );

        await this.kafkaProducer.writeTicks(kafkaTopic, newTrades);
        subscription.lastTimestamp = newest.timestamp;
        this.logger.info({ jobId, ticker, count: newTrades.length, kafkaTopic }, 'Wrote ticks to Kafka');
      }

      subscription.errorCount = 0;
    } catch (error) {
      // Errors from a subscription that was cancelled mid-poll are not counted
      // and must not trigger a second WorkComplete.
      if (!subscription.isActive) {
        this.logger.debug(
          { jobId, ticker, error: error.message },
          'Ignoring poll error for cancelled subscription',
        );
        return;
      }

      subscription.errorCount++;
      this.logger.error(
        { error: error.message, jobId, ticker, errorCount: subscription.errorCount },
        'Error polling subscription',
      );

      // After 5 consecutive errors, give up and notify Flink
      if (subscription.errorCount >= 5) {
        this.logger.error({ jobId, ticker }, 'Cancelling subscription due to repeated errors');
        this.cancelSubscription(jobId);
        try {
          await this.zmqClient.sendComplete(jobId, false, `Polling failed after 5 errors: ${error.message}`);
        } catch (zmqErr) {
          this.logger.error({ jobId, error: zmqErr.message }, 'Failed to send WorkComplete after error');
        }
        if (this.onJobComplete) {
          try {
            this.onJobComplete(jobId, error);
          } catch (callbackErr) {
            // Would otherwise be swallowed silently by Promise.allSettled.
            this.logger.error({ jobId, error: callbackErr.message }, 'onJobComplete callback failed');
          }
        }
      }
    }
  }

  /**
   * Snapshot of the current subscriptions for monitoring.
   * @returns {{totalSubscriptions: number, subscriptions: object[]}}
   */
  getStats() {
    return {
      totalSubscriptions: this.subscriptions.size,
      subscriptions: Array.from(this.subscriptions.values()).map((sub) => ({
        jobId: sub.jobId,
        requestId: sub.requestId,
        ticker: sub.ticker,
        isActive: sub.isActive,
        errorCount: sub.errorCount,
        lastTimestamp: sub.lastTimestamp,
      })),
    };
  }

  /**
   * Stop both loops, mark every subscription inactive and forget them all.
   * Does not send WorkComplete for any job.
   */
  shutdown() {
    this.logger.info('Shutting down realtime poller');

    this._stopLoops();

    for (const subscription of this.subscriptions.values()) {
      subscription.isActive = false;
    }
    this.subscriptions.clear();
  }

  /**
   * Clear the polling and heartbeat timers if they are running.
   * @returns {boolean} True when at least one timer was actually stopped.
   */
  _stopLoops() {
    let stopped = false;
    if (this.pollingLoop) {
      clearInterval(this.pollingLoop);
      this.pollingLoop = null;
      stopped = true;
    }
    if (this.heartbeatLoop) {
      clearInterval(this.heartbeatLoop);
      this.heartbeatLoop = null;
      stopped = true;
    }
    return stopped;
  }
}