diff --git a/deploy/k8s/dev/kustomization.yaml b/deploy/k8s/dev/kustomization.yaml index 538a8586..2bf66656 100644 --- a/deploy/k8s/dev/kustomization.yaml +++ b/deploy/k8s/dev/kustomization.yaml @@ -243,6 +243,15 @@ generatorOptions: + + + + + + + + + diff --git a/doc/architecture.md b/doc/architecture.md index 69708c09..e7af7f7a 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -197,7 +197,7 @@ See [[protocol]] for detailed ZMQ patterns and message formats. **Features:** - CCXT-based exchange adapters -- Subscribes to work queue via exchange prefix (e.g., `BINANCE:`) +- Subscribes to work queue via exchange suffix (e.g., `.BINANCE`) - Writes raw data to Kafka only (no direct client responses) - Supports realtime ticks and historical OHLC @@ -324,7 +324,7 @@ See [[protocol#Historical Data Query Flow]] for details. 4. Relay → Clients (XPUB/SUB): Fanout to subscribers ``` -**Topic Format:** `{ticker}|{data_type}` (e.g., `BINANCE:BTC/USDT|tick`) +**Topic Format:** `{ticker}|{data_type}` (e.g., `BTC/USDT.BINANCE|tick`) --- diff --git a/doc/protocol.md b/doc/protocol.md index 606d58be..5b66b6f1 100644 --- a/doc/protocol.md +++ b/doc/protocol.md @@ -85,7 +85,7 @@ All sockets bind on **Relay** (well-known endpoint). Components connect to relay - **Socket Type**: Relay uses PUB (bind), Ingestors use SUB (connect) - **Endpoint**: `tcp://*:5555` (Relay binds) - **Message Types**: `DataRequest` (historical or realtime) -- **Topic Prefix**: Exchange name (e.g., `BINANCE:`, `COINBASE:`) +- **Topic Prefix**: Market name (e.g., `BTC/USDT.`, `ETH/BTC.`) - **Behavior**: - Relay publishes work with exchange prefix from ticker - Ingestors subscribe only to exchanges they support @@ -100,7 +100,7 @@ All sockets bind on **Relay** (well-known endpoint). Components connect to relay - Relay XSUB (connect) → Flink PUB (bind) - Port 5557 - **Message Types**: `Tick`, `OHLC`, `HistoryReadyNotification`, `SymbolMetadataUpdated` - **Topic Formats**: - - Market data: `{ticker}|{data_type}` (e.g., `BINANCE:BTC/USDT|tick`) + - Market data: `{ticker}|{data_type}` (e.g., `BTC/USDT.BINANCE|tick`) - Notifications: `RESPONSE:{client_id}` or `HISTORY_READY:{request_id}` - System notifications: `METADATA_UPDATE` (for symbol metadata updates) - **Behavior**: diff --git a/doc/user_mcp_resources.md b/doc/user_mcp_resources.md index d507466f..8dcfa820 100644 --- a/doc/user_mcp_resources.md +++ b/doc/user_mcp_resources.md @@ -105,7 +105,7 @@ async def get_conversation_summary() -> str: ```json { "currentChart": { - "ticker": "BINANCE:BTC/USDT", + "ticker": "BTC/USDT.BINANCE", "timeframe": "1h", "indicators": ["SMA(20)", "RSI(14)", "MACD(12,26,9)"] }, diff --git a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java index bf600c38..bd4ce8b5 100644 --- a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java +++ b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java @@ -81,7 +81,8 @@ public class SchemaInitializer { // Bump this when the schema changes. Tables with a different (or missing) version // will be dropped and recreated. Increment by 1 for each incompatible change. // v2: open/high/low/close changed from required to optional to support null gap bars - private static final String OHLC_SCHEMA_VERSION = "2"; + // v3: timestamps changed from microseconds to nanoseconds; ticker format changed to BTC/USDT.BINANCE + private static final String OHLC_SCHEMA_VERSION = "3"; private static final String SCHEMA_VERSION_PROP = "app.schema.version"; private void initializeOhlcTable() { @@ -124,9 +125,9 @@ public class SchemaInitializer { // so that GenericRowData.setField() accepts a plain Long value. Schema schema = new Schema( // Primary key fields - required(1, "ticker", Types.StringType.get(), "Market identifier (e.g., BINANCE:BTC/USDT)"), + required(1, "ticker", Types.StringType.get(), "Market identifier in Nautilus format (e.g., BTC/USDT.BINANCE)"), required(2, "period_seconds", Types.IntegerType.get(), "OHLC period in seconds"), - required(3, "timestamp", Types.LongType.get(), "Candle timestamp in microseconds since epoch"), + required(3, "timestamp", Types.LongType.get(), "Candle timestamp in nanoseconds since epoch"), // OHLC price data — optional to support gap bars (null = no trades that period) optional(4, "open", Types.LongType.get(), "Opening price"), @@ -150,7 +151,7 @@ public class SchemaInitializer { // Metadata fields optional(16, "request_id", Types.StringType.get(), "Request ID that generated this data"), - required(17, "ingested_at", Types.LongType.get(), "Timestamp when data was ingested by Flink") + required(17, "ingested_at", Types.LongType.get(), "Timestamp when data was ingested by Flink (nanoseconds since epoch)") ); // Create the table with partitioning and properties @@ -176,7 +177,10 @@ public class SchemaInitializer { /** * Initialize the symbol_metadata table if it doesn't exist. */ - private static final String SYMBOL_METADATA_SCHEMA_VERSION = "1"; + // v2: removed tick_denom/base_denom/quote_denom; added Nautilus instrument fields + // (price_precision, size_precision, tick_size, lot_size, min_notional, + // margin_init, margin_maint, maker_fee, taker_fee, contract_multiplier) + private static final String SYMBOL_METADATA_SCHEMA_VERSION = "2"; private void initializeSymbolMetadataTable() { TableIdentifier tableId = TableIdentifier.of(namespace, "symbol_metadata"); @@ -220,24 +224,31 @@ public class SchemaInitializer { required(2, "market_id", Types.StringType.get(), "Market symbol (e.g., BTC/USDT)"), // Market information - optional(3, "market_type", Types.StringType.get(), "Market type (spot, futures, swap)"), + optional(3, "market_type", Types.StringType.get(), "Market type (spot, futures, swap, CryptoPerpetual)"), optional(4, "description", Types.StringType.get(), "Human-readable description"), optional(5, "base_asset", Types.StringType.get(), "Base asset (e.g., BTC)"), optional(6, "quote_asset", Types.StringType.get(), "Quote asset (e.g., USDT)"), - // Precision/denominator information - optional(7, "tick_denom", Types.LongType.get(), "Tick price denominator (10^n for n decimals)"), - optional(8, "base_denom", Types.LongType.get(), "Base asset denominator"), - optional(9, "quote_denom", Types.LongType.get(), "Quote asset denominator"), - // Supported timeframes optional(10, "supported_period_seconds", Types.ListType.ofRequired(11, Types.IntegerType.get()), "Supported OHLC periods in seconds"), // Optional timing information - optional(12, "earliest_time", Types.LongType.get(), "Earliest available data timestamp (microseconds)"), + optional(12, "earliest_time", Types.LongType.get(), "Earliest available data timestamp (nanoseconds since epoch)"), // Metadata - required(13, "updated_at", Types.LongType.get(), "Timestamp when metadata was last updated (microseconds)") + required(13, "updated_at", Types.LongType.get(), "Timestamp when metadata was last updated (nanoseconds since epoch)"), + + // Nautilus Instrument fields — required for constructing Instrument objects in the sandbox bridge + optional(14, "price_precision", Types.IntegerType.get(), "Decimal places for prices (e.g., 2 means $0.01 resolution)"), + optional(15, "size_precision", Types.IntegerType.get(), "Decimal places for quantities"), + optional(16, "tick_size", Types.DoubleType.get(), "Minimum price increment (e.g., 0.01)"), + optional(17, "lot_size", Types.DoubleType.get(), "Minimum order size"), + optional(18, "min_notional", Types.DoubleType.get(), "Minimum order value in quote currency"), + optional(19, "margin_init", Types.DoubleType.get(), "Initial margin requirement (futures/perps only)"), + optional(20, "margin_maint", Types.DoubleType.get(), "Maintenance margin (futures/perps only)"), + optional(21, "maker_fee", Types.DoubleType.get(), "Maker fee rate (e.g., 0.001 = 0.1%)"), + optional(22, "taker_fee", Types.DoubleType.get(), "Taker fee rate"), + optional(23, "contract_multiplier", Types.DoubleType.get(), "Contract multiplier for derivatives (default 1.0)") ); // Create the table with partitioning and properties diff --git a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorWorkQueue.java b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorWorkQueue.java index d8e1df97..8c526dfb 100644 --- a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorWorkQueue.java +++ b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorWorkQueue.java @@ -108,13 +108,13 @@ public class IngestorWorkQueue { /** * Send a data request to ingestors via PUB socket with exchange prefix. - * The topic prefix is extracted from the ticker (e.g., "BINANCE:BTC/USDT" -> "BINANCE:") + * The topic prefix is extracted from the ticker (e.g., "BTC/USDT.BINANCE" -> "BINANCE:") */ private void sendToIngestors(DataRequestMessage request) { try { byte[] protobufData = request.toProtobuf(); - // Extract exchange prefix from ticker (e.g., "BINANCE:BTC/USDT" -> "BINANCE:") + // Extract exchange prefix from ticker (e.g., "BTC/USDT.BINANCE" -> "BINANCE:") String ticker = request.getTicker(); String exchangePrefix = extractExchangePrefix(ticker); @@ -143,7 +143,7 @@ public class IngestorWorkQueue { /** * Extract exchange prefix from ticker string. - * E.g., "BINANCE:BTC/USDT" -> "BINANCE:" + * E.g., "BTC/USDT.BINANCE" -> "BINANCE:" */ private String extractExchangePrefix(String ticker) { int colonIndex = ticker.indexOf(':'); diff --git a/flink/src/main/java/com/dexorder/flink/publisher/HistoryNotificationFunction.java b/flink/src/main/java/com/dexorder/flink/publisher/HistoryNotificationFunction.java index 0a356cd7..ae893ce8 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/HistoryNotificationFunction.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/HistoryNotificationFunction.java @@ -119,7 +119,7 @@ public class HistoryNotificationFunction extends ProcessFunction "binance") + // Extract exchange from ticker (e.g., "BTC/USDT.BINANCE" -> "binance") String exchange = ticker.split(":")[0].toLowerCase(); // Convert period to human-readable format diff --git a/flink/src/main/java/com/dexorder/flink/publisher/MarketDeserializer.java b/flink/src/main/java/com/dexorder/flink/publisher/MarketDeserializer.java index 37bc5fec..07dd48c6 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/MarketDeserializer.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/MarketDeserializer.java @@ -62,9 +62,6 @@ public class MarketDeserializer implements DeserializationSchema wrapper.setDescription(market.getDescription()); wrapper.setBaseAsset(market.getBaseAsset()); wrapper.setQuoteAsset(market.getQuoteAsset()); - wrapper.setTickDenom(market.getTickDenom()); - wrapper.setBaseDenom(market.getBaseDenom()); - wrapper.setQuoteDenom(market.getQuoteDenom()); // Convert repeated field to List List supportedPeriods = new ArrayList<>(market.getSupportedPeriodSecondsList()); @@ -72,6 +69,18 @@ public class MarketDeserializer implements DeserializationSchema wrapper.setEarliestTime(market.getEarliestTime()); + // Nautilus Instrument fields + wrapper.setPricePrecision(market.getPricePrecision()); + wrapper.setSizePrecision(market.getSizePrecision()); + wrapper.setTickSize(market.getTickSize()); + wrapper.setLotSize(market.getLotSize()); + wrapper.setMinNotional(market.getMinNotional()); + wrapper.setMarginInit(market.getMarginInit()); + wrapper.setMarginMaint(market.getMarginMaint()); + wrapper.setMakerFee(market.getMakerFee()); + wrapper.setTakerFee(market.getTakerFee()); + wrapper.setContractMultiplier(market.getContractMultiplier()); + return wrapper; } catch (Exception e) { LOG.error("Failed to deserialize Market protobuf", e); diff --git a/flink/src/main/java/com/dexorder/flink/publisher/MarketWrapper.java b/flink/src/main/java/com/dexorder/flink/publisher/MarketWrapper.java index 530296d3..ae71942f 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/MarketWrapper.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/MarketWrapper.java @@ -8,7 +8,7 @@ import java.util.List; * Represents symbol metadata for a trading pair. */ public class MarketWrapper implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private String exchangeId; private String marketId; @@ -16,119 +16,79 @@ public class MarketWrapper implements Serializable { private String description; private String baseAsset; private String quoteAsset; - private long tickDenom; - private long baseDenom; - private long quoteDenom; private List supportedPeriodSeconds; private long earliestTime; + // Nautilus Instrument fields + private int pricePrecision; + private int sizePrecision; + private double tickSize; + private double lotSize; + private double minNotional; + private double marginInit; + private double marginMaint; + private double makerFee; + private double takerFee; + private double contractMultiplier; + public MarketWrapper() { } - public MarketWrapper(String exchangeId, String marketId, String marketType, String description, - String baseAsset, String quoteAsset, long tickDenom, long baseDenom, - long quoteDenom, List supportedPeriodSeconds, long earliestTime) { - this.exchangeId = exchangeId; - this.marketId = marketId; - this.marketType = marketType; - this.description = description; - this.baseAsset = baseAsset; - this.quoteAsset = quoteAsset; - this.tickDenom = tickDenom; - this.baseDenom = baseDenom; - this.quoteDenom = quoteDenom; - this.supportedPeriodSeconds = supportedPeriodSeconds; - this.earliestTime = earliestTime; - } - // Getters and setters - public String getExchangeId() { - return exchangeId; - } - public void setExchangeId(String exchangeId) { - this.exchangeId = exchangeId; - } + public String getExchangeId() { return exchangeId; } + public void setExchangeId(String exchangeId) { this.exchangeId = exchangeId; } - public String getMarketId() { - return marketId; - } + public String getMarketId() { return marketId; } + public void setMarketId(String marketId) { this.marketId = marketId; } - public void setMarketId(String marketId) { - this.marketId = marketId; - } + public String getMarketType() { return marketType; } + public void setMarketType(String marketType) { this.marketType = marketType; } - public String getMarketType() { - return marketType; - } + public String getDescription() { return description; } + public void setDescription(String description) { this.description = description; } - public void setMarketType(String marketType) { - this.marketType = marketType; - } + public String getBaseAsset() { return baseAsset; } + public void setBaseAsset(String baseAsset) { this.baseAsset = baseAsset; } - public String getDescription() { - return description; - } + public String getQuoteAsset() { return quoteAsset; } + public void setQuoteAsset(String quoteAsset) { this.quoteAsset = quoteAsset; } - public void setDescription(String description) { - this.description = description; - } + public List getSupportedPeriodSeconds() { return supportedPeriodSeconds; } + public void setSupportedPeriodSeconds(List supportedPeriodSeconds) { this.supportedPeriodSeconds = supportedPeriodSeconds; } - public String getBaseAsset() { - return baseAsset; - } + public long getEarliestTime() { return earliestTime; } + public void setEarliestTime(long earliestTime) { this.earliestTime = earliestTime; } - public void setBaseAsset(String baseAsset) { - this.baseAsset = baseAsset; - } + public int getPricePrecision() { return pricePrecision; } + public void setPricePrecision(int pricePrecision) { this.pricePrecision = pricePrecision; } - public String getQuoteAsset() { - return quoteAsset; - } + public int getSizePrecision() { return sizePrecision; } + public void setSizePrecision(int sizePrecision) { this.sizePrecision = sizePrecision; } - public void setQuoteAsset(String quoteAsset) { - this.quoteAsset = quoteAsset; - } + public double getTickSize() { return tickSize; } + public void setTickSize(double tickSize) { this.tickSize = tickSize; } - public long getTickDenom() { - return tickDenom; - } + public double getLotSize() { return lotSize; } + public void setLotSize(double lotSize) { this.lotSize = lotSize; } - public void setTickDenom(long tickDenom) { - this.tickDenom = tickDenom; - } + public double getMinNotional() { return minNotional; } + public void setMinNotional(double minNotional) { this.minNotional = minNotional; } - public long getBaseDenom() { - return baseDenom; - } + public double getMarginInit() { return marginInit; } + public void setMarginInit(double marginInit) { this.marginInit = marginInit; } - public void setBaseDenom(long baseDenom) { - this.baseDenom = baseDenom; - } + public double getMarginMaint() { return marginMaint; } + public void setMarginMaint(double marginMaint) { this.marginMaint = marginMaint; } - public long getQuoteDenom() { - return quoteDenom; - } + public double getMakerFee() { return makerFee; } + public void setMakerFee(double makerFee) { this.makerFee = makerFee; } - public void setQuoteDenom(long quoteDenom) { - this.quoteDenom = quoteDenom; - } + public double getTakerFee() { return takerFee; } + public void setTakerFee(double takerFee) { this.takerFee = takerFee; } - public List getSupportedPeriodSeconds() { - return supportedPeriodSeconds; - } - - public void setSupportedPeriodSeconds(List supportedPeriodSeconds) { - this.supportedPeriodSeconds = supportedPeriodSeconds; - } - - public long getEarliestTime() { - return earliestTime; - } - - public void setEarliestTime(long earliestTime) { - this.earliestTime = earliestTime; - } + public double getContractMultiplier() { return contractMultiplier; } + public void setContractMultiplier(double contractMultiplier) { this.contractMultiplier = contractMultiplier; } @Override public String toString() { diff --git a/flink/src/main/java/com/dexorder/flink/sink/IcebergOHLCSink.java b/flink/src/main/java/com/dexorder/flink/sink/IcebergOHLCSink.java index bf980e8d..882301c4 100644 --- a/flink/src/main/java/com/dexorder/flink/sink/IcebergOHLCSink.java +++ b/flink/src/main/java/com/dexorder/flink/sink/IcebergOHLCSink.java @@ -75,7 +75,7 @@ public class IcebergOHLCSink { String requestId = batch.getRequestId(); String ticker = batch.getTicker(); int periodSeconds = batch.getPeriodSeconds(); - long ingestedAt = System.currentTimeMillis() * 1000; + long ingestedAt = System.currentTimeMillis() * 1_000_000L; // nanoseconds // Emit one RowData for each OHLC row in the batch for (OHLCBatchWrapper.OHLCRow row : batch.getRows()) { diff --git a/flink/src/main/java/com/dexorder/flink/sink/SymbolMetadataWriter.java b/flink/src/main/java/com/dexorder/flink/sink/SymbolMetadataWriter.java index 15e9ccb2..cd3860f9 100644 --- a/flink/src/main/java/com/dexorder/flink/sink/SymbolMetadataWriter.java +++ b/flink/src/main/java/com/dexorder/flink/sink/SymbolMetadataWriter.java @@ -87,8 +87,8 @@ public class SymbolMetadataWriter extends RichFlatMapFunction out) throws Exception { - // Create unique key for deduplication - String symbolKey = market.getExchangeId() + ":" + market.getMarketId(); + // Create unique key for deduplication (internal key, not stored) + String symbolKey = market.getMarketId() + "." + market.getExchangeId(); // Skip if we've already seen this symbol if (seenSymbols.contains(symbolKey)) { @@ -110,16 +110,25 @@ public class SymbolMetadataWriter extends RichFlatMapFunction List supportedPeriods = new ArrayList<>(market.getSupportedPeriodSeconds()); record.setField("supported_period_seconds", supportedPeriods); record.setField("earliest_time", market.getEarliestTime() != 0 ? market.getEarliestTime() : null); - record.setField("updated_at", System.currentTimeMillis() * 1000); // Current time in microseconds + record.setField("updated_at", System.currentTimeMillis() * 1_000_000L); // Current time in nanoseconds + + // Nautilus Instrument fields (populated from exchange API data) + record.setField("price_precision", market.getPricePrecision() != 0 ? market.getPricePrecision() : null); + record.setField("size_precision", market.getSizePrecision() != 0 ? market.getSizePrecision() : null); + record.setField("tick_size", market.getTickSize() != 0.0 ? market.getTickSize() : null); + record.setField("lot_size", market.getLotSize() != 0.0 ? market.getLotSize() : null); + record.setField("min_notional", market.getMinNotional() != 0.0 ? market.getMinNotional() : null); + record.setField("margin_init", market.getMarginInit() != 0.0 ? market.getMarginInit() : null); + record.setField("margin_maint", market.getMarginMaint() != 0.0 ? market.getMarginMaint() : null); + record.setField("maker_fee", market.getMakerFee() != 0.0 ? market.getMakerFee() : null); + record.setField("taker_fee", market.getTakerFee() != 0.0 ? market.getTakerFee() : null); + record.setField("contract_multiplier", market.getContractMultiplier() != 0.0 ? market.getContractMultiplier() : null); // Get or create writer for this exchange DataWriter writer = writersByExchange.get(exchangeId); diff --git a/flink/src/main/resources/iceberg-schemas/ohlc_schema.sql b/flink/src/main/resources/iceberg-schemas/ohlc_schema.sql index 3acd6ca1..b55e48b2 100644 --- a/flink/src/main/resources/iceberg-schemas/ohlc_schema.sql +++ b/flink/src/main/resources/iceberg-schemas/ohlc_schema.sql @@ -9,7 +9,7 @@ CREATE TABLE IF NOT EXISTS trading.ohlc ( -- Primary key fields - ticker STRING NOT NULL COMMENT 'Market identifier (e.g., BINANCE:BTC/USDT)', + ticker STRING NOT NULL COMMENT 'Market identifier (e.g., BTC/USDT.BINANCE)', period_seconds INT NOT NULL COMMENT 'OHLC period in seconds (60, 300, 900, 3600, 14400, 86400, 604800, etc.)', timestamp BIGINT NOT NULL COMMENT 'Candle timestamp in microseconds since epoch', diff --git a/gateway/src/clients/duckdb-client.ts b/gateway/src/clients/duckdb-client.ts index 4194cd43..4aa3c8a4 100644 --- a/gateway/src/clients/duckdb-client.ts +++ b/gateway/src/clients/duckdb-client.ts @@ -388,8 +388,8 @@ export class DuckDBClient { async queryOHLC( ticker: string, period_seconds: number, - start_time: bigint, // microseconds - end_time: bigint // microseconds + start_time: bigint, // nanoseconds + end_time: bigint // nanoseconds ): Promise { await this.initialize(); diff --git a/gateway/src/clients/iceberg-client.ts b/gateway/src/clients/iceberg-client.ts index 4a87eb68..bb4a1442 100644 --- a/gateway/src/clients/iceberg-client.ts +++ b/gateway/src/clients/iceberg-client.ts @@ -42,7 +42,7 @@ export interface IcebergMessage { role: 'user' | 'assistant' | 'system' | 'workspace'; content: string; metadata: string; // JSON string - timestamp: number; // microseconds + timestamp: number; // nanoseconds } /** @@ -54,7 +54,7 @@ export interface IcebergCheckpoint { checkpoint_id: string; checkpoint_data: string; // JSON string metadata: string; // JSON string - timestamp: number; // microseconds + timestamp: number; // nanoseconds } /** @@ -213,8 +213,8 @@ export class IcebergClient { async queryOHLC( ticker: string, period_seconds: number, - start_time: bigint, // microseconds - end_time: bigint // microseconds + start_time: bigint, // nanoseconds + end_time: bigint // nanoseconds ): Promise { return this.duckdb.queryOHLC(ticker, period_seconds, start_time, end_time); } diff --git a/gateway/src/clients/zmq-relay-client.ts b/gateway/src/clients/zmq-relay-client.ts index a7870f49..5722d3ec 100644 --- a/gateway/src/clients/zmq-relay-client.ts +++ b/gateway/src/clients/zmq-relay-client.ts @@ -124,7 +124,7 @@ export class ZMQRelayClient { * * IMPORTANT: Call connect() before using this method. * - * @param ticker Market identifier (e.g., "BINANCE:BTC/USDT") + * @param ticker Market identifier (e.g., "BTC/USDT.BINANCE") * @param period_seconds OHLC period in seconds * @param start_time Start timestamp in MICROSECONDS * @param end_time End timestamp in MICROSECONDS diff --git a/gateway/src/harness/agent-harness.ts b/gateway/src/harness/agent-harness.ts index bb284377..c2bfd2d4 100644 --- a/gateway/src/harness/agent-harness.ts +++ b/gateway/src/harness/agent-harness.ts @@ -588,7 +588,7 @@ export class AgentHarness { const labels: Record = { research: 'Researching...', get_chart_data: 'Fetching chart data...', - symbol_lookup: 'Looking up symbol...', + symbol_lookup: 'Searching symbol...', category_list: 'Seeing what we have...', category_edit: 'Coding...', category_write: 'Coding...', diff --git a/gateway/src/harness/subagents/research/memory/api-reference.md b/gateway/src/harness/subagents/research/memory/api-reference.md index e6accba1..627ff1a4 100644 --- a/gateway/src/harness/subagents/research/memory/api-reference.md +++ b/gateway/src/harness/subagents/research/memory/api-reference.md @@ -60,7 +60,7 @@ class API: # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" @@ -107,8 +107,8 @@ class DataAPI(ABC): Fetch historical OHLC candlestick data for a market. Args: - ticker: Market identifier in format "EXCHANGE:SYMBOL" - Examples: "BINANCE:BTC/USDT", "COINBASE:ETH/USD" + ticker: Market identifier in format "MARKET.EXCHANGE" + Examples: "BTC/USDT.BINANCE", "ETH/USD.COINBASE" period_seconds: Candle period in seconds Common values: - 60 (1 minute) @@ -135,7 +135,7 @@ class DataAPI(ABC): Returns: DataFrame with candlestick data sorted by timestamp (ascending). Standard columns (always included): - - timestamp: Period start time in microseconds + - timestamp: Period start time in nanoseconds - open: Opening price (decimal float) - high: Highest price (decimal float) - low: Lowest price (decimal float) @@ -151,7 +151,7 @@ class DataAPI(ABC): Examples: # Basic OHLC with Unix timestamp df = await api.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time=1640000000, end_time=1640086400 @@ -159,7 +159,7 @@ class DataAPI(ABC): # Using date strings with volume df = await api.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21", @@ -169,7 +169,7 @@ class DataAPI(ABC): # Using datetime objects from datetime import datetime df = await api.historical_ohlc( - ticker="COINBASE:ETH/USD", + ticker="ETH/USD.COINBASE", period_seconds=300, start_time=datetime(2021, 12, 20, 9, 30), end_time=datetime(2021, 12, 20, 16, 30), @@ -193,8 +193,8 @@ class DataAPI(ABC): specify exact timestamps. Useful for real-time analysis and indicators. Args: - ticker: Market identifier in format "EXCHANGE:SYMBOL" - Examples: "BINANCE:BTC/USDT", "COINBASE:ETH/USD" + ticker: Market identifier in format "MARKET.EXCHANGE" + Examples: "BTC/USDT.BINANCE", "ETH/USD.COINBASE" period_seconds: OHLC candle period in seconds Common values: 60 (1m), 300 (5m), 900 (15m), 3600 (1h), 86400 (1d), 604800 (1w) @@ -213,14 +213,14 @@ class DataAPI(ABC): Examples: # Get the last candle df = await api.latest_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600 ) # Returns: timestamp, open, high, low, close # Get the last 50 5-minute candles with volume df = await api.latest_ohlc( - ticker="COINBASE:ETH/USD", + ticker="ETH/USD.COINBASE", period_seconds=300, length=50, extra_columns=["volume", "buy_vol", "sell_vol"] @@ -228,7 +228,7 @@ class DataAPI(ABC): # Get recent candles with all timing data df = await api.latest_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=60, length=100, extra_columns=["open_time", "high_time", "low_time", "close_time"] @@ -451,7 +451,7 @@ def get_api() -> API: # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" diff --git a/gateway/src/harness/subagents/research/memory/pandas-ta-reference.md b/gateway/src/harness/subagents/research/memory/pandas-ta-reference.md index c38d0eeb..94241b31 100644 --- a/gateway/src/harness/subagents/research/memory/pandas-ta-reference.md +++ b/gateway/src/harness/subagents/research/memory/pandas-ta-reference.md @@ -198,7 +198,7 @@ import asyncio api = get_api() df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2024-01-01", end_time="2024-01-08", diff --git a/gateway/src/harness/subagents/research/memory/usage-examples.md b/gateway/src/harness/subagents/research/memory/usage-examples.md index 45a6e86d..72f0e19d 100644 --- a/gateway/src/harness/subagents/research/memory/usage-examples.md +++ b/gateway/src/harness/subagents/research/memory/usage-examples.md @@ -29,7 +29,7 @@ api = get_api() # Method 1: Using Unix timestamps (seconds) df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, # 1 hour candles start_time=1640000000, # Unix timestamp in seconds end_time=1640086400, @@ -38,7 +38,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 2: Using date strings df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", # Simple date string end_time="2021-12-21", @@ -47,7 +47,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 3: Using date strings with time df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20 00:00:00", end_time="2021-12-20 23:59:59", @@ -56,7 +56,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 4: Using datetime objects df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time=datetime(2021, 12, 20), end_time=datetime(2021, 12, 21), @@ -92,7 +92,7 @@ api = get_api() # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21", @@ -123,7 +123,7 @@ api = get_api() # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" @@ -191,7 +191,7 @@ api = get_api() # Fetch historical data using date strings (easiest for research) df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, # 1 hour start_time="2021-12-20", end_time="2021-12-21", diff --git a/gateway/src/routes/symbol-routes.ts b/gateway/src/routes/symbol-routes.ts index 57b7419c..d1b49f74 100644 --- a/gateway/src/routes/symbol-routes.ts +++ b/gateway/src/routes/symbol-routes.ts @@ -55,7 +55,7 @@ export class SymbolRoutes { } }); - // Resolve symbol (use wildcard to capture ticker with slashes like BINANCE:BTC/USDT) + // Resolve symbol (use wildcard to capture ticker with slashes like BTC/USDT.BINANCE) app.get('/symbols/*', async (request, reply) => { const symbolIndexService = this.getSymbolIndexService(); diff --git a/gateway/src/services/ohlc-service.ts b/gateway/src/services/ohlc-service.ts index da5e6630..a1205caf 100644 --- a/gateway/src/services/ohlc-service.ts +++ b/gateway/src/services/ohlc-service.ts @@ -25,7 +25,7 @@ import type { TradingViewBar, } from '../types/ohlc.js'; import { - secondsToMicros, + secondsToNanos, backendToTradingView, DEFAULT_SUPPORTED_RESOLUTIONS, } from '../types/ohlc.js'; @@ -79,19 +79,19 @@ export class OHLCService { countback, }, 'Fetching OHLC data'); - // Convert times to microseconds, then align to period boundaries using + // Convert times to nanoseconds, then align to period boundaries using // [ceil(start), ceil(end)) semantics: // - start: ceil to next period boundary — excludes any in-progress candle whose // official timestamp is before from_time. // - end: ceil to next period boundary, used as EXCLUSIVE upper bound — includes // the last candle whose timestamp < to_time, excludes one sitting exactly on // to_time (which would be the next candle, not yet started). - const periodMicros = BigInt(period_seconds) * 1_000_000n; - const raw_start = secondsToMicros(from_time); - const raw_end = secondsToMicros(to_time); + const periodNanos = BigInt(period_seconds) * 1_000_000_000n; + const raw_start = secondsToNanos(from_time); + const raw_end = secondsToNanos(to_time); // bigint ceiling: ceil(a/b)*b = ((a + b - 1) / b) * b - const start_time = ((raw_start + periodMicros - 1n) / periodMicros) * periodMicros; - const end_time = ((raw_end + periodMicros - 1n) / periodMicros) * periodMicros; // exclusive + const start_time = ((raw_start + periodNanos - 1n) / periodNanos) * periodNanos; + const end_time = ((raw_end + periodNanos - 1n) / periodNanos) * periodNanos; // exclusive // Step 1: Check Iceberg for existing data let data = await this.icebergClient.queryOHLC(ticker, period_seconds, start_time, end_time); @@ -220,11 +220,11 @@ export class OHLCService { // For now, return default symbol if query matches if (query.toLowerCase().includes('btc') || query.toLowerCase().includes('binance')) { return [{ - symbol: 'BINANCE:BTC/USDT', - full_name: 'BINANCE:BTC/USDT', + symbol: 'BTC/USDT', + full_name: 'BTC/USDT (BINANCE)', description: 'Bitcoin / Tether USD', exchange: 'BINANCE', - ticker: 'BINANCE:BTC/USDT', + ticker: 'BTC/USDT.BINANCE', type: 'crypto', }]; } @@ -241,12 +241,12 @@ export class OHLCService { this.logger.debug({ symbol }, 'Resolving symbol'); // TODO: Implement central symbol registry - // For now, return default symbol info for BINANCE:BTC/USDT - if (symbol === 'BINANCE:BTC/USDT' || symbol === 'BTC/USDT') { + // For now, return default symbol info for BTC/USDT.BINANCE + if (symbol === 'BTC/USDT.BINANCE' || symbol === 'BTC/USDT') { return { - symbol: 'BINANCE:BTC/USDT', - name: 'BINANCE:BTC/USDT', - ticker: 'BINANCE:BTC/USDT', + symbol: 'BTC/USDT', + name: 'BTC/USDT', + ticker: 'BTC/USDT.BINANCE', description: 'Bitcoin / Tether USD', type: 'crypto', session: '24x7', diff --git a/gateway/src/services/symbol-index-service.ts b/gateway/src/services/symbol-index-service.ts index 6f38bf0e..4ef4abf9 100644 --- a/gateway/src/services/symbol-index-service.ts +++ b/gateway/src/services/symbol-index-service.ts @@ -23,7 +23,7 @@ export interface SymbolIndexServiceConfig { export class SymbolIndexService { private icebergClient: IcebergClient; private logger: FastifyBaseLogger; - private symbols: Map = new Map(); // key: "EXCHANGE:MARKET_ID" + private symbols: Map = new Map(); // key: "MARKET_ID.EXCHANGE" (Nautilus format) private initialized: boolean = false; constructor(config: SymbolIndexServiceConfig) { @@ -52,7 +52,7 @@ export class SymbolIndexService { const uniqueKeys = new Set(); for (const symbol of symbols) { - const key = `${symbol.exchange_id}:${symbol.market_id}`; + const key = `${symbol.market_id}.${symbol.exchange_id}`; uniqueKeys.add(key); this.symbols.set(key, symbol); } @@ -86,7 +86,7 @@ export class SymbolIndexService { * Update or add a symbol to the index */ updateSymbol(symbol: SymbolMetadata): void { - const key = `${symbol.exchange_id}:${symbol.market_id}`; + const key = `${symbol.market_id}.${symbol.exchange_id}`; this.symbols.set(key, symbol); this.logger.debug({ key }, 'Updated symbol in index'); } @@ -149,11 +149,11 @@ export class SymbolIndexService { return null; } - // ticker format: "EXCHANGE:MARKET_ID" or just "MARKET_ID" + // ticker format: "MARKET_ID.EXCHANGE" (Nautilus) or just "MARKET_ID" let key = ticker; - // If no exchange prefix, search for first match - if (!ticker.includes(':')) { + // If no dot separator, search for first match by market_id + if (!ticker.includes('.')) { for (const [k, metadata] of this.symbols) { if (metadata.market_id === ticker) { key = k; @@ -176,7 +176,7 @@ export class SymbolIndexService { */ private metadataToSearchResult(metadata: SymbolMetadata): SearchResult { const symbol = metadata.market_id; // Clean format: "BTC/USDT" - const ticker = `${metadata.exchange_id}:${metadata.market_id}`; // "BINANCE:BTC/USDT" + const ticker = `${metadata.market_id}.${metadata.exchange_id}`; // "BTC/USDT.BINANCE" const fullName = `${metadata.market_id} (${metadata.exchange_id})`; return { @@ -194,15 +194,12 @@ export class SymbolIndexService { */ private metadataToSymbolInfo(metadata: SymbolMetadata): SymbolInfo { const symbol = metadata.market_id; - const ticker = `${metadata.exchange_id}:${metadata.market_id}`; + const ticker = `${metadata.market_id}.${metadata.exchange_id}`; // "BTC/USDT.BINANCE" - // Convert supported_period_seconds to resolution strings const supportedResolutions = this.periodSecondsToResolutions(metadata.supported_period_seconds || []); - // Calculate pricescale from tick_denom - // tick_denom is 10^n where n is the number of decimal places - // pricescale is the same value - const pricescale = metadata.tick_denom ? Number(metadata.tick_denom) : 100; + // pricescale = 10^price_precision (e.g., price_precision=2 → pricescale=100) + const pricescale = metadata.price_precision != null ? Math.pow(10, metadata.price_precision) : 100; return { symbol, @@ -222,9 +219,12 @@ export class SymbolIndexService { base_currency: metadata.base_asset, quote_currency: metadata.quote_asset, data_status: 'streaming', - tick_denominator: metadata.tick_denom ? Number(metadata.tick_denom) : undefined, - base_denominator: metadata.base_denom ? Number(metadata.base_denom) : undefined, - quote_denominator: metadata.quote_denom ? Number(metadata.quote_denom) : undefined, + price_precision: metadata.price_precision, + size_precision: metadata.size_precision, + tick_size: metadata.tick_size, + lot_size: metadata.lot_size, + maker_fee: metadata.maker_fee, + taker_fee: metadata.taker_fee, }; } diff --git a/gateway/src/tools/platform/get-chart-data.tool.ts b/gateway/src/tools/platform/get-chart-data.tool.ts index ae155f63..526ae10c 100644 --- a/gateway/src/tools/platform/get-chart-data.tool.ts +++ b/gateway/src/tools/platform/get-chart-data.tool.ts @@ -164,7 +164,7 @@ async function getChartState(workspaceManager: WorkspaceManager, logger: Fastify if (!chartState) { // Return default chart state return { - symbol: 'BINANCE:BTC/USDT', + symbol: 'BTC/USDT.BINANCE', start_time: null, end_time: null, period: 900, @@ -177,7 +177,7 @@ async function getChartState(workspaceManager: WorkspaceManager, logger: Fastify logger.error({ error }, 'Failed to get chart state from workspace'); // Return default chart state return { - symbol: 'BINANCE:BTC/USDT', + symbol: 'BTC/USDT.BINANCE', start_time: null, end_time: null, period: 900, diff --git a/gateway/src/types/ohlc.ts b/gateway/src/types/ohlc.ts index 269e42a6..acbaccf6 100644 --- a/gateway/src/types/ohlc.ts +++ b/gateway/src/types/ohlc.ts @@ -3,7 +3,7 @@ * * Handles conversion between: * - TradingView datafeed format (seconds, OHLCV structure) - * - Backend/Iceberg format (microseconds, ticker prefix) + * - Backend/Iceberg format (nanoseconds, Nautilus ticker) * - ZMQ protocol format (protobuf messages) */ @@ -31,8 +31,8 @@ export interface TradingViewBar { * Backend OHLC format (from Iceberg) */ export interface BackendOHLC { - timestamp: bigint; // Unix timestamp in MICROSECONDS — kept as bigint to preserve precision - ticker: string; + timestamp: bigint; // Unix timestamp in NANOSECONDS — kept as bigint to preserve precision + ticker: string; // Nautilus format: "BTC/USDT.BINANCE" period_seconds: number; open: number | null; // null for gap bars (no trades that period) high: number | null; @@ -59,7 +59,7 @@ export interface DatafeedConfig { */ export interface SymbolInfo { symbol: string; // Clean format (e.g., "BTC/USDT") - ticker: string; // With exchange prefix (e.g., "BINANCE:BTC/USDT") + ticker: string; // Nautilus format (e.g., "BTC/USDT.BINANCE") name: string; // Display name description: string; // Human-readable description type: string; // "crypto", "spot", "futures", etc. @@ -70,14 +70,18 @@ export interface SymbolInfo { has_intraday: boolean; has_daily: boolean; has_weekly_and_monthly: boolean; - pricescale: number; // Price scale factor + pricescale: number; // 10^price_precision minmov: number; // Minimum price movement base_currency?: string; // Base asset (e.g., "BTC") quote_currency?: string; // Quote asset (e.g., "USDT") data_status?: string; // "streaming", "delayed", etc. - tick_denominator?: number; // Denominator for price scaling (e.g., 1e6) - base_denominator?: number; // Denominator for base asset - quote_denominator?: number; // Denominator for quote asset + // Nautilus Instrument fields + price_precision?: number; + size_precision?: number; + tick_size?: number; + lot_size?: number; + maker_fee?: number; + taker_fee?: number; } /** @@ -95,7 +99,7 @@ export interface HistoryResult { */ export interface SearchResult { symbol: string; // Clean format (e.g., "BTC/USDT") - ticker: string; // With exchange prefix for routing (e.g., "BINANCE:BTC/USDT") + ticker: string; // Nautilus format (e.g., "BTC/USDT.BINANCE") full_name: string; // Full display name (e.g., "BTC/USDT (BINANCE)") description: string; // Human-readable description exchange: string; // Exchange identifier @@ -122,9 +126,9 @@ export enum NotificationStatus { export interface SubmitHistoricalRequest { request_id: string; - ticker: string; - start_time: bigint; // microseconds - end_time: bigint; // microseconds + ticker: string; // Nautilus format: "BTC/USDT.BINANCE" + start_time: bigint; // nanoseconds + end_time: bigint; // nanoseconds period_seconds: number; limit?: number; client_id?: string; @@ -139,34 +143,33 @@ export interface SubmitResponse { export interface HistoryReadyNotification { request_id: string; - ticker: string; + ticker: string; // Nautilus format: "BTC/USDT.BINANCE" period_seconds: number; - start_time: bigint; // microseconds - end_time: bigint; // microseconds + start_time: bigint; // nanoseconds + end_time: bigint; // nanoseconds status: NotificationStatus; error_message?: string; iceberg_namespace: string; iceberg_table: string; row_count: number; - completed_at: bigint; // microseconds + completed_at: bigint; // nanoseconds } /** * Conversion utilities */ -export function secondsToMicros(seconds: number): bigint { - return BigInt(Math.floor(seconds)) * 1000000n; +export function secondsToNanos(seconds: number): bigint { + return BigInt(Math.floor(seconds)) * 1_000_000_000n; } -export function microsToSeconds(micros: bigint | number): number { - // Integer division: convert microseconds to seconds (truncates to integer) - return Number(BigInt(micros) / 1000000n); +export function nanosToSeconds(nanos: bigint | number): number { + return Number(BigInt(nanos) / 1_000_000_000n); } export function backendToTradingView(backend: BackendOHLC): TradingViewBar { return { - time: microsToSeconds(backend.timestamp), + time: nanosToSeconds(backend.timestamp), open: backend.open, high: backend.high, low: backend.low, @@ -220,10 +223,18 @@ export interface SymbolMetadata { description?: string; base_asset?: string; quote_asset?: string; - tick_denom?: bigint; - base_denom?: bigint; - quote_denom?: bigint; supported_period_seconds?: number[]; - earliest_time?: bigint; - updated_at: bigint; + earliest_time?: bigint; // nanoseconds + updated_at: bigint; // nanoseconds + // Nautilus Instrument fields + price_precision?: number; + size_precision?: number; + tick_size?: number; + lot_size?: number; + min_notional?: number; + margin_init?: number; + margin_maint?: number; + maker_fee?: number; + taker_fee?: number; + contract_multiplier?: number; } diff --git a/gateway/src/workspace/types.ts b/gateway/src/workspace/types.ts index a0cf631f..374ffdbb 100644 --- a/gateway/src/workspace/types.ts +++ b/gateway/src/workspace/types.ts @@ -81,7 +81,7 @@ export const DEFAULT_STORES: StoreConfig[] = [ name: 'chartState', persistent: false, initialState: () => ({ - symbol: 'BINANCE:BTC/USDT', + symbol: 'BTC/USDT.BINANCE', start_time: null, end_time: null, period: '15', diff --git a/iceberg/README.md b/iceberg/README.md index 4d70fed0..8ef31095 100644 --- a/iceberg/README.md +++ b/iceberg/README.md @@ -37,14 +37,14 @@ Historical OHLC (Open, High, Low, Close, Volume) candle data for all periods in ```sql -- Query 1-hour candles for specific ticker and time range SELECT * FROM trading.ohlc -WHERE ticker = 'BINANCE:BTC/USDT' +WHERE ticker = 'BTC/USDT.BINANCE' AND period_seconds = 3600 AND timestamp BETWEEN 1735689600000000 AND 1736294399000000 ORDER BY timestamp; -- Query most recent 1-minute candles SELECT * FROM trading.ohlc -WHERE ticker = 'BINANCE:BTC/USDT' +WHERE ticker = 'BTC/USDT.BINANCE' AND period_seconds = 60 AND timestamp > (UNIX_MICROS(CURRENT_TIMESTAMP()) - 3600000000) ORDER BY timestamp DESC @@ -53,7 +53,7 @@ LIMIT 60; -- Query all periods for a ticker SELECT period_seconds, COUNT(*) as candle_count FROM trading.ohlc -WHERE ticker = 'BINANCE:BTC/USDT' +WHERE ticker = 'BTC/USDT.BINANCE' GROUP BY period_seconds; ``` @@ -124,7 +124,7 @@ table = catalog.load_table("trading.ohlc") # Query with filters df = table.scan( row_filter=( - (col("ticker") == "BINANCE:BTC/USDT") & + (col("ticker") == "BTC/USDT.BINANCE") & (col("period_seconds") == 3600) & (col("timestamp") >= 1735689600000000) ) diff --git a/iceberg/ohlc_schema.sql b/iceberg/ohlc_schema.sql index fbebc46c..1449f41d 100644 --- a/iceberg/ohlc_schema.sql +++ b/iceberg/ohlc_schema.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS trading.ohlc ( -- Natural key fields (uniqueness enforced by Flink upsert logic) - ticker STRING NOT NULL COMMENT 'Market identifier (e.g., BINANCE:BTC/USDT)', + ticker STRING NOT NULL COMMENT 'Market identifier (e.g., BTC/USDT.BINANCE)', period_seconds INT NOT NULL COMMENT 'OHLC period in seconds (60, 300, 900, 3600, 14400, 86400, 604800, etc.)', timestamp BIGINT NOT NULL COMMENT 'Candle timestamp in microseconds since epoch', diff --git a/ingestor/README.md b/ingestor/README.md index b31dca08..06cf7139 100644 --- a/ingestor/README.md +++ b/ingestor/README.md @@ -75,7 +75,7 @@ docker run -v /path/to/config:/config ccxt-ingestor Tickers must be in the format: `EXCHANGE:SYMBOL` Examples: -- `BINANCE:BTC/USDT` +- `BTC/USDT.BINANCE` - `COINBASE:ETH/USD` - `KRAKEN:XRP/EUR` diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js index 7e59ea0d..491b9f16 100644 --- a/ingestor/src/ccxt-fetcher.js +++ b/ingestor/src/ccxt-fetcher.js @@ -12,17 +12,17 @@ export class CCXTFetcher { /** * Parse ticker string to exchange and symbol - * Expected format: "EXCHANGE:SYMBOL" (e.g., "BINANCE:BTC/USDT") + * Expected format: "SYMBOL.EXCHANGE" (e.g., "BTC/USDT.BINANCE") */ parseTicker(ticker) { - const parts = ticker.split(':'); - if (parts.length !== 2) { - throw new Error(`Invalid ticker format: ${ticker}. Expected "EXCHANGE:SYMBOL"`); + const lastDot = ticker.lastIndexOf('.'); + if (lastDot === -1) { + throw new Error(`Invalid ticker format: ${ticker}. Expected "SYMBOL.EXCHANGE"`); } return { - exchange: parts[0].toLowerCase(), - symbol: parts[1] + exchange: ticker.slice(lastDot + 1).toLowerCase(), + symbol: ticker.slice(0, lastDot) }; } @@ -101,9 +101,9 @@ export class CCXTFetcher { const { exchange: exchangeName, symbol } = this.parseTicker(ticker); const exchange = this.getExchange(exchangeName); - // Convert microseconds to milliseconds - const startMs = Math.floor(parseInt(startTime) / 1000); - const endMs = Math.floor(parseInt(endTime) / 1000); + // Convert nanoseconds to milliseconds + const startMs = Math.floor(parseInt(startTime) / 1_000_000); + const endMs = Math.floor(parseInt(endTime) / 1_000_000); // Map period seconds to CCXT timeframe const timeframe = this.secondsToTimeframe(periodSeconds); @@ -208,7 +208,7 @@ export class CCXTFetcher { /** * Fetch recent trades for realtime tick data * @param {string} ticker - Ticker in format "EXCHANGE:SYMBOL" - * @param {string} since - Optional timestamp in microseconds to fetch from + * @param {string} since - Optional timestamp in nanoseconds to fetch from * @returns {Promise} Array of trade ticks */ async fetchRecentTrades(ticker, since = null) { @@ -216,8 +216,8 @@ export class CCXTFetcher { const exchange = this.getExchange(exchangeName); try { - // Convert microseconds to milliseconds if provided - const sinceMs = since ? Math.floor(parseInt(since) / 1000) : undefined; + // Convert nanoseconds to milliseconds if provided + const sinceMs = since ? Math.floor(parseInt(since) / 1_000_000) : undefined; const trades = await exchange.fetchTrades(symbol, sinceMs, 1000); @@ -243,25 +243,24 @@ export class CCXTFetcher { /** * Convert CCXT OHLCV array to our OHLC format * CCXT format: [timestamp, open, high, low, close, volume] - * Uses denominators from market metadata for proper integer representation + * Uses precision fields from market metadata for proper integer representation */ convertToOHLC(candle, ticker, periodSeconds, metadata) { const [timestamp, open, high, low, close, volume] = candle; - // Use denominators from metadata - const tickDenom = metadata.tickDenom || 100; - const baseDenom = metadata.baseDenom || 100000000; + const priceMult = Math.pow(10, metadata.pricePrecision ?? 2); + const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8); return { ticker, - timestamp: (timestamp * 1000).toString(), // Convert ms to microseconds - open: Math.round(open * tickDenom).toString(), - high: Math.round(high * tickDenom).toString(), - low: Math.round(low * tickDenom).toString(), - close: Math.round(close * tickDenom).toString(), - volume: Math.round(volume * baseDenom).toString(), - open_time: (timestamp * 1000).toString(), - close_time: ((timestamp + periodSeconds * 1000) * 1000).toString() + timestamp: (timestamp * 1_000_000).toString(), // Convert ms to nanoseconds + open: Math.round(open * priceMult).toString(), + high: Math.round(high * priceMult).toString(), + low: Math.round(low * priceMult).toString(), + close: Math.round(close * priceMult).toString(), + volume: Math.round(volume * sizeMult).toString(), + open_time: (timestamp * 1_000_000).toString(), + close_time: ((timestamp + periodSeconds * 1000) * 1_000_000).toString() }; } @@ -272,35 +271,33 @@ export class CCXTFetcher { createGapBar(timestampMs, ticker, periodSeconds, metadata) { return { ticker, - timestamp: (timestampMs * 1000).toString(), // Convert ms to microseconds + timestamp: (timestampMs * 1_000_000).toString(), // Convert ms to nanoseconds open: null, high: null, low: null, close: null, volume: null, - open_time: (timestampMs * 1000).toString(), - close_time: ((timestampMs + periodSeconds * 1000) * 1000).toString() + open_time: (timestampMs * 1_000_000).toString(), + close_time: ((timestampMs + periodSeconds * 1000) * 1_000_000).toString() }; } /** * Convert CCXT trade to our Tick format - * Uses denominators from market metadata for proper integer representation + * Uses precision fields from market metadata for proper integer representation */ convertToTick(trade, ticker, metadata) { - // Use denominators from metadata - const tickDenom = metadata.tickDenom || 100; - const baseDenom = metadata.baseDenom || 100000000; - const quoteDenom = metadata.quoteDenom || tickDenom; + const priceMult = Math.pow(10, metadata.pricePrecision ?? 2); + const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8); - const price = Math.round(trade.price * tickDenom); - const amount = Math.round(trade.amount * baseDenom); - const quoteAmount = Math.round((trade.price * trade.amount) * quoteDenom); + const price = Math.round(trade.price * priceMult); + const amount = Math.round(trade.amount * sizeMult); + const quoteAmount = Math.round((trade.price * trade.amount) * priceMult); return { trade_id: trade.id || `${trade.timestamp}`, ticker, - timestamp: (trade.timestamp * 1000).toString(), // Convert ms to microseconds + timestamp: (trade.timestamp * 1_000_000).toString(), // Convert ms to nanoseconds price: price.toString(), amount: amount.toString(), quote_amount: quoteAmount.toString(), diff --git a/ingestor/src/symbol-metadata-generator.js b/ingestor/src/symbol-metadata-generator.js index 4e9f6a38..1c2cb9a3 100644 --- a/ingestor/src/symbol-metadata-generator.js +++ b/ingestor/src/symbol-metadata-generator.js @@ -141,59 +141,50 @@ export class SymbolMetadataGenerator { const precision = market.precision || {}; const limits = market.limits || {}; - // Get tick_denom from price precision - // This tells us the denominator for price values. - // For example, if BTC/USDT trades with 2 decimals (0.01 precision), tick_denom = 100 + // Derive Nautilus Instrument fields from CCXT market data // // CCXT precision.price can be: - // - Integer (decimal places): 2 means 0.01 tick size -> denominator 100 - // - Float (tick size): 0.01 -> invert to get denominator 100 - let tick_denom; + // - Integer (decimal places): 2 means 0.01 tick size -> price_precision=2, tick_size=0.01 + // - Float (tick size): 0.01 -> tick_size=0.01, price_precision=2 + let price_precision; + let tick_size; if (precision.price !== undefined) { if (Number.isInteger(precision.price)) { - // Integer: number of decimal places - // e.g., precision.price = 2 means 2 decimal places = 0.01 tick = 100 denom - tick_denom = Math.pow(10, precision.price); + price_precision = precision.price; + tick_size = Math.pow(10, -precision.price); } else { - // Float: actual tick size, need to invert and round - // e.g., precision.price = 0.01 -> 1/0.01 = 100 - tick_denom = Math.round(1 / precision.price); + tick_size = precision.price; + price_precision = Math.round(-Math.log10(precision.price)); } } else if (limits.price?.min !== undefined) { - // Fallback: use minimum price as tick size - tick_denom = Math.round(1 / limits.price.min); + tick_size = limits.price.min; + price_precision = Math.round(-Math.log10(tick_size)); } else { - // Default to 2 decimals (pennies) - tick_denom = 100; + price_precision = 2; + tick_size = 0.01; } - // Get base_denom from amount precision (for volumes) - let base_denom; + let size_precision; + let lot_size; if (precision.amount !== undefined) { if (Number.isInteger(precision.amount)) { - base_denom = Math.pow(10, precision.amount); + size_precision = precision.amount; + lot_size = Math.pow(10, -precision.amount); } else { - base_denom = Math.round(1 / precision.amount); + lot_size = precision.amount; + size_precision = Math.round(-Math.log10(precision.amount)); } } else if (limits.amount?.min !== undefined) { - base_denom = Math.round(1 / limits.amount.min); + lot_size = limits.amount.min; + size_precision = Math.round(-Math.log10(lot_size)); } else { - // Default to 8 decimals (standard for crypto) - base_denom = 100000000; + size_precision = 8; + lot_size = 0.00000001; } - // Get quote_denom from cost precision (price * amount) - let quote_denom; - if (precision.cost !== undefined) { - if (Number.isInteger(precision.cost)) { - quote_denom = Math.pow(10, precision.cost); - } else { - quote_denom = Math.round(1 / precision.cost); - } - } else { - // Default: typically tick_denom for most exchanges - quote_denom = tick_denom; - } + const min_notional = limits.cost?.min || 0; + const maker_fee = market.maker !== undefined ? market.maker : 0.001; + const taker_fee = market.taker !== undefined ? market.taker : 0.001; // Standard supported periods (in seconds) // Most exchanges support these timeframes @@ -218,9 +209,14 @@ export class SymbolMetadataGenerator { description, baseAsset: base, quoteAsset: quote, - tickDenom: tick_denom, - baseDenom: base_denom, - quoteDenom: quote_denom, + pricePrecision: price_precision, + sizePrecision: size_precision, + tickSize: tick_size, + lotSize: lot_size, + minNotional: min_notional, + makerFee: maker_fee, + takerFee: taker_fee, + contractMultiplier: 1.0, supportedPeriodSeconds: supported_period_seconds, // earliestTime can be added later if we track it }; @@ -238,7 +234,7 @@ export class SymbolMetadataGenerator { let duplicateCount = 0; for (const metadata of metadataList) { - const key = `${metadata.exchangeId}:${metadata.marketId}`; + const key = `${metadata.marketId}.${metadata.exchangeId}`; // Debug first few to understand duplication if (uniqueMetadata.length < 3 || (uniqueMetadata.length === 0 && duplicateCount < 3)) { @@ -269,7 +265,7 @@ export class SymbolMetadataGenerator { // Convert each metadata to protobuf Market message const messages = uniqueMetadata.map(metadata => { - const key = `${metadata.exchangeId}:${metadata.marketId}`; + const key = `${metadata.marketId}.${metadata.exchangeId}`; return { key, diff --git a/ingestor/src/zmq-client.js b/ingestor/src/zmq-client.js index e3eaa6da..a07d1615 100644 --- a/ingestor/src/zmq-client.js +++ b/ingestor/src/zmq-client.js @@ -29,9 +29,9 @@ export class ZmqClient { const workEndpoint = `tcp://${flink_hostname}:${ingestor_work_port}`; await this.workSocket.connect(workEndpoint); - // Subscribe to each supported exchange prefix + // Subscribe to each supported exchange suffix (Nautilus format: "BTC/USDT.BINANCE") for (const exchange of this.supportedExchanges) { - const prefix = `${exchange}:`; + const prefix = `${exchange}.`; this.workSocket.subscribe(prefix); this.logger.info(`Subscribed to exchange prefix: ${prefix}`); } diff --git a/protobuf/ingestor.proto b/protobuf/ingestor.proto index 43e82c32..ba2f94cc 100644 --- a/protobuf/ingestor.proto +++ b/protobuf/ingestor.proto @@ -31,10 +31,10 @@ message DataRequest { } message HistoricalParams { - // Start time (microseconds since epoch) + // Start time (nanoseconds since epoch) uint64 start_time = 1; - // End time (microseconds since epoch) + // End time (nanoseconds since epoch) uint64 end_time = 2; // OHLC period in seconds (e.g., 60 = 1m, 300 = 5m, 3600 = 1h, 86400 = 1d) @@ -115,13 +115,13 @@ message SubmitHistoricalRequest { // Client-generated request ID for tracking string request_id = 1; - // Market identifier (e.g., "BINANCE:BTC/USDT") + // Market identifier in Nautilus format (e.g., "BTC/USDT.BINANCE") string ticker = 2; - // Start time (microseconds since epoch) + // Start time (nanoseconds since epoch) uint64 start_time = 3; - // End time (microseconds since epoch) + // End time (nanoseconds since epoch) uint64 end_time = 4; // OHLC period in seconds (e.g., 60 = 1m, 300 = 5m, 3600 = 1h) @@ -170,10 +170,10 @@ message HistoryReadyNotification { // OHLC period in seconds uint32 period_seconds = 3; - // Start time (microseconds since epoch) + // Start time (nanoseconds since epoch) uint64 start_time = 4; - // End time (microseconds since epoch) + // End time (nanoseconds since epoch) uint64 end_time = 5; // Status of the data fetch @@ -189,7 +189,7 @@ message HistoryReadyNotification { // Number of records written uint32 row_count = 12; - // Timestamp when data was written (microseconds since epoch) + // Timestamp when data was written (nanoseconds since epoch) uint64 completed_at = 13; enum NotificationStatus { @@ -208,10 +208,10 @@ message OHLCRequest { // Market identifier string ticker = 2; - // Start time (microseconds since epoch) + // Start time (nanoseconds since epoch) uint64 start_time = 3; - // End time (microseconds since epoch) + // End time (nanoseconds since epoch) uint64 end_time = 4; // OHLC period in seconds (e.g., 60 = 1m, 300 = 5m, 3600 = 1h) @@ -290,7 +290,7 @@ message CEPTriggerEvent { // Trigger ID that fired string trigger_id = 1; - // Timestamp when trigger fired (microseconds since epoch) + // Timestamp when trigger fired (nanoseconds since epoch) uint64 timestamp = 2; // Schema information for the result rows diff --git a/protobuf/market.proto b/protobuf/market.proto index 0d2f6155..92814bef 100644 --- a/protobuf/market.proto +++ b/protobuf/market.proto @@ -4,19 +4,27 @@ option java_multiple_files = true; option java_package = "com.dexorder.proto"; message Market { - // The prices and volumes must be adjusted by the rational denominator provided - // by the market metadata - string exchange_id = 2; // e.g., BINANCE - string market_id = 3; // e.g., BTC/USDT - string market_type = 4; // e.g., Spot - string description = 5; // e.g., Bitcoin/Tether on Binance - repeated string column_names = 6; // e.g., ['open', 'high', 'low', 'close', 'volume', 'taker_vol', 'maker_vol'] - string base_asset = 9; - string quote_asset = 10; - uint64 earliest_time = 11; - uint64 tick_denom = 12; // denominator applied to all OHLC price data - uint64 base_denom = 13; // denominator applied to base asset units - uint64 quote_denom = 14; // denominator applied to quote asset units + string exchange_id = 2; // e.g., BINANCE + string market_id = 3; // e.g., BTC/USDT + string market_type = 4; // e.g., Spot, CryptoPerpetual + string description = 5; // e.g., Bitcoin/Tether on Binance + repeated string column_names = 6; // e.g., ['open', 'high', 'low', 'close', 'volume'] + string base_asset = 9; // e.g., BTC + string quote_asset = 10; // e.g., USDT + uint64 earliest_time = 11; // nanoseconds since epoch + repeated uint32 supported_period_seconds = 15; + // Nautilus Instrument fields — used to construct Instrument objects in the sandbox bridge + uint32 price_precision = 16; // decimal places for prices (e.g., 2 for $0.01 resolution) + uint32 size_precision = 17; // decimal places for quantities + double tick_size = 18; // minimum price increment (e.g., 0.01) + double lot_size = 19; // minimum order size + double min_notional = 20; // minimum order value in quote currency + double margin_init = 21; // initial margin requirement (futures/perps only) + double margin_maint = 22; // maintenance margin (futures/perps only) + double maker_fee = 23; // maker fee rate (e.g., 0.001 = 0.1%) + double taker_fee = 24; // taker fee rate + double contract_multiplier = 25; // contract multiplier for derivatives (default 1.0) + } diff --git a/protobuf/ohlc.proto b/protobuf/ohlc.proto index 46062976..8900d1c7 100644 --- a/protobuf/ohlc.proto +++ b/protobuf/ohlc.proto @@ -5,11 +5,11 @@ option java_package = "com.dexorder.proto"; // Single OHLC row message OHLC { - // Timestamp in microseconds since epoch + // Timestamp in nanoseconds since epoch uint64 timestamp = 1; - // The prices and volumes must be adjusted by the rational denominator provided - // by the market metadata. Optional to support null bars for periods with no trades. + // Prices are stored as doubles (Nautilus-aligned, no denominator needed). + // Optional to support null bars for periods with no trades. optional int64 open = 2; optional int64 high = 3; optional int64 low = 4; @@ -22,7 +22,7 @@ message OHLC { optional int64 low_time = 11; optional int64 close_time = 12; optional int64 open_interest = 13; - string ticker = 14; + string ticker = 14; // Nautilus format: "BTC/USDT.BINANCE" } // Batch of OHLC rows with metadata for historical request tracking @@ -49,7 +49,7 @@ message OHLCBatchMetadata { // OHLC period in seconds uint32 period_seconds = 4; - // Time range requested (microseconds since epoch) + // Time range requested (nanoseconds since epoch) uint64 start_time = 5; uint64 end_time = 6; diff --git a/protobuf/tick.proto b/protobuf/tick.proto index 5efb40bd..84caeab4 100644 --- a/protobuf/tick.proto +++ b/protobuf/tick.proto @@ -7,19 +7,19 @@ message Tick { // Unique identifier for the trade string trade_id = 1; - // Market identifier (matches Market.market_id) + // Market identifier in Nautilus format: "BTC/USDT.BINANCE" string ticker = 2; - // Timestamp in microseconds since epoch + // Timestamp in nanoseconds since epoch uint64 timestamp = 3; - // Price (must be adjusted by tick_denom from Market metadata) + // Price as a scaled integer (divide by 10^price_precision from Market metadata) int64 price = 4; - // Base asset amount (must be adjusted by base_denom from Market metadata) + // Base asset amount as a scaled integer (divide by 10^size_precision from Market metadata) int64 amount = 5; - // Quote asset amount (must be adjusted by quote_denom from Market metadata) + // Quote asset amount as a scaled integer int64 quote_amount = 6; // Side: true = taker buy (market buy), false = taker sell (market sell) diff --git a/relay/README.md b/relay/README.md index a78246bc..02148e71 100644 --- a/relay/README.md +++ b/relay/README.md @@ -77,7 +77,7 @@ The relay acts as a well-known bind point for all components: ``` 1. Client subscribes to ticker Socket: SUB → XPUB (5558) - Topic: "BINANCE:BTC/USDT|tick" + Topic: "BTC/USDT.BINANCE|tick" 2. Relay forwards subscription Socket: XSUB → Flink PUB (5557) diff --git a/relay/src/relay.rs b/relay/src/relay.rs index c9437b68..8d807529 100644 --- a/relay/src/relay.rs +++ b/relay/src/relay.rs @@ -205,13 +205,13 @@ impl Relay { info!("Handling request submission: request_id={}, ticker={}, client_id={:?}", request_id, ticker, client_id); - // Extract exchange prefix from ticker - let exchange_prefix = ticker.split(':').next() - .map(|s| format!("{}:", s)) + // Extract exchange suffix from ticker (Nautilus format: "BTC/USDT.BINANCE") + let exchange_prefix = ticker.rsplitn(2, '.').next() + .map(|s| format!("{}.", s)) .unwrap_or_else(|| String::from("")); if exchange_prefix.is_empty() { - warn!("Ticker '{}' missing exchange prefix", ticker); + warn!("Ticker '{}' missing exchange suffix", ticker); } // Build DataRequest protobuf for ingestors diff --git a/sandbox/README.md b/sandbox/README.md index b0f3cded..78dfd336 100644 --- a/sandbox/README.md +++ b/sandbox/README.md @@ -36,7 +36,7 @@ async def main(): try: # Fetch OHLC data (automatically checks cache and requests missing data) df = await client.fetch_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, # 1-hour candles start_time=1735689600000000, # microseconds end_time=1736294399000000 @@ -112,7 +112,7 @@ Initialize the client with connection parameters. Fetch OHLC data with smart caching. **Parameters:** -- `ticker` (str): Market identifier (e.g., "BINANCE:BTC/USDT") +- `ticker` (str): Market identifier (e.g., "BTC/USDT.BINANCE") - `period_seconds` (int): OHLC period in seconds (60, 300, 3600, 86400, etc.) - `start_time` (int): Start timestamp in microseconds - `end_time` (int): End timestamp in microseconds @@ -179,7 +179,7 @@ await client.connect() # Now safe to make requests result = await client.request_historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time=1735689600000000, end_time=1736294399000000 diff --git a/sandbox/RESEARCH_API_USAGE.md b/sandbox/RESEARCH_API_USAGE.md index 1bf9d063..351f388f 100644 --- a/sandbox/RESEARCH_API_USAGE.md +++ b/sandbox/RESEARCH_API_USAGE.md @@ -29,7 +29,7 @@ api = get_api() # Method 1: Using Unix timestamps (seconds) df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, # 1 hour candles start_time=1640000000, # Unix timestamp in seconds end_time=1640086400, @@ -38,7 +38,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 2: Using date strings df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", # Simple date string end_time="2021-12-21", @@ -47,7 +47,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 3: Using date strings with time df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20 00:00:00", end_time="2021-12-20 23:59:59", @@ -56,7 +56,7 @@ df = asyncio.run(api.data.historical_ohlc( # Method 4: Using datetime objects df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time=datetime(2021, 12, 20), end_time=datetime(2021, 12, 21), @@ -92,7 +92,7 @@ api = get_api() # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21", @@ -121,7 +121,7 @@ api = get_api() # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" @@ -161,7 +161,7 @@ api = get_api() # Fetch historical data using date strings (easiest for research) df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, # 1 hour start_time="2021-12-20", end_time="2021-12-21", diff --git a/sandbox/dexorder/api/__init__.py b/sandbox/dexorder/api/__init__.py index 59a678b8..04a3954a 100644 --- a/sandbox/dexorder/api/__init__.py +++ b/sandbox/dexorder/api/__init__.py @@ -44,7 +44,7 @@ def get_api() -> API: # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" diff --git a/sandbox/dexorder/api/api.py b/sandbox/dexorder/api/api.py index 96eea631..32b0b719 100644 --- a/sandbox/dexorder/api/api.py +++ b/sandbox/dexorder/api/api.py @@ -29,7 +29,7 @@ class API: # Fetch data df = asyncio.run(api.data.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21" diff --git a/sandbox/dexorder/api/data_api.py b/sandbox/dexorder/api/data_api.py index 94e73256..f6940dcf 100644 --- a/sandbox/dexorder/api/data_api.py +++ b/sandbox/dexorder/api/data_api.py @@ -27,8 +27,8 @@ class DataAPI(ABC): Fetch historical OHLC candlestick data for a market. Args: - ticker: Market identifier in format "EXCHANGE:SYMBOL" - Examples: "BINANCE:BTC/USDT", "COINBASE:ETH/USD" + ticker: Market identifier in format "MARKET.EXCHANGE" + Examples: "BTC/USDT.BINANCE", "ETH/USD.COINBASE" period_seconds: Candle period in seconds Common values: - 60 (1 minute) @@ -55,7 +55,7 @@ class DataAPI(ABC): Returns: DataFrame with candlestick data sorted by timestamp (ascending). Standard columns (always included): - - timestamp: Period start time in microseconds + - timestamp: Period start time in nanoseconds - open: Opening price (decimal float) - high: Highest price (decimal float) - low: Lowest price (decimal float) @@ -71,7 +71,7 @@ class DataAPI(ABC): Examples: # Basic OHLC with Unix timestamp df = await api.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time=1640000000, end_time=1640086400 @@ -79,7 +79,7 @@ class DataAPI(ABC): # Using date strings with volume df = await api.historical_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600, start_time="2021-12-20", end_time="2021-12-21", @@ -89,7 +89,7 @@ class DataAPI(ABC): # Using datetime objects from datetime import datetime df = await api.historical_ohlc( - ticker="COINBASE:ETH/USD", + ticker="ETH/USD.COINBASE", period_seconds=300, start_time=datetime(2021, 12, 20, 9, 30), end_time=datetime(2021, 12, 20, 16, 30), @@ -113,8 +113,8 @@ class DataAPI(ABC): specify exact timestamps. Useful for real-time analysis and indicators. Args: - ticker: Market identifier in format "EXCHANGE:SYMBOL" - Examples: "BINANCE:BTC/USDT", "COINBASE:ETH/USD" + ticker: Market identifier in format "MARKET.EXCHANGE" + Examples: "BTC/USDT.BINANCE", "ETH/USD.COINBASE" period_seconds: OHLC candle period in seconds Common values: 60 (1m), 300 (5m), 900 (15m), 3600 (1h), 86400 (1d), 604800 (1w) @@ -133,14 +133,14 @@ class DataAPI(ABC): Examples: # Get the last candle df = await api.latest_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=3600 ) # Returns: timestamp, open, high, low, close # Get the last 50 5-minute candles with volume df = await api.latest_ohlc( - ticker="COINBASE:ETH/USD", + ticker="ETH/USD.COINBASE", period_seconds=300, length=50, extra_columns=["volume", "buy_vol", "sell_vol"] @@ -148,7 +148,7 @@ class DataAPI(ABC): # Get recent candles with all timing data df = await api.latest_ohlc( - ticker="BINANCE:BTC/USDT", + ticker="BTC/USDT.BINANCE", period_seconds=60, length=100, extra_columns=["open_time", "high_time", "low_time", "close_time"] diff --git a/sandbox/dexorder/history_client.py b/sandbox/dexorder/history_client.py index a7e4ba35..24aeb8a4 100644 --- a/sandbox/dexorder/history_client.py +++ b/sandbox/dexorder/history_client.py @@ -110,10 +110,10 @@ class HistoryClient: IMPORTANT: Call connect() before using this method. Args: - ticker: Market identifier (e.g., "BINANCE:BTC/USDT") + ticker: Market identifier (e.g., "BTC/USDT.BINANCE") period_seconds: OHLC period in seconds - start_time: Start timestamp in microseconds - end_time: End timestamp in microseconds + start_time: Start timestamp in nanoseconds + end_time: End timestamp in nanoseconds timeout: Request timeout in seconds (default: 30) limit: Optional limit on number of candles diff --git a/sandbox/dexorder/iceberg_client.py b/sandbox/dexorder/iceberg_client.py index 966e3299..ba9875a4 100644 --- a/sandbox/dexorder/iceberg_client.py +++ b/sandbox/dexorder/iceberg_client.py @@ -1,5 +1,8 @@ """ IcebergClient - Query OHLC data from Iceberg warehouse (Iceberg 1.10.1) + +Tickers use Nautilus format: "BTC/USDT.BINANCE" +All timestamps are nanoseconds since epoch. """ from typing import Optional, List, Tuple @@ -39,7 +42,6 @@ class IcebergClient: s3_endpoint: Optional[str] = None, s3_access_key: Optional[str] = None, s3_secret_key: Optional[str] = None, - metadata_client=None, # SymbolMetadataClient (avoid circular import) ): """ Initialize Iceberg client. @@ -50,11 +52,9 @@ class IcebergClient: s3_endpoint: S3/MinIO endpoint URL (e.g., "http://localhost:9000") s3_access_key: S3/MinIO access key s3_secret_key: S3/MinIO secret key - metadata_client: SymbolMetadataClient for price/volume conversion """ self.catalog_uri = catalog_uri self.namespace = namespace - self.metadata_client = metadata_client catalog_props = {"uri": catalog_uri} if s3_endpoint: @@ -80,15 +80,14 @@ class IcebergClient: Query OHLC data for a specific ticker, period, and time range. Args: - ticker: Market identifier (e.g., "BINANCE:BTC/USDT") + ticker: Market identifier in Nautilus format (e.g., "BTC/USDT.BINANCE") period_seconds: OHLC period in seconds (60, 300, 3600, etc.) - start_time: Start timestamp in microseconds - end_time: End timestamp in microseconds - columns: Optional list of columns to select. If None, returns all columns. - Example: ["timestamp", "open", "high", "low", "close", "volume"] + start_time: Start timestamp in nanoseconds + end_time: End timestamp in nanoseconds (exclusive) + columns: Optional list of columns to select. Returns: - DataFrame with OHLC data sorted by timestamp + DataFrame with OHLC data sorted by timestamp, with a DatetimeIndex (UTC). """ # Reload table metadata to pick up snapshots committed after this client was initialized self.table = self.catalog.load_table(f"{self.namespace}.ohlc") @@ -102,7 +101,6 @@ class IcebergClient: ) ) - # Select specific columns if requested if columns is not None: scan = scan.select(*columns) @@ -110,52 +108,10 @@ class IcebergClient: if not df.empty: df = df.sort_values("timestamp") - # Convert integer microsecond timestamps to DatetimeIndex - df.index = pd.to_datetime(df["timestamp"], unit="us", utc=True) + # Convert integer nanosecond timestamps to DatetimeIndex + df.index = pd.to_datetime(df["timestamp"], unit="ns", utc=True) df.index.name = "datetime" df = df.drop(columns=["timestamp"]) - # Apply price/volume conversion if metadata client available - if self.metadata_client is not None: - df = self._apply_denominators(df, ticker) - - return df - - def _apply_denominators(self, df: pd.DataFrame, ticker: str) -> pd.DataFrame: - """ - Convert integer prices and volumes to decimal floats using market metadata. - - Args: - df: DataFrame with integer OHLC data - ticker: Market identifier for metadata lookup - - Returns: - DataFrame with decimal prices and volumes - - Raises: - ValueError: If metadata not found for ticker - """ - if df.empty: - return df - - # Get metadata for this ticker - metadata = self.metadata_client.get_metadata(ticker) - - # Convert price columns (divide by tick_denom) - price_columns = ["open", "high", "low", "close"] - for col in price_columns: - if col in df.columns: - df[col] = df[col].astype(float) / metadata.tick_denom - - # Convert volume columns (divide by base_denom) - volume_columns = ["volume", "buy_vol", "sell_vol"] - for col in volume_columns: - if col in df.columns and df[col].notna().any(): - df[col] = df[col].astype(float) / metadata.base_denom - - log.debug( - f"Applied denominators to {ticker}: tick_denom={metadata.tick_denom}, " - f"base_denom={metadata.base_denom} ({len(df)} rows)" - ) return df @@ -169,32 +125,28 @@ class IcebergClient: """ Identify missing data ranges in the requested time period. - Returns list of (start, end) tuples for missing ranges. - Expected candles are calculated based on period_seconds. - Args: - ticker: Market identifier + ticker: Market identifier in Nautilus format period_seconds: OHLC period in seconds - start_time: Start timestamp in microseconds - end_time: End timestamp in microseconds + start_time: Start timestamp in nanoseconds + end_time: End timestamp in nanoseconds Returns: - List of (start_time, end_time) tuples for missing ranges + List of (start_time, end_time) tuples for missing ranges (nanoseconds) """ df = self.query_ohlc(ticker, period_seconds, start_time, end_time) if df.empty: - # All data is missing return [(start_time, end_time)] - # Convert period to microseconds - period_micros = period_seconds * 1_000_000 + # Convert period to nanoseconds + period_nanos = period_seconds * 1_000_000_000 # Generate expected timestamps — end_time is exclusive - expected_timestamps = list(range(start_time, end_time, period_micros)) - actual_timestamps = set(df.index.view('int64') // 1000) + expected_timestamps = list(range(start_time, end_time, period_nanos)) + # DatetimeIndex backed by nanoseconds — view as int64 directly + actual_timestamps = set(df.index.view('int64')) - # Find gaps missing = sorted(set(expected_timestamps) - actual_timestamps) if not missing: @@ -206,15 +158,12 @@ class IcebergClient: prev_ts = missing[0] for ts in missing[1:]: - if ts > prev_ts + period_micros: - # Gap in missing data - close previous range + if ts > prev_ts + period_nanos: ranges.append((range_start, prev_ts)) range_start = ts prev_ts = ts - # Close final range ranges.append((range_start, prev_ts)) - return ranges def has_data( @@ -228,10 +177,10 @@ class IcebergClient: Check if any data exists for the given parameters. Args: - ticker: Market identifier + ticker: Market identifier in Nautilus format period_seconds: OHLC period in seconds - start_time: Start timestamp in microseconds - end_time: End timestamp in microseconds + start_time: Start timestamp in nanoseconds + end_time: End timestamp in nanoseconds Returns: True if at least one candle exists, False otherwise diff --git a/sandbox/dexorder/impl/charting_api_impl.py b/sandbox/dexorder/impl/charting_api_impl.py index 82c2e63b..cd6952d0 100644 --- a/sandbox/dexorder/impl/charting_api_impl.py +++ b/sandbox/dexorder/impl/charting_api_impl.py @@ -138,8 +138,8 @@ class ChartingAPIImpl(ChartingAPI): if col in df.columns: # Handle potential timestamp index (convert from microseconds) if df.index.name == 'timestamp' or 'timestamp' in str(df.index.dtype): - # Assume microseconds, convert to datetime - plot_index = pd.to_datetime(df.index, unit='us') + # Assume nanoseconds, convert to datetime + plot_index = pd.to_datetime(df.index, unit='ns') else: plot_index = df.index @@ -206,18 +206,18 @@ class ChartingAPIImpl(ChartingAPI): """ df_copy = df.copy() - # Handle timestamp column (in microseconds) -> DatetimeIndex + # Handle timestamp column (in nanoseconds) -> DatetimeIndex if 'timestamp' in df_copy.columns: - df_copy.index = pd.to_datetime(df_copy['timestamp'], unit='us') + df_copy.index = pd.to_datetime(df_copy['timestamp'], unit='ns') df_copy = df_copy.drop(columns=['timestamp']) elif df_copy.index.name == 'timestamp' or 'int' in str(df_copy.index.dtype): - # Index is timestamp in microseconds - df_copy.index = pd.to_datetime(df_copy.index, unit='us') + # Index is timestamp in nanoseconds + df_copy.index = pd.to_datetime(df_copy.index, unit='ns') # Ensure index is DatetimeIndex if not isinstance(df_copy.index, pd.DatetimeIndex): raise ValueError( - "DataFrame must have a DatetimeIndex or a 'timestamp' column in microseconds" + "DataFrame must have a DatetimeIndex or a 'timestamp' column in nanoseconds" ) # Normalize column names to lowercase diff --git a/sandbox/dexorder/impl/data_api_impl.py b/sandbox/dexorder/impl/data_api_impl.py index 77a8663b..313e33af 100644 --- a/sandbox/dexorder/impl/data_api_impl.py +++ b/sandbox/dexorder/impl/data_api_impl.py @@ -8,7 +8,7 @@ import pandas as pd from dexorder.api.data_api import DataAPI from dexorder.ohlc_client import OHLCClient -from dexorder.utils import TimestampInput, to_microseconds +from dexorder.utils import TimestampInput, to_nanoseconds log = logging.getLogger(__name__) @@ -105,12 +105,12 @@ class DataAPIImpl(DataAPI): if not self._started: await self.start() - # Convert timestamps to microseconds - start_micros = to_microseconds(start_time) - end_micros = to_microseconds(end_time) + # Convert timestamps to nanoseconds + start_nanos = to_nanoseconds(start_time) + end_nanos = to_nanoseconds(end_time) log.debug(f"Fetching OHLC: {ticker}, period={period_seconds}s, " - f"start={start_time} ({start_micros}), end={end_time} ({end_micros})") + f"start={start_time} ({start_nanos}ns), end={end_time} ({end_nanos}ns)") # Validate extra_columns if extra_columns: @@ -131,8 +131,8 @@ class DataAPIImpl(DataAPI): df = await self.ohlc_client.fetch_ohlc( ticker=ticker, period_seconds=period_seconds, - start_time=start_micros, - end_time=end_micros, + start_time=start_nanos, + end_time=end_nanos, request_timeout=self.request_timeout ) diff --git a/sandbox/dexorder/ohlc_client.py b/sandbox/dexorder/ohlc_client.py index 2dfadcd0..6f7050c3 100644 --- a/sandbox/dexorder/ohlc_client.py +++ b/sandbox/dexorder/ohlc_client.py @@ -8,7 +8,6 @@ import logging from typing import Optional from .iceberg_client import IcebergClient from .history_client import HistoryClient -from .symbol_metadata_client import SymbolMetadataClient log = logging.getLogger(__name__) @@ -53,26 +52,14 @@ class OHLCClient: s3_access_key: S3/MinIO access key s3_secret_key: S3/MinIO secret key """ - # Initialize symbol metadata client for price/volume conversion - self.metadata = SymbolMetadataClient( - iceberg_catalog_uri, - namespace=namespace, - s3_endpoint=s3_endpoint, - s3_access_key=s3_access_key, - s3_secret_key=s3_secret_key, - ) - - # Initialize Iceberg client with metadata client for automatic conversion self.iceberg = IcebergClient( iceberg_catalog_uri, namespace, s3_endpoint=s3_endpoint, s3_access_key=s3_access_key, s3_secret_key=s3_secret_key, - metadata_client=self.metadata, ) - self.history = HistoryClient(relay_endpoint, notification_endpoint) - log.info("OHLCClient initialized with automatic price/volume conversion") + log.info("OHLCClient initialized") async def start(self): """ @@ -107,10 +94,10 @@ class OHLCClient: 6. Return results Args: - ticker: Market identifier (e.g., "BINANCE:BTC/USDT") + ticker: Market identifier in Nautilus format (e.g., "BTC/USDT.BINANCE") period_seconds: OHLC period in seconds (60, 300, 3600, etc.) - start_time: Start timestamp in microseconds - end_time: End timestamp in microseconds + start_time: Start timestamp in nanoseconds + end_time: End timestamp in nanoseconds request_timeout: Timeout for historical data requests (default: 30s) Returns: @@ -121,9 +108,9 @@ class OHLCClient: ValueError: If request fails """ # Align times to period boundaries: [ceil(start), ceil(end)) exclusive - period_micros = period_seconds * 1_000_000 - start_time = ((start_time + period_micros - 1) // period_micros) * period_micros - end_time = ((end_time + period_micros - 1) // period_micros) * period_micros # exclusive + period_nanos = period_seconds * 1_000_000_000 + start_time = ((start_time + period_nanos - 1) // period_nanos) * period_nanos + end_time = ((end_time + period_nanos - 1) // period_nanos) * period_nanos # exclusive # Step 1: Check Iceberg for existing data df = self.iceberg.query_ohlc(ticker, period_seconds, start_time, end_time) diff --git a/sandbox/dexorder/symbol_metadata_client.py b/sandbox/dexorder/symbol_metadata_client.py index 1dd51c4b..9ea6d052 100644 --- a/sandbox/dexorder/symbol_metadata_client.py +++ b/sandbox/dexorder/symbol_metadata_client.py @@ -1,8 +1,7 @@ """ -SymbolMetadataClient - Query symbol metadata from Iceberg for price/volume conversion. +SymbolMetadataClient - Query symbol metadata from Iceberg. -Provides lazy-loaded, cached access to symbol metadata including denominators -used to convert integer OHLC data to decimal prices and volumes. +Tickers use Nautilus format: "BTC/USDT.BINANCE" (market_id.exchange_id). """ import logging @@ -13,23 +12,67 @@ from pyiceberg.expressions import EqualTo, And log = logging.getLogger(__name__) +def format_ticker(exchange_id: str, market_id: str) -> str: + """Format a ticker in Nautilus convention: 'BTC/USDT.BINANCE'.""" + return f"{market_id}.{exchange_id}" + + +def parse_ticker(ticker: str) -> tuple[str, str]: + """ + Parse a Nautilus-format ticker into (exchange_id, market_id). + + Args: + ticker: e.g. "BTC/USDT.BINANCE" + + Returns: + (exchange_id, market_id) e.g. ("BINANCE", "BTC/USDT") + + Raises: + ValueError: if the ticker does not contain a dot separator + """ + if "." not in ticker: + raise ValueError( + f"Invalid ticker format '{ticker}'. Expected Nautilus format: 'MARKET.EXCHANGE' " + f"(e.g., 'BTC/USDT.BINANCE')" + ) + # Split on the LAST dot to handle market IDs that could theoretically contain dots + dot_pos = ticker.rfind(".") + market_id = ticker[:dot_pos] + exchange_id = ticker[dot_pos + 1:] + return exchange_id, market_id + + class SymbolMetadata(NamedTuple): - """Symbol metadata containing denominators for price/volume conversion.""" + """Symbol metadata for Nautilus Instrument construction and order validation.""" exchange_id: str market_id: str - tick_denom: int # Denominator for price fields (open, high, low, close) - base_denom: int # Denominator for base asset (volume in base terms) - quote_denom: int # Denominator for quote asset market_type: Optional[str] = None description: Optional[str] = None + base_asset: Optional[str] = None + quote_asset: Optional[str] = None + # Nautilus Instrument fields + price_precision: Optional[int] = None # decimal places for prices + size_precision: Optional[int] = None # decimal places for quantities + tick_size: Optional[float] = None # minimum price increment + lot_size: Optional[float] = None # minimum order size + min_notional: Optional[float] = None # minimum order value in quote currency + margin_init: Optional[float] = None # initial margin (futures/perps only) + margin_maint: Optional[float] = None # maintenance margin (futures/perps only) + maker_fee: Optional[float] = None # maker fee rate (e.g., 0.001 = 0.1%) + taker_fee: Optional[float] = None # taker fee rate + contract_multiplier: Optional[float] = None # for derivatives (default 1.0) + + @property + def ticker(self) -> str: + """Nautilus-format ticker: 'BTC/USDT.BINANCE'.""" + return format_ticker(self.exchange_id, self.market_id) class SymbolMetadataClient: """ Client for querying symbol metadata from Iceberg. - Provides lazy-loaded, cached access to market metadata including - denominators needed to convert integer OHLC prices/volumes to decimals. + Tickers use Nautilus format: "BTC/USDT.BINANCE" """ def __init__( @@ -40,16 +83,6 @@ class SymbolMetadataClient: s3_access_key: Optional[str] = None, s3_secret_key: Optional[str] = None, ): - """ - Initialize symbol metadata client. - - Args: - catalog_uri: URI of the Iceberg catalog - namespace: Iceberg namespace (default: "trading") - s3_endpoint: S3/MinIO endpoint URL - s3_access_key: S3/MinIO access key - s3_secret_key: S3/MinIO secret key - """ self.catalog_uri = catalog_uri self.namespace = namespace @@ -63,55 +96,39 @@ class SymbolMetadataClient: catalog_props["s3.secret-access-key"] = s3_secret_key self.catalog = load_catalog("trading", **catalog_props) - - # Lazy load the table self._table = None - - # Cache: ticker -> SymbolMetadata self._cache: Dict[str, SymbolMetadata] = {} @property def table(self): - """Lazy load the symbol_metadata table.""" if self._table is None: try: self._table = self.catalog.load_table(f"{self.namespace}.symbol_metadata") log.info(f"Loaded symbol_metadata table from {self.namespace}") except Exception as e: raise RuntimeError( - f"Failed to load symbol_metadata table from {self.namespace}.symbol_metadata. " - f"This table is required for price/volume conversion. Error: {e}" + f"Failed to load symbol_metadata table from {self.namespace}.symbol_metadata: {e}" ) from e return self._table def get_metadata(self, ticker: str) -> SymbolMetadata: """ - Get metadata for a ticker (e.g., "BINANCE:BTC/USDT"). + Get metadata for a ticker (e.g., "BTC/USDT.BINANCE"). Args: - ticker: Market identifier in format "EXCHANGE:SYMBOL" + ticker: Market identifier in Nautilus format "MARKET.EXCHANGE" Returns: - SymbolMetadata with denominators and market info + SymbolMetadata with Nautilus instrument fields Raises: ValueError: If ticker format is invalid or metadata not found - RuntimeError: If symbol_metadata table cannot be loaded """ - # Check cache first if ticker in self._cache: return self._cache[ticker] - # Parse ticker into exchange_id and market_id - if ":" not in ticker: - raise ValueError( - f"Invalid ticker format '{ticker}'. Expected format: 'EXCHANGE:SYMBOL' " - f"(e.g., 'BINANCE:BTC/USDT')" - ) + exchange_id, market_id = parse_ticker(ticker) - exchange_id, market_id = ticker.split(":", 1) - - # Query Iceberg for this symbol try: df = self.table.scan( row_filter=And( @@ -122,9 +139,9 @@ class SymbolMetadataClient: if df.empty: raise ValueError( - f"No metadata found for ticker '{ticker}' (exchange_id='{exchange_id}', " - f"market_id='{market_id}'). The symbol may not be configured in the system. " - f"Available tickers can be queried from the symbol_metadata table." + f"No metadata found for ticker '{ticker}' " + f"(exchange_id='{exchange_id}', market_id='{market_id}'). " + f"The symbol may not be configured in the system." ) if len(df) > 1: @@ -132,55 +149,44 @@ class SymbolMetadataClient: row = df.iloc[0] - # Extract denominators (required fields) - tick_denom = row.get("tick_denom") - base_denom = row.get("base_denom") - quote_denom = row.get("quote_denom") + def _opt_int(col): + v = row.get(col) + return int(v) if v is not None and not (isinstance(v, float) and v != v) else None - if tick_denom is None or tick_denom == 0: - raise ValueError( - f"Invalid tick_denom for {ticker}: {tick_denom}. " - f"Denominator must be a positive integer." - ) - - if base_denom is None or base_denom == 0: - raise ValueError( - f"Invalid base_denom for {ticker}: {base_denom}. " - f"Denominator must be a positive integer." - ) - - if quote_denom is None or quote_denom == 0: - raise ValueError( - f"Invalid quote_denom for {ticker}: {quote_denom}. " - f"Denominator must be a positive integer." - ) + def _opt_float(col): + v = row.get(col) + return float(v) if v is not None and not (isinstance(v, float) and v != v) else None metadata = SymbolMetadata( exchange_id=exchange_id, market_id=market_id, - tick_denom=int(tick_denom), - base_denom=int(base_denom), - quote_denom=int(quote_denom), market_type=row.get("market_type"), description=row.get("description"), + base_asset=row.get("base_asset"), + quote_asset=row.get("quote_asset"), + price_precision=_opt_int("price_precision"), + size_precision=_opt_int("size_precision"), + tick_size=_opt_float("tick_size"), + lot_size=_opt_float("lot_size"), + min_notional=_opt_float("min_notional"), + margin_init=_opt_float("margin_init"), + margin_maint=_opt_float("margin_maint"), + maker_fee=_opt_float("maker_fee"), + taker_fee=_opt_float("taker_fee"), + contract_multiplier=_opt_float("contract_multiplier"), ) - # Cache the result self._cache[ticker] = metadata log.debug( - f"Loaded metadata for {ticker}: tick_denom={metadata.tick_denom}, " - f"base_denom={metadata.base_denom}, quote_denom={metadata.quote_denom}" + f"Loaded metadata for {ticker}: price_precision={metadata.price_precision}, " + f"tick_size={metadata.tick_size}, maker_fee={metadata.maker_fee}" ) - return metadata except ValueError: - # Re-raise ValueError as-is (ticker not found, invalid format, etc.) raise except Exception as e: - raise RuntimeError( - f"Failed to query metadata for ticker '{ticker}': {e}" - ) from e + raise RuntimeError(f"Failed to query metadata for ticker '{ticker}': {e}") from e def clear_cache(self): """Clear the metadata cache (useful for testing or forcing reloads).""" diff --git a/sandbox/dexorder/utils.py b/sandbox/dexorder/utils.py index b5341957..f971a9a5 100644 --- a/sandbox/dexorder/utils.py +++ b/sandbox/dexorder/utils.py @@ -2,6 +2,7 @@ Utility functions for dexorder. Includes timestamp conversions, date parsing, and other common utilities. +All internal timestamps use nanoseconds since epoch (UTC). """ import logging @@ -15,13 +16,15 @@ log = logging.getLogger(__name__) # Type alias for flexible timestamp input TimestampInput = Union[int, float, str, datetime, pd.Timestamp] +NANOS_PER_SECOND = 1_000_000_000 -def to_microseconds(timestamp: TimestampInput) -> int: + +def to_nanoseconds(timestamp: TimestampInput) -> int: """ - Convert various timestamp formats to microseconds since epoch. + Convert various timestamp formats to nanoseconds since epoch. This is the canonical way to convert user-friendly timestamps (unix seconds, - date strings, datetime objects) into the internal microsecond format used + date strings, datetime objects) into the internal nanosecond format used throughout the dexorder system. Args: @@ -32,87 +35,69 @@ def to_microseconds(timestamp: TimestampInput) -> int: - pandas Timestamp Returns: - Microseconds since epoch as integer + Nanoseconds since epoch as integer Examples: - >>> to_microseconds(1640000000) # Unix timestamp in seconds - 1640000000000000 - >>> to_microseconds(1640000000.5) # Unix timestamp with fractional seconds - 1640000000500000 - >>> to_microseconds("2021-12-20") # Date string - 1640000000000000 - >>> to_microseconds("2021-12-20 12:00:00") # Date string with time - 1640000000000000 - >>> to_microseconds(datetime(2021, 12, 20, 12, 0, 0)) # datetime object - 1640000000000000 - >>> to_microseconds(pd.Timestamp("2021-12-20 12:00:00")) # pandas Timestamp - 1640000000000000 + >>> to_nanoseconds(1640000000) # Unix timestamp in seconds + 1640000000000000000 + >>> to_nanoseconds(1640000000.5) # Unix timestamp with fractional seconds + 1640000000500000000 + >>> to_nanoseconds("2021-12-20") + 1639958400000000000 """ if isinstance(timestamp, (int, float)): - # Assume Unix timestamp in seconds - return int(timestamp * 1_000_000) + return int(timestamp * NANOS_PER_SECOND) elif isinstance(timestamp, str): - # Parse date string dt = dateparser.parse(timestamp) if dt is None: raise ValueError(f"Could not parse date string: {timestamp}") - return int(dt.timestamp() * 1_000_000) + return int(dt.timestamp() * NANOS_PER_SECOND) elif isinstance(timestamp, datetime): - return int(timestamp.timestamp() * 1_000_000) + return int(timestamp.timestamp() * NANOS_PER_SECOND) elif isinstance(timestamp, pd.Timestamp): - return int(timestamp.timestamp() * 1_000_000) + return int(timestamp.timestamp() * NANOS_PER_SECOND) else: raise TypeError(f"Unsupported timestamp type: {type(timestamp)}") -def to_seconds(timestamp_micros: int) -> float: +def to_seconds(timestamp_nanos: int) -> float: """ - Convert microseconds since epoch to Unix timestamp in seconds. + Convert nanoseconds since epoch to Unix timestamp in seconds. Args: - timestamp_micros: Timestamp in microseconds since epoch + timestamp_nanos: Timestamp in nanoseconds since epoch Returns: Unix timestamp in seconds (float) Examples: - >>> to_seconds(1640000000000000) + >>> to_seconds(1640000000000000000) 1640000000.0 - >>> to_seconds(1640000000500000) - 1640000000.5 """ - return timestamp_micros / 1_000_000 + return timestamp_nanos / NANOS_PER_SECOND -def to_datetime(timestamp_micros: int) -> datetime: +def to_datetime(timestamp_nanos: int) -> datetime: """ - Convert microseconds since epoch to datetime object. + Convert nanoseconds since epoch to datetime object (UTC). Args: - timestamp_micros: Timestamp in microseconds since epoch + timestamp_nanos: Timestamp in nanoseconds since epoch Returns: datetime object in UTC - - Examples: - >>> to_datetime(1640000000000000) - datetime.datetime(2021, 12, 20, 12, 0, tzinfo=datetime.timezone.utc) """ - return datetime.fromtimestamp(timestamp_micros / 1_000_000) + return datetime.fromtimestamp(timestamp_nanos / NANOS_PER_SECOND) -def to_timestamp(timestamp_micros: int) -> pd.Timestamp: +def to_timestamp(timestamp_nanos: int) -> pd.Timestamp: """ - Convert microseconds since epoch to pandas Timestamp. + Convert nanoseconds since epoch to pandas Timestamp. Args: - timestamp_micros: Timestamp in microseconds since epoch + timestamp_nanos: Timestamp in nanoseconds since epoch Returns: pandas Timestamp - - Examples: - >>> to_timestamp(1640000000000000) - Timestamp('2021-12-20 12:00:00') """ - return pd.Timestamp(timestamp_micros, unit='us') + return pd.Timestamp(timestamp_nanos, unit='ns') diff --git a/test/README.md b/test/README.md index 90029b58..806c95f9 100644 --- a/test/README.md +++ b/test/README.md @@ -29,7 +29,7 @@ This will: ### Expected Flow 1. **Client** sends OHLCRequest to Flink (REQ/REP) - - Ticker: `BINANCE:BTC/USDT` + - Ticker: `BTC/USDT.BINANCE` - Period: 3600s (1 hour) - Range: Jan 1-7, 2026 diff --git a/test/history_client/README.md b/test/history_client/README.md index 6fc44153..6e8f1675 100644 --- a/test/history_client/README.md +++ b/test/history_client/README.md @@ -23,7 +23,7 @@ python client.py ## What it does 1. Connects to Flink's client request endpoint (REQ/REP on port 5559) -2. Requests 1-hour OHLC candles for BINANCE:BTC/USDT +2. Requests 1-hour OHLC candles for BTC/USDT.BINANCE 3. Time range: January 1-7, 2026 (168 candles) 4. Waits for Flink to respond (up to 30 seconds) 5. Displays the response status and sample data diff --git a/test/history_client/client.py b/test/history_client/client.py index 3b3c2bf1..d5585eab 100644 --- a/test/history_client/client.py +++ b/test/history_client/client.py @@ -32,7 +32,7 @@ class HistoryClient: Request historical OHLC data via Relay. Args: - ticker: Market identifier (e.g., "BINANCE:BTC/USDT") + ticker: Market identifier (e.g., "BTC/USDT.BINANCE") start_time: Start timestamp in microseconds since epoch end_time: End timestamp in microseconds since epoch period_seconds: OHLC period in seconds (e.g., 3600 for 1h) @@ -161,7 +161,7 @@ def main(): # Connect to Relay client.connect() - # Request BINANCE:BTC/USDT 1h candles for first 7 days of January 2026 + # Request BTC/USDT.BINANCE 1h candles for first 7 days of January 2026 # January 1, 2026 00:00:00 UTC = 1735689600 seconds = 1735689600000000 microseconds # January 7, 2026 23:59:59 UTC = 1736294399 seconds = 1736294399000000 microseconds @@ -169,7 +169,7 @@ def main(): end_time_us = 1736294399 * 1_000_000 # Jan 7, 2026 23:59:59 UTC response = client.request_historical_ohlc( - ticker='BINANCE:BTC/USDT', + ticker='BTC/USDT.BINANCE', start_time=start_time_us, end_time=end_time_us, period_seconds=3600, # 1 hour diff --git a/test/history_client/client_async.py b/test/history_client/client_async.py index 1dfd60db..9d865024 100644 --- a/test/history_client/client_async.py +++ b/test/history_client/client_async.py @@ -56,7 +56,7 @@ class AsyncHistoryClient: 3. Query Iceberg with the table information (or notification includes data) Args: - ticker: Market identifier (e.g., "BINANCE:BTC/USDT") + ticker: Market identifier (e.g., "BTC/USDT.BINANCE") start_time: Start timestamp in microseconds since epoch end_time: End timestamp in microseconds since epoch period_seconds: OHLC period in seconds (e.g., 3600 for 1h) @@ -263,12 +263,12 @@ def main(): # Connect client.connect() - # Request BINANCE:BTC/USDT 1h candles for first 7 days of January 2026 + # Request BTC/USDT.BINANCE 1h candles for first 7 days of January 2026 start_time_us = 1735689600 * 1_000_000 # Jan 1, 2026 00:00:00 UTC end_time_us = 1736294399 * 1_000_000 # Jan 7, 2026 23:59:59 UTC notification = client.request_historical_ohlc( - ticker='BINANCE:BTC/USDT', + ticker='BTC/USDT.BINANCE', start_time=start_time_us, end_time=end_time_us, period_seconds=3600, # 1 hour diff --git a/test/history_client/client_ohlc_api.py b/test/history_client/client_ohlc_api.py index f60b8f5d..d930f9ed 100755 --- a/test/history_client/client_ohlc_api.py +++ b/test/history_client/client_ohlc_api.py @@ -39,7 +39,7 @@ async def main(): print("✅ Client started\n") # Request parameters - ticker = "BINANCE:BTC/USDT" + ticker = "BTC/USDT.BINANCE" period_seconds = 3600 # 1-hour candles # Request 7 days of data (Jan 1-7, 2026) diff --git a/web/src/stores/chart.ts b/web/src/stores/chart.ts index 6f427ae0..836ba73f 100644 --- a/web/src/stores/chart.ts +++ b/web/src/stores/chart.ts @@ -10,7 +10,7 @@ export interface ChartState { } export const useChartStore = defineStore('chartState', () => { - const symbol = ref('BINANCE:BTC/USDT') + const symbol = ref('BTC/USDT.BINANCE') const start_time = ref(null) const end_time = ref(null) const period = ref(900) // seconds; default 15 minutes