// CCXT data fetcher for historical OHLC and realtime ticks
import ccxt from 'ccxt';

const NS_PER_MS = 1_000_000;
const NS_PER_MS_BIGINT = 1_000_000n;

// Fallback delay used when neither the response headers nor the error message carry a hint.
const DEFAULT_RETRY_AFTER_MS = 30_000;

// CCXT error class names that are worth retrying after a short delay.
const RETRYABLE_ERROR_NAMES = new Set(['NetworkError', 'RequestTimeout', 'ExchangeNotAvailable']);

// Supported OHLC periods (seconds) and the matching CCXT timeframe strings.
const TIMEFRAMES_BY_SECONDS = new Map([
  [60, '1m'],
  [300, '5m'],
  [900, '15m'],
  [1800, '30m'],
  [3600, '1h'],
  [7200, '2h'],
  [14400, '4h'],
  [21600, '6h'],
  [28800, '8h'],
  [43200, '12h'],
  [86400, '1d'],
  [259200, '3d'],
  [604800, '1w'],
  [2592000, '1M'],
]);

/**
 * Thrown when an exchange returns a 429 rate-limit response.
 * retryAfterMs is derived from the exchange's Retry-After header when available.
 */
export class ExchangeRateLimitError extends Error {
  /**
   * @param {string} exchange - exchange name; stored upper-cased on `this.exchange`
   * @param {number} retryAfterMs - suggested wait before retrying, in milliseconds
   * @param {string} originalMessage - message of the underlying exchange error
   * @param {{cause?: unknown}} [options] - optional standard Error options (e.g. `cause`)
   */
  constructor(exchange, retryAfterMs, originalMessage, options = undefined) {
    super(`Rate limit on ${exchange}: retry after ${retryAfterMs}ms (${originalMessage})`, options);
    this.name = 'ExchangeRateLimitError';
    this.exchange = String(exchange).toUpperCase();
    this.retryAfterMs = retryAfterMs;
  }
}

/**
 * Convert a nanosecond timestamp to whole milliseconds (floored).
 *
 * Nanosecond epoch values (~1.7e18) exceed Number.MAX_SAFE_INTEGER, so plain-digit strings
 * and bigints are divided as BigInt to avoid rounding across a millisecond boundary.
 * Anything else falls back to parseInt, yielding NaN for unparseable input.
 */
function nsToMs(ns) {
  if (typeof ns === 'bigint' && ns >= 0n) {
    return Number(ns / NS_PER_MS_BIGINT);
  }
  if (typeof ns === 'string') {
    const digits = ns.trim();
    if (/^\d+$/.test(digits)) {
      return Number(BigInt(digits) / NS_PER_MS_BIGINT);
    }
  }
  return Math.floor(Number.parseInt(ns, 10) / NS_PER_MS);
}

/** Convert a millisecond timestamp to a nanosecond decimal string. */
function msToNsString(ms) {
  if (Number.isSafeInteger(ms)) {
    return (BigInt(ms) * NS_PER_MS_BIGINT).toString();
  }
  // Non-integer / non-numeric input keeps the plain Number arithmetic (may yield "NaN").
  return (ms * NS_PER_MS).toString();
}

/** True when the exchange instance already holds a non-empty markets table. */
function hasLoadedMarkets(exchange) {
  return exchange.markets != null && Object.keys(exchange.markets).length > 0;
}

/** True when the error's class name is CCXT's RateLimitExceeded. */
function isRateLimitError(error) {
  return error?.constructor?.name === 'RateLimitExceeded';
}

/**
 * Look up a header value ignoring the case of the header name.
 * Supports plain objects as well as containers exposing a `get(name)` method.
 */
function findHeader(headers, name) {
  if (headers == null || typeof headers !== 'object') return undefined;
  if (typeof headers.get === 'function') {
    const direct = headers.get(name);
    if (direct != null) return direct;
  }
  const wanted = name.toLowerCase();
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) return value;
  }
  return undefined;
}

/**
 * Extract retry-after duration in milliseconds from a CCXT RateLimitExceeded error.
 * Priority: Retry-After header → error message numeric → 30s fallback.
 *
 * The header name is matched case-insensitively because the casing of the keys in
 * `last_response_headers` is not guaranteed to be lower-case. The header value may be
 * either delay-seconds or an HTTP-date; both are handled. Results are never negative.
 */
