Support custom column selection in OHLC queries and extend CCXT with configurable exchange-specific fields

- Add `columns` parameter to `get_ohlc_async` and pass through to Iceberg queries
- Replace hardcoded Binance field extraction with declarative `EXCHANGE_OHLCV_EXTENSIONS` config
- Add `applyScale` helper for field-specific transformations (ms_to_ns, price, size, int)
- Support `complementOf` spec for derived fields (e.g., sell_vol from total - buy_vol)
- Apply extensions dynamically in `convertToOHLC` and gap-filling logic
- Remove redundant column filtering in DataAPI (now handled upstream)
This commit is contained in:
2026-04-28 20:00:10 -04:00
parent 77e9ad7f68
commit b4e99744d8
3 changed files with 50 additions and 28 deletions

View File

@@ -32,6 +32,32 @@ function extractRetryAfterMs(exchange, error) {
return 30_000;
}
// Per-exchange descriptor of extended OHLCV fields beyond the standard 6
// (timestamp, open, high, low, close, volume).
//
// 'index' — extract candle[index]; skipped if candle is too short.
// 'complementOf' — compute as Math.round((totalVolume - candle[index]) * sizeMult).
// Scale types: 'ms_to_ns' | 'price' | 'size' | 'int'
const EXCHANGE_OHLCV_EXTENSIONS = {
binance: {
close_time: { index: 6, scale: 'ms_to_ns' },
quote_volume: { index: 7, scale: 'price' },
num_trades: { index: 8, scale: 'int' },
buy_vol: { index: 9, scale: 'size' },
sell_vol: { complementOf: 9, scale: 'size' },
},
// Add future exchanges here
};
function applyScale(raw, scale, priceMult, sizeMult) {
switch (scale) {
case 'ms_to_ns': return String(Number(raw) * 1_000_000);
case 'price': return String(Math.round(parseFloat(raw) * priceMult));
case 'size': return String(Math.round(parseFloat(raw) * sizeMult));
case 'int': return String(Number(raw));
}
}
export class CCXTFetcher {
constructor(config, logger, metadataGenerator = null) {
this.config = config;
@@ -281,7 +307,7 @@ export class CCXTFetcher {
for (let ts = firstRealTs; ts <= lastRealTs; ts += periodMs) {
if (fetchedByTs.has(ts)) {
const bar = this.convertToOHLC(fetchedByTs.get(ts), ticker, periodSeconds, metadata);
const bar = this.convertToOHLC(fetchedByTs.get(ts), ticker, periodSeconds, metadata, exchangeName);
prevClose = bar.close;
allCandles.push(bar);
} else if (prevClose !== null) {
@@ -298,11 +324,9 @@ export class CCXTFetcher {
open_time: (ts * 1_000_000).toString(),
close_time: ((ts + periodSeconds * 1000) * 1_000_000).toString()
};
if (isBinance) {
gapBar.buy_vol = '0';
gapBar.sell_vol = '0';
gapBar.num_trades = '0';
gapBar.quote_volume = '0';
const gapExtensions = EXCHANGE_OHLCV_EXTENSIONS[exchangeName] || {};
for (const [fieldName] of Object.entries(gapExtensions)) {
if (fieldName !== 'close_time') gapBar[fieldName] = '0';
}
allCandles.push(gapBar);
}
@@ -368,7 +392,7 @@ export class CCXTFetcher {
*
* Prices/volumes use integer representation scaled by market metadata precision.
*/
convertToOHLC(candle, ticker, periodSeconds, metadata) {
convertToOHLC(candle, ticker, periodSeconds, metadata, exchangeName = null) {
const timestamp = Number(candle[0]);
const open = parseFloat(candle[1]);
const high = parseFloat(candle[2]);
@@ -388,22 +412,21 @@ export class CCXTFetcher {
close: Math.round(close * priceMult).toString(),
volume: Math.round(volume * sizeMult).toString(),
open_time: (timestamp * 1_000_000).toString(),
close_time: ((timestamp + periodSeconds * 1000) * 1_000_000).toString(),
};
if (candle.length >= 10) {
// Binance extended klines format
const closeTimeMs = Number(candle[6]);
const quoteVolRaw = parseFloat(candle[7]);
const numTrades = Number(candle[8]);
const takerBuyBase = parseFloat(candle[9]);
result.close_time = (closeTimeMs * 1_000_000).toString();
result.quote_volume = Math.round(quoteVolRaw * priceMult).toString();
result.num_trades = numTrades.toString();
result.buy_vol = Math.round(takerBuyBase * sizeMult).toString();
result.sell_vol = Math.round((volume - takerBuyBase) * sizeMult).toString();
} else {
result.close_time = ((timestamp + periodSeconds * 1000) * 1_000_000).toString();
const extensions = EXCHANGE_OHLCV_EXTENSIONS[exchangeName] || {};
for (const [fieldName, spec] of Object.entries(extensions)) {
if ('complementOf' in spec) {
if (candle.length > spec.complementOf) {
const base = parseFloat(candle[spec.complementOf]);
result[fieldName] = String(Math.round((volume - base) * sizeMult));
}
} else if ('index' in spec) {
if (candle.length > spec.index) {
result[fieldName] = applyScale(candle[spec.index], spec.scale, priceMult, sizeMult);
}
}
}
return result;

View File

@@ -166,13 +166,11 @@ class DataAPIImpl(DataAPI):
period_seconds=period_seconds,
start_time=start_nanos,
end_time=end_nanos,
request_timeout=self.request_timeout
columns=columns_to_fetch,
request_timeout=self.request_timeout,
)
# Select only requested columns (filter out metadata and unrequested fields)
if not df.empty:
available_cols = [col for col in columns_to_fetch if col in df.columns]
df = df[available_cols]
self._bars_fetched += len(df)
return df

View File

@@ -5,7 +5,7 @@ OHLCClient - High-level API for fetching OHLC data with smart caching
import asyncio
import pandas as pd
import logging
from typing import Optional
from typing import List, Optional
from .iceberg_client import IcebergClient
from .history_client import HistoryClient
from .symbol_metadata_client import SymbolMetadataClient
@@ -90,6 +90,7 @@ class OHLCClient:
period_seconds: int,
start_time: int,
end_time: int,
columns: Optional[List[str]] = None,
request_timeout: float = 120.0
) -> pd.DataFrame:
"""
@@ -123,7 +124,7 @@ class OHLCClient:
end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive
# Step 1: Check Iceberg for existing data (run in thread — scan.to_pandas() blocks ~3-5s)
df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time)
df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time, columns)
# Step 2: Identify missing ranges — pass df to avoid a redundant Iceberg scan
missing_ranges = self.iceberg.find_missing_ranges(
@@ -149,7 +150,7 @@ class OHLCClient:
raise ValueError(f"Historical data request failed: {result['error_message']}")
# Step 5: Query Iceberg again for complete dataset (run in thread)
df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time)
df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time, columns)
return self._apply_decimal_correction(ticker, df)