From a0248540e0e30c4d169f7f43e96849520e47b2fb Mon Sep 17 00:00:00 2001 From: Tim Olson Date: Tue, 28 Apr 2026 16:36:41 -0400 Subject: [PATCH] Optimize OHLC queries: run Iceberg scans in threads and reuse DataFrames to avoid redundant scans --- sandbox/dexorder/iceberg_client.py | 7 +++++-- sandbox/dexorder/ohlc_client.py | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sandbox/dexorder/iceberg_client.py b/sandbox/dexorder/iceberg_client.py index 6c762d4d..86784329 100644 --- a/sandbox/dexorder/iceberg_client.py +++ b/sandbox/dexorder/iceberg_client.py @@ -169,7 +169,8 @@ class IcebergClient: ticker: str, period_seconds: int, start_time: int, - end_time: int + end_time: int, + df: Optional[pd.DataFrame] = None, ) -> List[Tuple[int, int]]: """ Identify missing data ranges in the requested time period. @@ -179,11 +180,13 @@ class IcebergClient: period_seconds: OHLC period in seconds start_time: Start timestamp in nanoseconds end_time: End timestamp in nanoseconds + df: Optional pre-fetched DataFrame to avoid a redundant Iceberg scan Returns: List of (start_time, end_time) tuples for missing ranges (nanoseconds) """ - df = self.query_ohlc(ticker, period_seconds, start_time, end_time) + if df is None: + df = self.query_ohlc(ticker, period_seconds, start_time, end_time) if df.empty: return [(start_time, end_time)] diff --git a/sandbox/dexorder/ohlc_client.py b/sandbox/dexorder/ohlc_client.py index 84f98811..95516fd9 100644 --- a/sandbox/dexorder/ohlc_client.py +++ b/sandbox/dexorder/ohlc_client.py @@ -122,12 +122,12 @@ class OHLCClient: start_time = ((start_time + period_nanos - 1) // period_nanos) * period_nanos end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive - # Step 1: Check Iceberg for existing data - df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time) + # Step 1: Check Iceberg for existing data (run in thread — scan.to_pandas() blocks ~3-5s) + df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time) - # Step 2: Identify missing ranges + # Step 2: Identify missing ranges — pass df to avoid a redundant Iceberg scan missing_ranges = self.iceberg.find_missing_ranges( - ticker, period_seconds, start_time, end_time + ticker, period_seconds, start_time, end_time, df=df ) if not missing_ranges: @@ -148,8 +148,8 @@ class OHLCClient: if result['status'] == 'ERROR': raise ValueError(f"Historical data request failed: {result['error_message']}") - # Step 5: Query Iceberg again for complete dataset - df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time) + # Step 5: Query Iceberg again for complete dataset (run in thread) + df = await asyncio.to_thread(self.iceberg.query_ohlc, ticker, period_seconds, start_time, end_time) return self._apply_decimal_correction(ticker, df)