function extractRetryAfterMs(exchange, error) {
  const header = findHeader(exchange?.last_response_headers, 'retry-after');
  if (header) {
    const text = String(header).trim();
    const secs = Number.parseFloat(text);
    if (!Number.isNaN(secs)) return Math.max(0, Math.ceil(secs * 1000));
    const retryAtMs = Date.parse(text);
    if (!Number.isNaN(retryAtMs)) return Math.max(0, retryAtMs - Date.now());
  }

  const message = typeof error?.message === 'string' ? error.message : '';

  // Some exchanges embed the delay in the message (e.g. "retry after 5000 ms").
  // The trailing \b keeps unrelated words ("5 msgs", "429 status") from matching.
  const msMatch = message.match(/(\d+)\s*(?:ms|milliseconds?)\b/i);
  if (msMatch) return Number.parseInt(msMatch[1], 10);

  const secMatch = message.match(/(\d+(?:\.\d+)?)\s*(?:seconds?|secs?|s)\b/i);
  if (secMatch) return Math.ceil(Number.parseFloat(secMatch[1]) * 1000);

  return DEFAULT_RETRY_AFTER_MS;
}

export class CCXTFetcher {
  /**
   * @param {object} config - fetcher configuration (stored as-is)
   * @param {object} logger - structured logger exposing debug/info/warn/error(obj, msg)
   * @param {object|null} metadataGenerator - must expose convertMarketToMetadata(); required by getMetadata()
   */
  constructor(config, logger, metadataGenerator = null) {
    this.config = config;
    this.logger = logger;
    this.exchanges = new Map();
    this.metadataGenerator = metadataGenerator;
    this.metadataCache = new Map(); // Cache metadata by ticker
  }

  /**
   * Parse ticker string to exchange and symbol
   * Expected format: "SYMBOL.EXCHANGE" (e.g., "BTC/USDT.BINANCE")
   *
   * The split happens on the LAST dot so symbols containing dots still parse.
   *
   * @param {string} ticker
   * @returns {{exchange: string, symbol: string}} lower-cased exchange id and the symbol as given
   * @throws {Error} when ticker is not a string or either part is empty
   */
  parseTicker(ticker) {
    const lastDot = typeof ticker === 'string' ? ticker.lastIndexOf('.') : -1;
    if (lastDot <= 0 || lastDot === ticker.length - 1) {
      throw new Error(`Invalid ticker format: ${ticker}. Expected "SYMBOL.EXCHANGE"`);
    }
    return {
      exchange: ticker.slice(lastDot + 1).toLowerCase(),
      symbol: ticker.slice(0, lastDot),
    };
  }

  /**
   * Get metadata for a ticker (cached or generate on-the-fly)
   *
   * @param {string} ticker - "SYMBOL.EXCHANGE"
   * @returns {Promise<object>} whatever metadataGenerator.convertMarketToMetadata() returns
   * @throws {Error} when no metadata generator was supplied or the market is unknown
   */
  async getMetadata(ticker) {
    // Check cache first
    if (this.metadataCache.has(ticker)) {
      return this.metadataCache.get(ticker);
    }

    // Generate metadata on-the-fly
    if (!this.metadataGenerator) {
      throw new Error('Metadata generator not available');
    }

    const { exchange: exchangeName, symbol } = this.parseTicker(ticker);
    const exchangeUpper = exchangeName.toUpperCase();
    const exchange = this.getExchange(exchangeName);

    // Load market info from CCXT
    this.logger.info({ exchangeName, symbol }, 'Loading markets for metadata');
    await exchange.loadMarkets();

    const market = exchange.market(symbol);
    if (!market) {
      throw new Error(`Market not found: ${symbol} on ${exchangeUpper}`);
    }

    // Convert to our metadata format
    const metadata = this.metadataGenerator.convertMarketToMetadata(exchangeUpper, symbol, market);

    // Cache it
    this.metadataCache.set(ticker, metadata);
    return metadata;
  }

