diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js index e79e4f12..ea9d943b 100644 --- a/ingestor/src/ccxt-fetcher.js +++ b/ingestor/src/ccxt-fetcher.js @@ -32,6 +32,32 @@ function extractRetryAfterMs(exchange, error) { return 30_000; } +// Per-exchange descriptor of extended OHLCV fields beyond the standard 6 +// (timestamp, open, high, low, close, volume). +// +// 'index' — extract candle[index]; skipped if candle is too short. +// 'complementOf' — compute as Math.round((totalVolume - candle[index]) * sizeMult). +// Scale types: 'ms_to_ns' | 'price' | 'size' | 'int' +const EXCHANGE_OHLCV_EXTENSIONS = { + binance: { + close_time: { index: 6, scale: 'ms_to_ns' }, + quote_volume: { index: 7, scale: 'price' }, + num_trades: { index: 8, scale: 'int' }, + buy_vol: { index: 9, scale: 'size' }, + sell_vol: { complementOf: 9, scale: 'size' }, + }, + // Add future exchanges here +}; + +function applyScale(raw, scale, priceMult, sizeMult) { + switch (scale) { + case 'ms_to_ns': return String(Number(raw) * 1_000_000); + case 'price': return String(Math.round(parseFloat(raw) * priceMult)); + case 'size': return String(Math.round(parseFloat(raw) * sizeMult)); + case 'int': return String(Number(raw)); + } +} + export class CCXTFetcher { constructor(config, logger, metadataGenerator = null) { this.config = config; @@ -281,7 +307,7 @@ export class CCXTFetcher { for (let ts = firstRealTs; ts <= lastRealTs; ts += periodMs) { if (fetchedByTs.has(ts)) { - const bar = this.convertToOHLC(fetchedByTs.get(ts), ticker, periodSeconds, metadata); + const bar = this.convertToOHLC(fetchedByTs.get(ts), ticker, periodSeconds, metadata, exchangeName); prevClose = bar.close; allCandles.push(bar); } else if (prevClose !== null) { @@ -298,11 +324,9 @@ export class CCXTFetcher { open_time: (ts * 1_000_000).toString(), close_time: ((ts + periodSeconds * 1000) * 1_000_000).toString() }; - if (isBinance) { - gapBar.buy_vol = '0'; - gapBar.sell_vol = '0'; - gapBar.num_trades = '0'; - gapBar.quote_volume = '0'; + const gapExtensions = EXCHANGE_OHLCV_EXTENSIONS[exchangeName] || {}; + for (const [fieldName] of Object.entries(gapExtensions)) { + if (fieldName !== 'close_time') gapBar[fieldName] = '0'; } allCandles.push(gapBar); } @@ -368,7 +392,7 @@ export class CCXTFetcher { * * Prices/volumes use integer representation scaled by market metadata precision. */ - convertToOHLC(candle, ticker, periodSeconds, metadata) { + convertToOHLC(candle, ticker, periodSeconds, metadata, exchangeName = null) { const timestamp = Number(candle[0]); const open = parseFloat(candle[1]); const high = parseFloat(candle[2]); @@ -388,22 +412,21 @@ export class CCXTFetcher { close: Math.round(close * priceMult).toString(), volume: Math.round(volume * sizeMult).toString(), open_time: (timestamp * 1_000_000).toString(), + close_time: ((timestamp + periodSeconds * 1000) * 1_000_000).toString(), }; - if (candle.length >= 10) { - // Binance extended klines format - const closeTimeMs = Number(candle[6]); - const quoteVolRaw = parseFloat(candle[7]); - const numTrades = Number(candle[8]); - const takerBuyBase = parseFloat(candle[9]); - - result.close_time = (closeTimeMs * 1_000_000).toString(); - result.quote_volume = Math.round(quoteVolRaw * priceMult).toString(); - result.num_trades = numTrades.toString(); - result.buy_vol = Math.round(takerBuyBase * sizeMult).toString(); - result.sell_vol = Math.round((volume - takerBuyBase) * sizeMult).toString(); - } else { - result.close_time = ((timestamp + periodSeconds * 1000) * 1_000_000).toString(); + const extensions = EXCHANGE_OHLCV_EXTENSIONS[exchangeName] || {}; + for (const [fieldName, spec] of Object.entries(extensions)) { + if ('complementOf' in spec) { + if (candle.length > spec.complementOf) { + const base = parseFloat(candle[spec.complementOf]); + result[fieldName] = String(Math.round((volume - base) * sizeMult)); + } + } else if ('index' in spec) { + if (candle.length > spec.index) { + result[fieldName] = applyScale(candle[spec.index], spec.scale, priceMult, sizeMult); + } + } } return result; diff --git a/sandbox/dexorder/impl/data_api_impl.py b/sandbox/dexorder/impl/data_api_impl.py index bb88b14f..784e0575 100644 --- a/sandbox/dexorder/impl/data_api_impl.py +++ b/sandbox/dexorder/impl/data_api_impl.py @@ -166,13 +166,11 @@ class DataAPIImpl(DataAPI): period_seconds=period_seconds, start_time=start_nanos, end_time=end_nanos, - request_timeout=self.request_timeout + columns=columns_to_fetch, + request_timeout=self.request_timeout, ) - # Select only requested columns (filter out metadata and unrequested fields) if not df.empty: - available_cols = [col for col in columns_to_fetch if col in df.columns] - df = df[available_cols] self._bars_fetched += len(df) return df diff --git a/sandbox/dexorder/ohlc_client.py b/sandbox/dexorder/ohlc_client.py index 95516fd9..381cca9d 100644 --- a/sandbox/dexorder/ohlc_client.py +++ b/sandbox/dexorder/ohlc_client.py @@ -5,7 +5,7 @@ OHLCClient - High-level API for fetching OHLC data with smart caching import asyncio import pandas as pd import logging -from typing import Optional +from typing import List, Optional from .iceberg_client import IcebergClient from .history_client import HistoryClient from .symbol_metadata_client import SymbolMetadataClient @@ -90,6 +90,7 @@ class OHLCClient: period_seconds: int, start_time: int, end_time: int, + columns: Optional[List[str]] = None, request_timeout: float = 120.0 ) -> pd.DataFrame: """ @@ -123,7 +124,7 @@ class OHLCClient: end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive # Step 1: Check Iceberg for existing data (run in thread — scan.to_pandas() blocks ~3-5s) - df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time) + df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time, columns) # Step 2: Identify missing ranges — pass df to avoid a redundant Iceberg scan missing_ranges = self.iceberg.find_missing_ranges( @@ -149,7 +150,7 @@ class OHLCClient: raise ValueError(f"Historical data request failed: {result['error_message']}") # Step 5: Query Iceberg again for complete dataset (run in thread) - df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time) + df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time, columns) return self._apply_decimal_correction(ticker, df)