research price fix
This commit is contained in:
@@ -8,6 +8,7 @@ import logging
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
from .iceberg_client import IcebergClient
|
from .iceberg_client import IcebergClient
|
||||||
from .history_client import HistoryClient
|
from .history_client import HistoryClient
|
||||||
|
from .symbol_metadata_client import SymbolMetadataClient
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -61,6 +62,12 @@ class OHLCClient:
|
|||||||
s3_secret_key=s3_secret_key,
|
s3_secret_key=s3_secret_key,
|
||||||
s3_region=s3_region,
|
s3_region=s3_region,
|
||||||
)
|
)
|
||||||
|
self.symbol_metadata = SymbolMetadataClient(
|
||||||
|
iceberg_catalog_uri, namespace,
|
||||||
|
s3_endpoint=s3_endpoint,
|
||||||
|
s3_access_key=s3_access_key,
|
||||||
|
s3_secret_key=s3_secret_key,
|
||||||
|
)
|
||||||
self.history = HistoryClient(relay_endpoint, notification_endpoint)
|
self.history = HistoryClient(relay_endpoint, notification_endpoint)
|
||||||
log.info("OHLCClient initialized")
|
log.info("OHLCClient initialized")
|
||||||
|
|
||||||
@@ -125,7 +132,7 @@ class OHLCClient:
|
|||||||
|
|
||||||
if not missing_ranges:
|
if not missing_ranges:
|
||||||
# All data exists in Iceberg
|
# All data exists in Iceberg
|
||||||
return df
|
return self._apply_decimal_correction(ticker, df)
|
||||||
|
|
||||||
# Step 3: Request missing data for each range
|
# Step 3: Request missing data for each range
|
||||||
# For simplicity, request entire range (relay can merge adjacent requests)
|
# For simplicity, request entire range (relay can merge adjacent requests)
|
||||||
@@ -144,6 +151,35 @@ class OHLCClient:
|
|||||||
# Step 5: Query Iceberg again for complete dataset
|
# Step 5: Query Iceberg again for complete dataset
|
||||||
df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time)
|
df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time)
|
||||||
|
|
||||||
|
return self._apply_decimal_correction(ticker, df)
|
||||||
|
|
||||||
|
def _apply_decimal_correction(self, ticker: str, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
|
"""
|
||||||
|
Convert raw integer OHLC columns to float prices/volumes.
|
||||||
|
|
||||||
|
Iceberg stores prices and volumes as integers (Nautilus internal units).
|
||||||
|
Divide by 10^price_precision for OHLC columns and 10^size_precision for
|
||||||
|
volume columns to recover actual floating-point values.
|
||||||
|
"""
|
||||||
|
if df.empty:
|
||||||
|
return df
|
||||||
|
|
||||||
|
meta = self.symbol_metadata.get_metadata(ticker)
|
||||||
|
price_precision = meta.price_precision
|
||||||
|
size_precision = meta.size_precision
|
||||||
|
|
||||||
|
if price_precision is not None and price_precision > 0:
|
||||||
|
price_divisor = 10 ** price_precision
|
||||||
|
for col in ("open", "high", "low", "close"):
|
||||||
|
if col in df.columns:
|
||||||
|
df[col] = df[col] / price_divisor
|
||||||
|
|
||||||
|
if size_precision is not None and size_precision > 0:
|
||||||
|
size_divisor = 10 ** size_precision
|
||||||
|
for col in ("volume", "buy_vol", "sell_vol"):
|
||||||
|
if col in df.columns:
|
||||||
|
df[col] = df[col] / size_divisor
|
||||||
|
|
||||||
return df
|
return df
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user