  /**
   * Get or create CCXT exchange instance
   *
   * @param {string} exchangeName - lowercase CCXT exchange id
   * @returns {object} cached or newly created exchange instance
   * @throws {Error} when the name is not a CCXT exchange
   */
  getExchange(exchangeName) {
    if (this.exchanges.has(exchangeName)) {
      return this.exchanges.get(exchangeName);
    }

    // The ccxt namespace also carries non-exchange exports (version, helpers, error
    // classes), so a bare property lookup is not enough to validate the name.
    const knownExchanges = Array.isArray(ccxt.exchanges) ? ccxt.exchanges : null;
    const ExchangeClass = ccxt[exchangeName];
    const isKnown = knownExchanges === null || knownExchanges.includes(exchangeName);
    if (typeof ExchangeClass !== 'function' || !isKnown) {
      throw new Error(`Unsupported exchange: ${exchangeName}`);
    }

    const exchange = new ExchangeClass({
      enableRateLimit: true,
      options: {
        defaultType: 'spot',
      },
    });

    this.exchanges.set(exchangeName, exchange);
    this.logger.info({ exchange: exchangeName }, 'Created CCXT exchange instance');
    return exchange;
  }

  /**
   * Fetch historical OHLC data
   *
   * Pages through the exchange in fixed batches, keeps candles inside
   * [startTime, endTime), forward-fills interior gaps and converts everything to the
   * scaled-integer string format produced by convertToOHLC().
   *
   * @param {string} ticker - Ticker in format "SYMBOL.EXCHANGE"
   * @param {string} startTime - Start time in nanoseconds (inclusive)
   * @param {string} endTime - End time in nanoseconds (exclusive)
   * @param {number} periodSeconds - OHLC period in seconds
   * @param {number} limit - Optional hint; only logged, it does not bound the fetch
   * @returns {Promise<Array<object>>} Array of OHLC candles (empty when the exchange has no data)
   * @throws {ExchangeRateLimitError} when the exchange rate-limits the request
   * @throws {Error} the last exchange error once retries are exhausted
   */
  async fetchHistoricalOHLC(ticker, startTime, endTime, periodSeconds, limit) {
    const { exchange: exchangeName, symbol } = this.parseTicker(ticker);
    const exchange = this.getExchange(exchangeName);

    // Convert nanoseconds to milliseconds
    const startMs = nsToMs(startTime);
    const endMs = nsToMs(endTime);
    const periodMs = periodSeconds * 1000;

    // Map period seconds to CCXT timeframe
    const timeframe = this.secondsToTimeframe(periodSeconds);

    const marketsLoaded = hasLoadedMarkets(exchange);
    this.logger.info(
      { ticker, timeframe, startMs, endMs, limit, marketsLoaded },
      'Fetching historical OHLC'
    );

    // Always page in fixed batches of 1000 regardless of any limit hint.
    // The caller's limit/countback is irrelevant to how much we need to fetch from the exchange.
    const PAGE_SIZE = 1000;
    const FETCH_RETRIES = 3;
    const FETCH_RETRY_DELAY_MS = 5000;

    // Binance provides extended kline data (buy/sell volume split, quote volume, trade count).
    // We use the raw klines endpoint directly to capture all available fields.
    const isBinance = exchangeName === 'binance';
    let binanceMarketId = null;
    if (isBinance) {
      if (!hasLoadedMarkets(exchange)) {
        await exchange.loadMarkets();
      }
      binanceMarketId = exchange.market(symbol).id;
    }

    // Raw candles inside the requested range, keyed by open time in ms.
    const fetchedByTs = new Map();
    let since = startMs;

    while (since < endMs) {
      let candles;
      let lastError = null;

      for (let attempt = 1; attempt <= FETCH_RETRIES; attempt++) {
        try {
          if (isBinance) {
            candles = await exchange.publicGetKlines({
              symbol: binanceMarketId,
              interval: timeframe,
              startTime: since,
              limit: PAGE_SIZE,
            });
          } else {
            candles = await exchange.fetchOHLCV(symbol, timeframe, since, PAGE_SIZE);
          }
          lastError = null;
          break;
        } catch (error) {
          lastError = error;
          const errorType = error.constructor?.name;
          const isRateLimit = errorType === 'RateLimitExceeded';
          const isRetryable = !isRateLimit && RETRYABLE_ERROR_NAMES.has(errorType);
          this.logger.warn(
            {
              errorType,
              error: error.message,
              errorUrl: error.url,
              ticker,
              since,
              attempt,
              retryable: isRetryable,
              rateLimit: isRateLimit,
            },
            'OHLC fetch attempt failed'
          );
          // Rate limits are never retried here: they are surfaced so the caller can back off.
          if (!isRetryable || attempt === FETCH_RETRIES) break;
          await exchange.sleep(FETCH_RETRY_DELAY_MS * attempt);
        }
      }

      if (lastError) {
        if (isRateLimitError(lastError)) {
          const retryAfterMs = extractRetryAfterMs(exchange, lastError);
          this.logger.warn({ ticker, retryAfterMs }, 'OHLC fetch rate-limited by exchange');
          throw new ExchangeRateLimitError(exchangeName, retryAfterMs, lastError.message, {
            cause: lastError,
          });
        }
        this.logger.error(
          {
            errorType: lastError.constructor?.name,
            error: lastError.message,
            errorUrl: lastError.url,
            ticker,
            since,
            marketsLoaded: hasLoadedMarkets(exchange),
            stack: lastError.stack,
          },
          'Error fetching OHLC'
        );
        throw lastError;
      }

      if (!Array.isArray(candles) || candles.length === 0) {
        break;
      }

      // Keep candles within the requested time range (endMs is exclusive).
      // Timestamps are coerced to numbers so string values cannot break lookups or arithmetic.
      for (const candle of candles) {
        const ts = Number(candle[0]);
        if (ts >= startMs && ts < endMs) {
          fetchedByTs.set(ts, candle);
        }
      }

      // Advance to next batch start
      const lastTimestamp = Number(candles[candles.length - 1][0]);
      const nextSince = lastTimestamp + periodMs;
      if (!(nextSince > since)) {
        // The exchange returned data that does not move the cursor forward (e.g. it
        // ignored `since`); stop instead of requesting the same page forever.
        this.logger.warn(
          { ticker, since, lastTimestamp },
          'OHLC paging made no progress; stopping fetch'
        );
        break;
      }
      since = nextSince;
      if (since >= endMs) {
        break;
      }

      // Apply rate limiting
      await exchange.sleep(exchange.rateLimit);
    }

    if (fetchedByTs.size === 0) {
      // No data from exchange — return empty so caller writes a NOT_FOUND marker.
      // Checked before the metadata lookup so an empty result costs no extra work.
      return [];
    }

    // Get metadata for proper denomination
    const metadata = await this.getMetadata(ticker);

    // Forward-fill interior gaps — periods between the first and last real bar
    // where the exchange returned no candle. Edge gaps (before firstRealTs or
    // after lastRealTs) are left absent; they'll be caught by gap detection and
    // trigger a targeted backfill request.
    //
    // Every real bar is always emitted; gap bars are inserted only where a full period
    // fits before the next real bar, so candles that are not on a fixed periodMs grid
    // (e.g. calendar-length monthly bars) are never dropped.
    const realTimestamps = [...fetchedByTs.keys()].sort((a, b) => a - b);
    const allCandles = [];
    let gapCount = 0;

    for (let i = 0; i < realTimestamps.length; i++) {
      const ts = realTimestamps[i];
      const bar = this.convertToOHLC(fetchedByTs.get(ts), ticker, periodSeconds, metadata);
      allCandles.push(bar);

      if (i === realTimestamps.length - 1) break;
      const nextRealTs = realTimestamps[i + 1];
      const prevClose = bar.close;

      for (let gapTs = ts + periodMs; gapTs + periodMs <= nextRealTs; gapTs += periodMs) {
        // Interior gap — forward-fill with previous close, zero volume
        gapCount++;
        const gapBar = {
          ticker,
          timestamp: msToNsString(gapTs),
          open: prevClose,
          high: prevClose,
          low: prevClose,
          close: prevClose,
          volume: '0',
          open_time: msToNsString(gapTs),
          close_time: msToNsString(gapTs + periodMs),
        };
        if (isBinance) {
          gapBar.buy_vol = '0';
          gapBar.sell_vol = '0';
          gapBar.num_trades = '0';
          gapBar.quote_volume = '0';
        }
        allCandles.push(gapBar);
      }
    }

    if (gapCount > 0) {
      this.logger.info(
        { ticker, gapCount, total: allCandles.length },
        'Forward-filled interior gap bars with previous close price'
      );
    }

    return allCandles;
  }

