Optimize OHLC queries: run Iceberg scans in threads and reuse DataFrames to avoid redundant scans

This commit is contained in:
2026-04-28 16:36:41 -04:00
parent 47471b7700
commit a0248540e0
2 changed files with 11 additions and 8 deletions

View File

@@ -169,7 +169,8 @@ class IcebergClient:
ticker: str, ticker: str,
period_seconds: int, period_seconds: int,
start_time: int, start_time: int,
end_time: int end_time: int,
df: Optional[pd.DataFrame] = None,
) -> List[Tuple[int, int]]: ) -> List[Tuple[int, int]]:
""" """
Identify missing data ranges in the requested time period. Identify missing data ranges in the requested time period.
@@ -179,10 +180,12 @@ class IcebergClient:
period_seconds: OHLC period in seconds period_seconds: OHLC period in seconds
start_time: Start timestamp in nanoseconds start_time: Start timestamp in nanoseconds
end_time: End timestamp in nanoseconds end_time: End timestamp in nanoseconds
df: Optional pre-fetched DataFrame to avoid a redundant Iceberg scan
Returns: Returns:
List of (start_time, end_time) tuples for missing ranges (nanoseconds) List of (start_time, end_time) tuples for missing ranges (nanoseconds)
""" """
if df is None:
df = self.query_ohlc(ticker, period_seconds, start_time, end_time) df = self.query_ohlc(ticker, period_seconds, start_time, end_time)
if df.empty: if df.empty:

View File

@@ -122,12 +122,12 @@ class OHLCClient:
start_time = ((start_time + period_nanos - 1) // period_nanos) * period_nanos start_time = ((start_time + period_nanos - 1) // period_nanos) * period_nanos
end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive
# Step 1: Check Iceberg for existing data # Step 1: Check Iceberg for existing data (run in thread — scan.to_pandas() blocks ~3-5s)
df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time) df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time)
# Step 2: Identify missing ranges # Step 2: Identify missing ranges — pass df to avoid a redundant Iceberg scan
missing_ranges = self.iceberg.find_missing_ranges( missing_ranges = self.iceberg.find_missing_ranges(
ticker, period_seconds, start_time, end_time ticker, period_seconds, start_time, end_time, df=df
) )
if not missing_ranges: if not missing_ranges:
@@ -148,8 +148,8 @@ class OHLCClient:
if result['status'] == 'ERROR': if result['status'] == 'ERROR':
raise ValueError(f"Historical data request failed: {result['error_message']}") raise ValueError(f"Historical data request failed: {result['error_message']}")
# Step 5: Query Iceberg again for complete dataset # Step 5: Query Iceberg again for complete dataset (run in thread)
df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time) df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time)
return self._apply_decimal_correction(ticker, df) return self._apply_decimal_correction(ticker, df)