Files
ai/sandbox/dexorder/ohlc_client.py

187 lines
6.0 KiB
Python

"""
OHLCClient - High-level API for fetching OHLC data with smart caching
"""
import asyncio
import pandas as pd
import logging
from typing import Optional
from .iceberg_client import IcebergClient
from .history_client import HistoryClient
# Module-level logger, named after this module so it can be configured via
# the standard logging hierarchy.
log = logging.getLogger(__name__)
class OHLCClient:
    """
    High-level client for fetching OHLC data.

    Workflow:
    1. Ask Iceberg which parts of the requested window are missing
    2. If nothing is missing, read the window from Iceberg and return it
    3. Otherwise request the window via the relay
    4. Wait for the completion notification
    5. Query Iceberg for the complete dataset
    6. Forward-fill interior gap bars and return the result

    This provides transparent caching - clients don't need to know
    whether data came from cache or was fetched on-demand.
    """

    def __init__(
        self,
        iceberg_catalog_uri: str,
        relay_endpoint: str,
        notification_endpoint: str,
        namespace: str = "trading",
        s3_endpoint: Optional[str] = None,
        s3_access_key: Optional[str] = None,
        s3_secret_key: Optional[str] = None,
    ):
        """
        Initialize OHLC client.

        Args:
            iceberg_catalog_uri: URI of Iceberg catalog
            relay_endpoint: ZMQ endpoint for relay requests
            notification_endpoint: ZMQ endpoint for notifications
            namespace: Iceberg namespace (default: "trading")
            s3_endpoint: S3/MinIO endpoint URL (e.g., "http://localhost:9000")
            s3_access_key: S3/MinIO access key
            s3_secret_key: S3/MinIO secret key
        """
        self.iceberg = IcebergClient(
            iceberg_catalog_uri, namespace,
            s3_endpoint=s3_endpoint,
            s3_access_key=s3_access_key,
            s3_secret_key=s3_secret_key,
        )
        self.history = HistoryClient(relay_endpoint, notification_endpoint)
        log.info("OHLCClient initialized")

    async def start(self):
        """
        Start the client. Must be called before making requests.

        Starts background notification listener.
        """
        await self.history.connect()

    async def stop(self):
        """
        Stop the client and cleanup resources.
        """
        await self.history.close()

    @staticmethod
    def _align_up(timestamp: int, step: int) -> int:
        """Round ``timestamp`` up to the nearest multiple of ``step`` (``step`` > 0)."""
        return ((timestamp + step - 1) // step) * step

    async def fetch_ohlc(
        self,
        ticker: str,
        period_seconds: int,
        start_time: int,
        end_time: int,
        request_timeout: float = 30.0,
    ) -> pd.DataFrame:
        """
        Fetch OHLC data with smart caching.

        Steps:
        1. Align the window to period boundaries: [ceil(start), ceil(end))
        2. Ask Iceberg which ranges of the window are missing
        3. If nothing is missing, query Iceberg and return immediately
        4. Otherwise request the window via the relay and wait for completion
        5. Query Iceberg for the complete dataset
        6. Forward-fill interior gap bars and return

        Args:
            ticker: Market identifier in Nautilus format (e.g., "BTC/USDT.BINANCE")
            period_seconds: OHLC period in seconds (60, 300, 3600, etc.); must be > 0
            start_time: Start timestamp in nanoseconds
            end_time: End timestamp in nanoseconds (exclusive after alignment)
            request_timeout: Timeout for historical data requests (default: 30s)

        Returns:
            DataFrame with OHLC data sorted by index, with interior gap bars
            forward-filled.

        Raises:
            TimeoutError: If historical data request times out
            ValueError: If ``period_seconds`` is not positive, or the request fails
        """
        if period_seconds <= 0:
            raise ValueError(f"period_seconds must be positive, got {period_seconds}")

        period_nanos = period_seconds * 1_000_000_000
        start_time = self._align_up(start_time, period_nanos)
        end_time = self._align_up(end_time, period_nanos)  # exclusive

        # NOTE(review): the Iceberg calls below are synchronous and run on the
        # event loop thread; consider offloading them if they are slow.
        missing_ranges = self.iceberg.find_missing_ranges(
            ticker, period_seconds, start_time, end_time
        )

        if missing_ranges:
            # For simplicity, request the entire aligned window rather than
            # each missing range individually.
            result = await self.history.request_historical_ohlc(
                ticker=ticker,
                period_seconds=period_seconds,
                start_time=start_time,
                end_time=end_time,
                timeout=request_timeout,
            )
            if result['status'] == 'ERROR':
                # .get(): a malformed error payload must not mask the failure
                # behind a KeyError.
                reason = result.get('error_message', 'unknown error')
                raise ValueError(f"Historical data request failed: {reason}")

        # Single read: either everything was already cached, or the relay has
        # just finished backfilling the missing ranges.
        df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time)
        return self._forward_fill_gaps(df, period_seconds)

    def _forward_fill_gaps(self, df: pd.DataFrame, period_seconds: int) -> pd.DataFrame:
        """
        Forward-fill interior gap bars by carrying the last known close into
        open, high, low, and close of each gap bar.

        A gap bar is a row whose ``close`` is null (e.g. a row the ingestor
        wrote with null OHLC). Only interior gaps are filled: gap rows with a
        real bar both before and after them in index order. Their volume is
        set to 0.0. Edge gaps (before the first real bar or after the last
        real bar) are left untouched, including their volume.

        Timestamp slots that have no row at all are NOT synthesized. Only
        rows already present in ``df`` are considered.

        Args:
            df: OHLC frame from the Iceberg query. Must have a ``close``
                column; ``open``/``high``/``low``/``volume`` are updated
                when present.
            period_seconds: Bar period. Currently unused.
                NOTE(review): presumably intended for reindexing missing
                slots; that needs the index's timestamp representation
                confirmed.

        Returns:
            A new DataFrame sorted by index, or ``df`` itself when it is empty.
        """
        if df.empty:
            return df

        df = df.sort_index()  # returns a new frame; the caller's df is not mutated
        close = df['close']
        is_gap = close.isna()
        if not is_gap.any():
            return df

        carried = close.ffill()
        # Interior gap: some real bar precedes it (ffill found a value) AND
        # some real bar follows it (bfill found a value).
        interior = is_gap & carried.notna() & close.bfill().notna()
        if not interior.any():
            return df

        df['close'] = close.where(~interior, carried)
        for col in ('open', 'high', 'low'):
            if col in df.columns:
                df[col] = df[col].where(~interior, carried)

        # Filled bars represent periods with no trades.
        if 'volume' in df.columns:
            df['volume'] = df['volume'].where(~interior, 0.0)

        return df

    async def __aenter__(self):
        """Support async context manager."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Support async context manager."""
        await self.stop()