  /**
   * Fetch recent trades for realtime tick data
   * @param {string} ticker - Ticker in format "SYMBOL.EXCHANGE"
   * @param {string} since - Optional timestamp in nanoseconds to fetch from
   * @returns {Promise<Array<object>>} Array of trade ticks (see convertToTick)
   * @throws {ExchangeRateLimitError} when the exchange rate-limits the request
   */
  async fetchRecentTrades(ticker, since = null) {
    const { exchange: exchangeName, symbol } = this.parseTicker(ticker);
    const exchange = this.getExchange(exchangeName);

    try {
      // Convert nanoseconds to milliseconds if provided
      const sinceMs = since ? nsToMs(since) : undefined;

      const trades = await exchange.fetchTrades(symbol, sinceMs, 1000);

      this.logger.debug({ ticker, count: trades.length }, 'Fetched recent trades');

      // Get metadata for proper denomination
      const metadata = await this.getMetadata(ticker);

      // Convert to our Tick format
      return trades.map((trade) => this.convertToTick(trade, ticker, metadata));
    } catch (error) {
      if (isRateLimitError(error)) {
        const retryAfterMs = extractRetryAfterMs(exchange, error);
        this.logger.warn({ ticker, retryAfterMs }, 'Trades fetch rate-limited by exchange');
        throw new ExchangeRateLimitError(exchangeName, retryAfterMs, error.message, {
          cause: error,
        });
      }
      this.logger.error({ error: error.message, ticker }, 'Error fetching trades');
      throw error;
    }
  }

  /**
   * Convert OHLCV array to our OHLC format.
   *
   * Accepts two formats:
   * - Standard CCXT (6 elements): [timestamp, open, high, low, close, volume]
   * - Binance raw klines (12 elements): [openTime, open, high, low, close, baseVolume,
   *   closeTime, quoteVolume, numTrades, takerBuyBaseVol, takerBuyQuoteVol, ignore]
   *
   * Prices/volumes use integer representation scaled by market metadata precision
   * (defaults: 2 decimals for prices, 8 for sizes) and are returned as strings.
   * Timestamps are nanosecond strings.
   */
  convertToOHLC(candle, ticker, periodSeconds, metadata) {
    const timestamp = Number(candle[0]);
    const open = parseFloat(candle[1]);
    const high = parseFloat(candle[2]);
    const low = parseFloat(candle[3]);
    const close = parseFloat(candle[4]);
    const volume = parseFloat(candle[5]);

    const priceMult = Math.pow(10, metadata.pricePrecision ?? 2);
    const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8);

    const result = {
      ticker,
      timestamp: msToNsString(timestamp),
      open: Math.round(open * priceMult).toString(),
      high: Math.round(high * priceMult).toString(),
      low: Math.round(low * priceMult).toString(),
      close: Math.round(close * priceMult).toString(),
      volume: Math.round(volume * sizeMult).toString(),
      open_time: msToNsString(timestamp),
    };

    if (candle.length >= 10) {
      // Binance extended klines format
      const closeTimeMs = Number(candle[6]);
      const quoteVolRaw = parseFloat(candle[7]);
      const numTrades = Number(candle[8]);
      const takerBuyBase = parseFloat(candle[9]);

      result.close_time = msToNsString(closeTimeMs);
      result.quote_volume = Math.round(quoteVolRaw * priceMult).toString();
      result.num_trades = numTrades.toString();
      result.buy_vol = Math.round(takerBuyBase * sizeMult).toString();
      // Sell volume is whatever part of the base volume was not taker-buy.
      result.sell_vol = Math.round((volume - takerBuyBase) * sizeMult).toString();
    } else {
      result.close_time = msToNsString(timestamp + periodSeconds * 1000);
    }

    return result;
  }

  /**
   * Convert CCXT trade to our Tick format
   * Uses precision fields from market metadata for proper integer representation
   */
  convertToTick(trade, ticker, metadata) {
    const priceMult = Math.pow(10, metadata.pricePrecision ?? 2);
    const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8);

    const price = Math.round(trade.price * priceMult);
    const amount = Math.round(trade.amount * sizeMult);
    const quoteAmount = Math.round(trade.price * trade.amount * priceMult);

    // protobufjs v7 uses camelCase field names internally — must use camelCase here
    return {
      tradeId: trade.id || `${trade.timestamp}`,
      ticker,
      timestamp: msToNsString(trade.timestamp), // Convert ms to nanoseconds
      price: price.toString(),
      amount: amount.toString(),
      quoteAmount: quoteAmount.toString(),
      takerBuy: trade.side === 'buy',
      sequence: trade.order ? trade.order.toString() : undefined,
    };
  }

  /**
   * Fetch 1-minute bars covering the current open window for each configured period,
   * rolling them up into a single aggregate per period for Flink accumulator seeding.
   *
   * Returns one seed object per period (or null for periods that just started with no
   * completed 1m bars yet). Throws on exchange errors — caller handles retries.
   *
   * @param {string} ticker
   * @param {number[]} periodSeconds - configured periods (e.g. [60, 300, 900, 3600, 14400, 86400])
   * @returns {Promise<Array<object|null>>} seeds in the same order as periodSeconds
   */
  async fetchSeedCandles(ticker, periodSeconds) {
    // Nothing to seed; also avoids Math.max() of an empty list (-Infinity).
    if (periodSeconds.length === 0) return [];

    const nowMs = Date.now();
    const maxPeriodMs = Math.max(...periodSeconds) * 1000;
    const longestWindowStart = Math.floor(nowMs / maxPeriodMs) * maxPeriodMs;

    // fetchHistoricalOHLC expects nanoseconds as strings
    const startNs = msToNsString(longestWindowStart);
    const endNs = msToNsString(nowMs);

    const bars1m = await this.fetchHistoricalOHLC(ticker, startNs, endNs, 60, null);
    // Parse each bar's open time once instead of once per period.
    const barTimesMs = bars1m.map((bar) => nsToMs(bar.timestamp));

    return periodSeconds.map((period) => {
      const windowMs = period * 1000;
      const windowStart = Math.floor(nowMs / windowMs) * windowMs;
      const relevant = bars1m.filter((_, i) => barTimesMs[i] >= windowStart && barTimesMs[i] < nowMs);

      if (relevant.length === 0) return null;

      const open = Number.parseInt(relevant[0].open, 10);
      const high = Math.max(...relevant.map((b) => Number.parseInt(b.high, 10)));
      const low = Math.min(...relevant.map((b) => Number.parseInt(b.low, 10)));
      const close = Number.parseInt(relevant[relevant.length - 1].close, 10);
      const volume = relevant.reduce((sum, b) => sum + Number.parseInt(b.volume, 10), 0);

      return { periodSeconds: period, open, high, low, close, volume, windowStartMs: windowStart };
    });
  }

  /**
   * Convert a seed candle aggregate into a Tick-shaped object for Kafka.
   * price = open (scaled int), amount = volume (scaled int); seed_* fields carry H/L/C/period.
   */
  convertSeedToTick(seed, ticker) {
    // protobufjs v7 uses camelCase field names internally — must use camelCase here
    return {
      tradeId: `seed-${ticker}-${seed.periodSeconds}-${seed.windowStartMs}`,
      ticker,
      timestamp: msToNsString(seed.windowStartMs),
      price: seed.open,
      amount: seed.volume,
      quoteAmount: 0,
      takerBuy: false,
      isSeed: true,
      seedHigh: seed.high,
      seedLow: seed.low,
      seedClose: seed.close,
      seedWindowStartMs: seed.windowStartMs,
      seedPeriodSeconds: seed.periodSeconds,
    };
  }

  /**
   * Convert period seconds to CCXT timeframe string
   *
   * @param {number} seconds - one of the supported period lengths
   * @returns {string} CCXT timeframe (e.g. '1m', '1h', '1d')
   * @throws {Error} when the period has no matching timeframe
   */
  secondsToTimeframe(seconds) {
    // Number() keeps numeric strings such as "60" working.
    const timeframe = TIMEFRAMES_BY_SECONDS.get(Number(seconds));
    if (!timeframe) {
      throw new Error(`Unsupported period: ${seconds} seconds`);
    }
    return timeframe;
  }

  /**
   * Fetch 24h rolling ticker stats for all symbols on an exchange.
   * Uses exchange.fetchTickers() — single API call, very rate-limit efficient.
   * Returns an array of TickerStats-compatible objects, or throws if unsupported.
   *
   * @param {string} exchangeName - lowercase exchange name (e.g. "binance")
   * @returns {Promise<Array<object>>} Array of TickerStats objects
   * @throws {ExchangeRateLimitError} when the exchange rate-limits the request
   */
  async fetchAllTickers(exchangeName) {
    const exchange = this.getExchange(exchangeName);
    const exchangeUpper = exchangeName.toUpperCase();

    if (!exchange.has?.['fetchTickers']) {
      throw new Error(`Exchange ${exchangeUpper} does not support fetchTickers()`);
    }

    this.logger.info({ exchange: exchangeUpper }, 'Fetching all 24h tickers');

    let rawTickers;
    try {
      rawTickers = await exchange.fetchTickers();
    } catch (error) {
      if (isRateLimitError(error)) {
        const retryAfterMs = extractRetryAfterMs(exchange, error);
        this.logger.warn({ exchange: exchangeUpper, retryAfterMs }, 'fetchTickers rate-limited');
        throw new ExchangeRateLimitError(exchangeName, retryAfterMs, error.message, {
          cause: error,
        });
      }
      this.logger.error({ error: error.message, exchange: exchangeUpper }, 'Error fetching tickers');
      throw error;
    }

    const nowNs = (BigInt(Date.now()) * NS_PER_MS_BIGINT).toString();
    const tickers = [];

    for (const [symbol, t] of Object.entries(rawTickers)) {
      if (!t || t.last == null) continue;

      // Build Nautilus-format ticker: "BASE/QUOTE.EXCHANGE"
      const ticker = `${symbol}.${exchangeUpper}`;

      // Extract base/quote from the CCXT market info; when the market is unknown fall
      // back to splitting the symbol, ignoring any ":SETTLE" suffix.
      const market = exchange.markets?.[symbol];
      const [fallbackBase = '', fallbackQuote = ''] = symbol.split(':')[0].split('/');
      const baseAsset = market?.base ?? fallbackBase;
      const quoteAsset = market?.quote ?? fallbackQuote;

      // protobufjs camelCase: only removes '_' before LETTERS, not digits.
      // quote_volume_24h → quoteVolume_24h (underscore before '24' is preserved)
      // open_24h → open_24h, high_24h → high_24h, etc.
      const stat = {
        ticker,
        exchangeId: exchangeUpper,
        baseAsset,
        quoteAsset,
        lastPrice: t.last ?? 0,
        priceChangePct: t.percentage ?? 0,
        'quoteVolume_24h': t.quoteVolume ?? 0,
        timestamp: nowNs,
      };

      if (t.bid != null) stat.bidPrice = t.bid;
      if (t.ask != null) stat.askPrice = t.ask;
      if (t.open != null) stat['open_24h'] = t.open;
      if (t.high != null) stat['high_24h'] = t.high;
      if (t.low != null) stat['low_24h'] = t.low;
      if (t.baseVolume != null) stat['volume_24h'] = t.baseVolume;
      if (t.info?.count != null) stat.numTrades = Number(t.info.count);

      tickers.push(stat);
    }

    this.logger.info({ exchange: exchangeUpper, count: tickers.length }, 'Fetched all tickers');
    return tickers;
  }

  /**
   * Close all exchange connections
   *
   * Every exchange is attempted even if an earlier one fails, and the instance cache is
   * always cleared. The first failure (if any) is rethrown after all have been attempted.
   */
  async close() {
    const failures = [];
    for (const [name, exchange] of this.exchanges) {
      if (!exchange.close) continue;
      try {
        await exchange.close();
      } catch (error) {
        failures.push(error);
        this.logger.warn(
          { exchange: name, error: error.message },
          'Error closing exchange connection'
        );
      }
    }
    this.exchanges.clear();
    if (failures.length > 0) {
      throw failures[0];
    }
  }
}