From 0178b5d29d60f3be95b6f2454f76622752fee47a Mon Sep 17 00:00:00 2001 From: Tim Olson Date: Sun, 26 Apr 2026 18:39:52 -0400 Subject: [PATCH] Add Ticker24h support: hourly market snapshots with USD-normalized volume filtering --- doc/plan.md | 3 +- .../com/dexorder/flink/TradingFlinkApp.java | 36 +++ .../com/dexorder/flink/config/AppConfig.java | 23 ++ .../flink/ingestor/IngestorBroker.java | 77 ++++-- .../ingestor/RealtimeSubscriptionManager.java | 14 +- .../dexorder/flink/publisher/RealtimeBar.java | 24 +- .../flink/publisher/RealtimeBarFunction.java | 31 ++- .../flink/quotes/Ticker24hFunction.java | 172 +++++++++++++ .../flink/quotes/Ticker24hPublisher.java | 78 ++++++ .../flink/quotes/Ticker24hScheduler.java | 71 +++++ .../flink/quotes/Ticker24hWrapper.java | 24 ++ .../flink/quotes/TickerBatchDeserializer.java | 95 +++++++ .../flink/quotes/TickerBatchWrapper.java | 86 +++++++ flink/src/main/resources/topics.yaml | 10 + gateway/knowledge/api-reference.md | 57 +++++ gateway/knowledge/usage-examples.md | 55 ++++ gateway/prompt/agent-main.md | 2 +- gateway/prompt/agent-research.md | 45 ++++ gateway/prompt/tools.md | 1 + gateway/src/channels/websocket-handler.ts | 55 ++-- gateway/src/clients/zmq-protocol.ts | 98 ++++++- gateway/src/clients/zmq-relay-client.ts | 49 +++- gateway/src/harness/spawn/spawn-service.ts | 2 +- gateway/src/harness/tool-labels.ts | 1 + gateway/src/main.ts | 49 +++- gateway/src/services/ohlc-service.ts | 76 +----- gateway/src/services/symbol-index-service.ts | 36 ++- .../src/tools/platform/get-ticker24h.tool.ts | 90 +++++++ gateway/src/tools/tool-registry.ts | 15 ++ ingestor/src/ccxt-fetcher.js | 74 ++++++ ingestor/src/index.js | 48 ++++ ingestor/src/kafka-producer.js | 35 ++- protobuf/ingestor.proto | 1 + protobuf/ticker24h.proto | 61 +++++ sandbox/RESEARCH_API_USAGE.md | 48 ++++ sandbox/dexorder/api/data_api.py | 58 ++++- sandbox/dexorder/impl/data_api_impl.py | 44 ++++ sandbox/dexorder/ticker24h_client.py | 242 ++++++++++++++++++ sandbox/dexorder/tools/evaluate_indicator.py | 12 +- sandbox/dexorder/tools/python_tools.py | 33 ++- sandbox/main.py | 10 +- web/src/components/CategoryItemList.vue | 49 +++- web/src/components/DetailsEditDialog.vue | 7 +- web/src/composables/useCustomIndicators.ts | 46 +++- web/src/composables/useTradingViewDatafeed.ts | 22 +- 45 files changed, 1995 insertions(+), 170 deletions(-) create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/Ticker24hFunction.java create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/Ticker24hPublisher.java create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/Ticker24hScheduler.java create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/Ticker24hWrapper.java create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/TickerBatchDeserializer.java create mode 100644 flink/src/main/java/com/dexorder/flink/quotes/TickerBatchWrapper.java create mode 100644 gateway/src/tools/platform/get-ticker24h.tool.ts create mode 100644 protobuf/ticker24h.proto create mode 100644 sandbox/dexorder/ticker24h_client.py diff --git a/doc/plan.md b/doc/plan.md index af36c8b3..3d253f89 100644 --- a/doc/plan.md +++ b/doc/plan.md @@ -1,7 +1,8 @@ # Development Plan -* Realtime data +* Daily volume in symbol metadata * Triggers +* Screeners * Strategy UI * Backtesting TV integration * Paper Trading diff --git a/flink/src/main/java/com/dexorder/flink/TradingFlinkApp.java b/flink/src/main/java/com/dexorder/flink/TradingFlinkApp.java index 226f7395..f876de3e 100644 --- a/flink/src/main/java/com/dexorder/flink/TradingFlinkApp.java +++ b/flink/src/main/java/com/dexorder/flink/TradingFlinkApp.java @@ -16,6 +16,12 @@ import com.dexorder.flink.publisher.RealtimeBarFunction; import com.dexorder.flink.publisher.RealtimeBarPublisher; import com.dexorder.flink.publisher.TickWrapper; import com.dexorder.flink.publisher.TickDeserializer; +import com.dexorder.flink.quotes.Ticker24hFunction; +import com.dexorder.flink.quotes.Ticker24hPublisher; +import com.dexorder.flink.quotes.Ticker24hScheduler; +import com.dexorder.flink.quotes.Ticker24hWrapper; +import com.dexorder.flink.quotes.TickerBatchDeserializer; +import com.dexorder.flink.quotes.TickerBatchWrapper; import com.dexorder.flink.sink.HistoricalBatchWriter; import com.dexorder.flink.sink.SymbolMetadataWriter; import com.dexorder.flink.zmq.ZmqChannelManager; @@ -273,6 +279,35 @@ public class TradingFlinkApp { LOG.info("Realtime tick pipeline configured: market-tick → OHLC bars → clients (periods={})", java.util.Arrays.toString(periods)); + // Ticker24h pipeline: market-ticker Kafka → QuoteCurrencyIndex → ZMQ XPUB + KafkaSource tickerSource = KafkaSource.builder() + .setBootstrapServers(config.getKafkaBootstrapServers()) + .setTopics(config.getKafkaTickerTopic()) + .setGroupId("flink-ticker24h-consumer") + .setStartingOffsets(OffsetsInitializer.latest()) + .setValueOnlyDeserializer(new TickerBatchDeserializer()) + .build(); + + DataStream tickerBatchStream = env + .fromSource(tickerSource, WatermarkStrategy.noWatermarks(), "TickerBatch Kafka Source"); + + DataStream ticker24hStream = tickerBatchStream + .flatMap(new Ticker24hFunction()) + .setParallelism(1) + .name("Ticker24hFunction"); + + ticker24hStream.addSink(new Ticker24hPublisher(notificationEndpoint)) + .setParallelism(1) + .name("Ticker24hPublisher"); + + LOG.info("Ticker24h pipeline configured: market-ticker → Ticker24hFunction → clients"); + + // Start Ticker24h scheduler (fires on startup + hourly for all configured exchanges) + Ticker24hScheduler ticker24hScheduler = new Ticker24hScheduler( + broker, config.getSupportedExchanges()); + ticker24hScheduler.start(); + LOG.info("Ticker24hScheduler started for exchanges: {}", config.getSupportedExchanges()); + // TODO: Set up CEP patterns and triggers LOG.info("Flink job configured, starting execution"); @@ -281,6 +316,7 @@ public class TradingFlinkApp { Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOG.info("Shutting down Trading Flink Application"); try { + ticker24hScheduler.stop(); notificationForwarder.close(); subscriptionManager.stop(); broker.stop(); diff --git a/flink/src/main/java/com/dexorder/flink/config/AppConfig.java b/flink/src/main/java/com/dexorder/flink/config/AppConfig.java index 83c0a741..6c567de6 100644 --- a/flink/src/main/java/com/dexorder/flink/config/AppConfig.java +++ b/flink/src/main/java/com/dexorder/flink/config/AppConfig.java @@ -4,7 +4,10 @@ import org.yaml.snakeyaml.Yaml; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -136,6 +139,26 @@ public class AppConfig { return getString("kafka_ohlc_topic", "market-ohlc"); } + public String getKafkaTickerTopic() { + return getString("kafka_ticker_topic", "market-ticker"); + } + + /** + * Comma-separated list of exchange IDs to fetch Ticker24h snapshots for. + * Default: BINANCE only. + */ + public List getSupportedExchanges() { + String raw = getString("supported_exchanges", "BINANCE"); + List result = new ArrayList<>(); + for (String part : raw.split(",")) { + String trimmed = part.trim().toUpperCase(); + if (!trimmed.isEmpty()) { + result.add(trimmed); + } + } + return result; + } + // Notification config: // Task managers PUSH notifications to this endpoint (job manager PULL address) public String getNotificationPublishEndpoint() { diff --git a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java index 0e24e50c..07a2d684 100644 --- a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java +++ b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java @@ -61,6 +61,8 @@ public class IngestorBroker implements AutoCloseable { private static final long HEARTBEAT_TIMEOUT_MS = 25_000; /** Re-queue historical job if not completed within this window (ms) */ private static final long HISTORICAL_TIMEOUT_MS = 120_000; + /** Re-queue ticker snapshot job if not completed within this window (ms) */ + private static final long TICKER_SNAPSHOT_TIMEOUT_MS = 30_000; private final ZmqChannelManager zmqManager; private volatile boolean running; @@ -113,6 +115,23 @@ public class IngestorBroker implements AutoCloseable { LOG.info("IngestorBroker stopped"); } + /** + * Submit a TICKER_SNAPSHOT request from outside the broker thread (thread-safe). + * Called by Ticker24hScheduler on startup and hourly. + * Uses sentinel ticker "@TICKER24H.{EXCHANGE}" (e.g., "@TICKER24H.BINANCE"). + */ + public void submitTicker24hRequest(String exchange) { + String jobId = UUID.randomUUID().toString(); + DataRequest request = DataRequest.newBuilder() + .setRequestId(jobId) + .setJobId(jobId) + .setType(DataRequest.RequestType.TICKER_SNAPSHOT) + .setTicker("@TICKER24H." + exchange.toUpperCase()) + .build(); + externalSubmissions.add(request); + LOG.info("Enqueued TICKER_SNAPSHOT request: exchange={}, jobId={}", exchange, jobId); + } + /** * Submit a realtime data request from outside the broker thread (thread-safe). * Called by RealtimeSubscriptionManager when subscription ref count goes 0→1. @@ -219,21 +238,38 @@ public class IngestorBroker implements AutoCloseable { try { SubmitHistoricalRequest req = SubmitHistoricalRequest.parseFrom(payload); String jobId = UUID.randomUUID().toString(); - DataRequest dataRequest = DataRequest.newBuilder() - .setRequestId(req.getRequestId()) - .setJobId(jobId) - .setType(DataRequest.RequestType.HISTORICAL_OHLC) - .setTicker(req.getTicker()) - .setHistorical(com.dexorder.proto.HistoricalParams.newBuilder() - .setStartTime(req.getStartTime()) - .setEndTime(req.getEndTime()) - .setPeriodSeconds(req.getPeriodSeconds()) - .build()) - .setClientId(req.hasClientId() ? req.getClientId() : "") - .build(); + String ticker = req.getTicker(); + String clientId = req.hasClientId() ? req.getClientId() : ""; + + DataRequest dataRequest; + if (ticker.startsWith("@TICKER24H.")) { + // Client-initiated ticker snapshot — route to TICKER_SNAPSHOT, not OHLC + dataRequest = DataRequest.newBuilder() + .setRequestId(req.getRequestId()) + .setJobId(jobId) + .setType(DataRequest.RequestType.TICKER_SNAPSHOT) + .setTicker(ticker) + .setClientId(clientId) + .build(); + LOG.info("Routing client-initiated TICKER_SNAPSHOT: request_id={}, ticker={}, client_id={}", + req.getRequestId(), ticker, clientId); + } else { + dataRequest = DataRequest.newBuilder() + .setRequestId(req.getRequestId()) + .setJobId(jobId) + .setType(DataRequest.RequestType.HISTORICAL_OHLC) + .setTicker(ticker) + .setHistorical(com.dexorder.proto.HistoricalParams.newBuilder() + .setStartTime(req.getStartTime()) + .setEndTime(req.getEndTime()) + .setPeriodSeconds(req.getPeriodSeconds()) + .build()) + .setClientId(clientId) + .build(); + LOG.info("Received historical request from relay: request_id={}, ticker={}", + req.getRequestId(), ticker); + } enqueueJob(dataRequest); - LOG.info("Received historical request from relay: request_id={}, ticker={}", - req.getRequestId(), req.getTicker()); } catch (Exception e) { LOG.error("Failed to parse SubmitHistoricalRequest from relay", e); } @@ -411,8 +447,14 @@ public class IngestorBroker implements AutoCloseable { for (Map.Entry entry : activeJobs.entrySet()) { ActiveJob job = entry.getValue(); - long timeout = job.type == DataRequest.RequestType.REALTIME_TICKS - ? HEARTBEAT_TIMEOUT_MS : HISTORICAL_TIMEOUT_MS; + long timeout; + if (job.type == DataRequest.RequestType.REALTIME_TICKS) { + timeout = HEARTBEAT_TIMEOUT_MS; + } else if (job.type == DataRequest.RequestType.TICKER_SNAPSHOT) { + timeout = TICKER_SNAPSHOT_TIMEOUT_MS; + } else { + timeout = HISTORICAL_TIMEOUT_MS; + } if (now - job.lastHeartbeat > timeout) { timedOut.add(entry.getKey()); } @@ -460,7 +502,8 @@ public class IngestorBroker implements AutoCloseable { boolean exchangeMatch = exchange.isEmpty() || slot.exchange.equals(exchange); boolean typeMatch = slot.slotType == SlotType.ANY || (slot.slotType == SlotType.HISTORICAL - && requestType == DataRequest.RequestType.HISTORICAL_OHLC) + && (requestType == DataRequest.RequestType.HISTORICAL_OHLC + || requestType == DataRequest.RequestType.TICKER_SNAPSHOT)) || (slot.slotType == SlotType.REALTIME && requestType == DataRequest.RequestType.REALTIME_TICKS); if (exchangeMatch && typeMatch) { diff --git a/flink/src/main/java/com/dexorder/flink/ingestor/RealtimeSubscriptionManager.java b/flink/src/main/java/com/dexorder/flink/ingestor/RealtimeSubscriptionManager.java index 7060770f..4c17b8b0 100644 --- a/flink/src/main/java/com/dexorder/flink/ingestor/RealtimeSubscriptionManager.java +++ b/flink/src/main/java/com/dexorder/flink/ingestor/RealtimeSubscriptionManager.java @@ -19,17 +19,21 @@ import java.util.regex.Pattern; * must go through {@link #enqueuePublish(byte[]...)} so they are sent from the single loop * thread — ZMQ sockets are not thread-safe. * - * Topic format: {@code {ticker}|ohlc:{period_seconds}} - * Example: {@code BTC/USDT.BINANCE|ohlc:60} + * Topic formats: + * Closed bars: {@code {ticker}|ohlc:{period_seconds}} (strategies, existing consumers) + * Open bars: {@code {ticker}|ohlc:{period_seconds}:open} (chart, live price updates) + * + * Both topic forms map to the same underlying ingestor activation for that ticker. * * Reference counting: - * tickerRefs — across all periods for a ticker; 0→1 triggers ingestor activation - * topicRefs — per (ticker, period); consulted by RealtimeOHLCPublisher to filter output + * tickerRefs — across all subscribed topics for a ticker; 0→1 triggers ingestor activation + * topicRefs — per topic string; consulted by RealtimeOHLCPublisher to filter output */ public class RealtimeSubscriptionManager implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RealtimeSubscriptionManager.class); - private static final Pattern TOPIC_PATTERN = Pattern.compile("^(.+)\\|ohlc:(\\d+)$"); + // Matches both "{ticker}|ohlc:{period}" and "{ticker}|ohlc:{period}:open" + private static final Pattern TOPIC_PATTERN = Pattern.compile("^(.+)\\|ohlc:(\\d+)(:open)?$"); private final ZmqChannelManager zmqManager; private final ZMQ.Socket xpubSocket; diff --git a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBar.java b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBar.java index f599b608..e7f6ba8f 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBar.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBar.java @@ -3,8 +3,11 @@ package com.dexorder.flink.publisher; import java.io.Serializable; /** - * A single completed OHLC bar for a given ticker and period. + * A single OHLC bar for a given ticker and period. * Output type of RealtimeBarFunction, input type of RealtimeBarPublisher. + * + * isClosed=true → window fully closed; published on topic "{ticker}|ohlc:{period}" + * isClosed=false → window still open (snapshot); published on "{ticker}|ohlc:{period}:open" */ public class RealtimeBar implements Serializable { private static final long serialVersionUID = 1L; @@ -23,11 +26,14 @@ public class RealtimeBar implements Serializable { private long volume; /** Number of ticks in this window */ private int tickCount; + /** True if this bar's time window has fully closed; false if still accumulating. */ + private boolean isClosed; public RealtimeBar() {} public RealtimeBar(String ticker, int periodSeconds, long windowStartMs, - long open, long high, long low, long close, long volume, int tickCount) { + long open, long high, long low, long close, long volume, int tickCount, + boolean isClosed) { this.ticker = ticker; this.periodSeconds = periodSeconds; this.windowStartMs = windowStartMs; @@ -37,6 +43,7 @@ public class RealtimeBar implements Serializable { this.close = close; this.volume = volume; this.tickCount = tickCount; + this.isClosed = isClosed; } public String getTicker() { return ticker; } @@ -48,6 +55,7 @@ public class RealtimeBar implements Serializable { public long getClose() { return close; } public long getVolume() { return volume; } public int getTickCount() { return tickCount; } + public boolean isClosed() { return isClosed; } public void setTicker(String ticker) { this.ticker = ticker; } public void setPeriodSeconds(int periodSeconds) { this.periodSeconds = periodSeconds; } @@ -58,16 +66,22 @@ public class RealtimeBar implements Serializable { public void setClose(long close) { this.close = close; } public void setVolume(long volume) { this.volume = volume; } public void setTickCount(int tickCount) { this.tickCount = tickCount; } + public void setClosed(boolean closed) { this.isClosed = closed; } - /** ZMQ topic for this bar: e.g., "BTC/USDT.BINANCE|ohlc:60" */ + /** + * ZMQ topic for this bar. + * Closed bars: "{ticker}|ohlc:{period}" (strategies, existing consumers) + * Open bars: "{ticker}|ohlc:{period}:open" (chart, live price updates) + */ public String topic() { - return ticker + "|ohlc:" + periodSeconds; + return ticker + "|ohlc:" + periodSeconds + (isClosed ? "" : ":open"); } @Override public String toString() { return "RealtimeBar{ticker='" + ticker + "', period=" + periodSeconds + "s, windowStart=" + windowStartMs + ", O=" + open + " H=" + high + - " L=" + low + " C=" + close + ", ticks=" + tickCount + '}'; + " L=" + low + " C=" + close + ", ticks=" + tickCount + + ", closed=" + isClosed + '}'; } } diff --git a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java index 17453f29..c5221a07 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java @@ -19,8 +19,11 @@ import org.slf4j.LoggerFactory; * emitted immediately when the boundary is crossed, so bars are delayed by at most * one tick interval (~10s for realtime polling). * - * Periods are configurable at construction time. All configured periods are computed - * for every ticker receiving ticks; the ZMQ publisher filters to active subscriptions. + * Emits two types of bars per tick: + * - Open bar (isClosed=false): the current accumulator state, every tick. + * Topic: "{ticker}|ohlc:{period}:open" — consumed by charts for live price display. + * - Closed bar (isClosed=true): emitted once when a window boundary is crossed. + * Topic: "{ticker}|ohlc:{period}" — consumed by strategies/triggers. * * Accumulator layout (long[7]): * [0] open @@ -68,26 +71,31 @@ public class RealtimeBarFunction extends RichFlatMapFunction 0) { - out.collect(toBar(tick.getTicker(), period, accum)); - LOG.debug("Emitted bar: ticker={}, period={}s, windowStart={}, ticks={}", + out.collect(toBar(tick.getTicker(), period, accum, true)); + LOG.debug("Emitted closed bar: ticker={}, period={}s, windowStart={}, ticks={}", tick.getTicker(), period, accum[5], accum[6]); } - accumState.put(period, openWindow(tick, windowStart)); + long[] newAccum = openWindow(tick, windowStart); + accumState.put(period, newAccum); + out.collect(toBar(tick.getTicker(), period, newAccum, false)); } else { - // Same window — update + // Same window — update accumulator and emit current open bar accum[1] = Math.max(accum[1], tick.getPrice()); // high accum[2] = Math.min(accum[2], tick.getPrice()); // low accum[3] = tick.getPrice(); // close accum[4] += tick.getAmount(); // volume accum[6]++; // tick count accumState.put(period, accum); + out.collect(toBar(tick.getTicker(), period, accum, false)); } } } @@ -104,13 +112,14 @@ public class RealtimeBarFunction extends RichFlatMapFunction { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(Ticker24hFunction.class); + + private static final Set USD_STABLECOINS = new HashSet<>(Arrays.asList( + "USDT", "USDC", "BUSD", "TUSD", "DAI", "USDP", "GUSD" + )); + + // Exchanges checked in priority order when looking up cross-currency rates + private static final List EXCHANGE_PRIORITY = Arrays.asList( + "BINANCE", "COINBASE", "KRAKEN" + ); + + // exchange → (ticker → lastPrice), maintained across all received batches + private transient Map> exchangePriceIndex; + + @Override + public void open(Configuration parameters) { + exchangePriceIndex = new HashMap<>(); + } + + @Override + public void flatMap(TickerBatchWrapper batch, Collector out) { + String exchangeId = batch.getExchangeId(); + long fetchedAt = batch.getFetchedAt(); + List rows = batch.getTickers(); + + // Update cross-exchange price index with this batch's prices + Map priceMap = new HashMap<>(rows.size() * 2); + for (TickerBatchWrapper.TickerStatsRow row : rows) { + if (row.lastPrice > 0) { + priceMap.put(row.ticker, row.lastPrice); + } + } + exchangePriceIndex.put(exchangeId, priceMap); + + // Build QuoteCurrencyIndex from all unique quote assets in this batch + Set quoteAssets = new LinkedHashSet<>(); + for (TickerBatchWrapper.TickerStatsRow row : rows) { + quoteAssets.add(row.quoteAsset); + } + + Map usdRates = new HashMap<>(); + Map usdSources = new HashMap<>(); + QuoteCurrencyIndex.Builder indexBuilder = QuoteCurrencyIndex.newBuilder() + .setGeneratedAt(fetchedAt); + + for (String quoteAsset : quoteAssets) { + QuoteCurrencyRate rate = buildRate(quoteAsset, fetchedAt); + if (rate != null) { + indexBuilder.addRates(rate); + usdRates.put(quoteAsset, rate.getUsdRate()); + usdSources.put(quoteAsset, rate.getSourceTicker()); + } + } + + QuoteCurrencyIndex currencyIndex = indexBuilder.build(); + + // Build Ticker24h with std_quote_volume for each ticker + Ticker24h.Builder ticker24hBuilder = Ticker24h.newBuilder() + .setExchangeId(exchangeId) + .setGeneratedAt(fetchedAt) + .setCurrencyIndex(currencyIndex); + + for (TickerBatchWrapper.TickerStatsRow row : rows) { + TickerStats.Builder tsBuilder = TickerStats.newBuilder() + .setTicker(row.ticker) + .setExchangeId(row.exchangeId) + .setBaseAsset(row.baseAsset) + .setQuoteAsset(row.quoteAsset) + .setLastPrice(row.lastPrice) + .setPriceChangePct(row.priceChangePct) + .setQuoteVolume24H(row.quoteVolume24h) + .setTimestamp(row.timestamp); + + if (row.bidPrice != null) tsBuilder.setBidPrice(row.bidPrice); + if (row.askPrice != null) tsBuilder.setAskPrice(row.askPrice); + if (row.open24h != null) tsBuilder.setOpen24H(row.open24h); + if (row.high24h != null) tsBuilder.setHigh24H(row.high24h); + if (row.low24h != null) tsBuilder.setLow24H(row.low24h); + if (row.volume24h != null) tsBuilder.setVolume24H(row.volume24h); + if (row.numTrades != null) tsBuilder.setNumTrades(row.numTrades); + + Double usdRate = usdRates.get(row.quoteAsset); + if (usdRate != null) { + tsBuilder.setStdQuoteVolume(row.quoteVolume24h * usdRate); + } + + ticker24hBuilder.addTickers(tsBuilder.build()); + } + + byte[] protoBytes = ticker24hBuilder.build().toByteArray(); + + String clientId = batch.getClientId(); + String topic = (clientId != null && !clientId.isEmpty()) + ? "RESPONSE:" + clientId + : exchangeId + "|ticker24h"; + + LOG.info("Built Ticker24h snapshot: exchange={}, tickers={}, bytes={}, topic={}", + exchangeId, rows.size(), protoBytes.length, topic); + + out.collect(new Ticker24hWrapper(exchangeId, topic, protoBytes)); + } + + /** + * Build a USD rate for a quote currency. + * Returns null if no conversion path is known (fiat, or crypto with no available pair). + */ + private QuoteCurrencyRate buildRate(String currency, long timestampNs) { + if (USD_STABLECOINS.contains(currency)) { + return QuoteCurrencyRate.newBuilder() + .setCurrency(currency) + .setUsdRate(1.0) + .setSourceTicker("hardcoded") + .setTimestamp(timestampNs) + .build(); + } + + // Try priority exchanges first, then any remaining exchange + List orderedExchanges = new ArrayList<>(EXCHANGE_PRIORITY); + for (String ex : exchangePriceIndex.keySet()) { + if (!orderedExchanges.contains(ex)) { + orderedExchanges.add(ex); + } + } + + for (String exchange : orderedExchanges) { + Map priceMap = exchangePriceIndex.get(exchange); + if (priceMap == null) continue; + + for (String stablecoin : Arrays.asList("USDT", "USDC")) { + String pairTicker = currency + "/" + stablecoin + "." + exchange; + Double price = priceMap.get(pairTicker); + if (price != null && price > 0) { + return QuoteCurrencyRate.newBuilder() + .setCurrency(currency) + .setUsdRate(price) + .setSourceTicker(pairTicker) + .setTimestamp(timestampNs) + .build(); + } + } + } + + LOG.debug("No USD conversion path for quote currency: {}", currency); + return null; + } +} diff --git a/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hPublisher.java b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hPublisher.java new file mode 100644 index 00000000..7f88054c --- /dev/null +++ b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hPublisher.java @@ -0,0 +1,78 @@ +package com.dexorder.flink.quotes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.SocketType; +import org.zeromq.ZContext; +import org.zeromq.ZMQ; + +/** + * Flink sink that publishes Ticker24h snapshots to subscribers via ZMQ. + * + * Connects a ZMQ PUSH socket to the job manager's notification PULL endpoint. + * HistoryNotificationForwarder receives these frames and enqueues them to + * RealtimeSubscriptionManager, which publishes them on the MARKET_DATA_PUB XPUB socket. + * Clients subscribed to "{exchange_id}|ticker24h" receive the snapshot. + * + * Wire format (matches other notification publishers): + * Frame 1: topic bytes (e.g., "BINANCE|ticker24h") + * Frame 2: [0x01] (protocol version) + * Frame 3: [0x0D][Ticker24h protobuf bytes] (type 0x0D = TICKER_24H) + * + * Parallelism MUST be 1. + */ +public class Ticker24hPublisher extends RichSinkFunction { + private static final Logger LOG = LoggerFactory.getLogger(Ticker24hPublisher.class); + private static final long serialVersionUID = 1L; + + private static final byte PROTOCOL_VERSION = 0x01; + private static final byte MSG_TYPE_TICKER_24H = 0x0D; + + private final String jobManagerPullEndpoint; + + private transient ZContext context; + private transient ZMQ.Socket pushSocket; + + public Ticker24hPublisher(String jobManagerPullEndpoint) { + this.jobManagerPullEndpoint = jobManagerPullEndpoint; + } + + @Override + public void open(Configuration parameters) { + context = new ZContext(); + pushSocket = context.createSocket(SocketType.PUSH); + pushSocket.setLinger(1000); + pushSocket.setSndHWM(10000); + pushSocket.connect(jobManagerPullEndpoint); + LOG.info("Ticker24hPublisher PUSH connected to {}", jobManagerPullEndpoint); + } + + @Override + public void invoke(Ticker24hWrapper wrapper, Context context) { + try { + byte[] protoBytes = wrapper.getProtoBytes(); + byte[] messageFrame = new byte[1 + protoBytes.length]; + messageFrame[0] = MSG_TYPE_TICKER_24H; + System.arraycopy(protoBytes, 0, messageFrame, 1, protoBytes.length); + + String topic = wrapper.getZmqTopic(); + pushSocket.sendMore(topic.getBytes(ZMQ.CHARSET)); + pushSocket.sendMore(new byte[]{PROTOCOL_VERSION}); + pushSocket.send(messageFrame, 0); + + LOG.info("Published Ticker24h snapshot: topic={}, bytes={}", topic, protoBytes.length); + + } catch (Exception e) { + LOG.error("Failed to publish Ticker24h: exchange={}", wrapper.getExchangeId(), e); + } + } + + @Override + public void close() { + if (pushSocket != null) pushSocket.close(); + if (context != null) context.close(); + LOG.info("Ticker24hPublisher closed"); + } +} diff --git a/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hScheduler.java b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hScheduler.java new file mode 100644 index 00000000..7fc3861f --- /dev/null +++ b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hScheduler.java @@ -0,0 +1,71 @@ +package com.dexorder.flink.quotes; + +import com.dexorder.flink.ingestor.IngestorBroker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Schedules periodic TICKER_SNAPSHOT requests for all configured exchanges. + * + * Fires immediately on startup, then at the top of each hour. + * The IngestorBroker dispatches the requests to ingestor workers, which call + * fetchTickers() and publish TickerBatch messages to the market-ticker Kafka topic. + */ +public class Ticker24hScheduler { + private static final Logger LOG = LoggerFactory.getLogger(Ticker24hScheduler.class); + + private final IngestorBroker broker; + private final List exchanges; + private final ScheduledExecutorService scheduler; + + public Ticker24hScheduler(IngestorBroker broker, List exchanges) { + this.broker = broker; + this.exchanges = exchanges; + this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "Ticker24hScheduler"); + t.setDaemon(true); + return t; + }); + } + + public void start() { + // Fire immediately for all exchanges + fireAll(); + + // Schedule next firing at top of next hour, then every hour after that + long delayMs = msUntilNextHour(); + scheduler.scheduleAtFixedRate(this::fireAll, delayMs, 3_600_000L, TimeUnit.MILLISECONDS); + + long delayMin = delayMs / 60_000; + LOG.info("Ticker24hScheduler started: fired immediately for {}, next firing in ~{}min", + exchanges, delayMin); + } + + public void stop() { + scheduler.shutdownNow(); + LOG.info("Ticker24hScheduler stopped"); + } + + private void fireAll() { + LOG.info("Ticker24hScheduler firing TICKER_SNAPSHOT requests for exchanges: {}", exchanges); + for (String exchange : exchanges) { + try { + broker.submitTicker24hRequest(exchange); + } catch (Exception e) { + LOG.error("Failed to submit TICKER_SNAPSHOT for exchange={}", exchange, e); + } + } + } + + /** Milliseconds until the next full-hour boundary (e.g., 14:00:00.000). */ + private static long msUntilNextHour() { + long now = System.currentTimeMillis(); + long nextHour = (now / 3_600_000L + 1) * 3_600_000L; + return nextHour - now; + } +} diff --git a/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hWrapper.java b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hWrapper.java new file mode 100644 index 00000000..3c3c9405 --- /dev/null +++ b/flink/src/main/java/com/dexorder/flink/quotes/Ticker24hWrapper.java @@ -0,0 +1,24 @@ +package com.dexorder.flink.quotes; + +import java.io.Serializable; + +/** + * Wrapper for a serialized Ticker24h proto message, ready for ZMQ publication. + */ +public class Ticker24hWrapper implements Serializable { + private static final long serialVersionUID = 1L; + + private final String exchangeId; + private final String zmqTopic; // "RESPONSE:{clientId}" or "{exchange}|ticker24h" + private final byte[] protoBytes; // serialized Ticker24h proto + + public Ticker24hWrapper(String exchangeId, String zmqTopic, byte[] protoBytes) { + this.exchangeId = exchangeId; + this.zmqTopic = zmqTopic; + this.protoBytes = protoBytes; + } + + public String getExchangeId() { return exchangeId; } + public String getZmqTopic() { return zmqTopic; } + public byte[] getProtoBytes() { return protoBytes; } +} diff --git a/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchDeserializer.java b/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchDeserializer.java new file mode 100644 index 00000000..202ead5c --- /dev/null +++ b/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchDeserializer.java @@ -0,0 +1,95 @@ +package com.dexorder.flink.quotes; + +import com.dexorder.proto.TickerBatch; +import com.dexorder.proto.TickerStats; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Kafka deserializer for TickerBatch protobuf messages from the market-ticker topic. + * Wire format: [0x01 version][0x0C type][protobuf bytes] + */ +public class TickerBatchDeserializer implements DeserializationSchema { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(TickerBatchDeserializer.class); + + private static final byte PROTOCOL_VERSION = 0x01; + private static final byte MSG_TYPE_TICKER_BATCH = 0x0C; + + @Override + public TickerBatchWrapper deserialize(byte[] message) throws IOException { + try { + if (message.length < 2) { + throw new IOException("Message too short: " + message.length + " bytes"); + } + + byte version = message[0]; + if (version != PROTOCOL_VERSION) { + throw new IOException("Unsupported protocol version: " + version); + } + + byte messageType = message[1]; + if (messageType != MSG_TYPE_TICKER_BATCH) { + throw new IOException("Unexpected message type: 0x" + Integer.toHexString(messageType & 0xFF)); + } + + byte[] protoPayload = new byte[message.length - 2]; + System.arraycopy(message, 2, protoPayload, 0, protoPayload.length); + + TickerBatchWrapper wrapper = parseTickerBatch(protoPayload); + LOG.debug("Deserialized TickerBatch: exchange={}, tickers={}", + wrapper.getExchangeId(), wrapper.getTickerCount()); + return wrapper; + + } catch (Exception e) { + LOG.error("Failed to deserialize TickerBatch", e); + throw new IOException("Failed to deserialize TickerBatch", e); + } + } + + private TickerBatchWrapper parseTickerBatch(byte[] payload) throws Exception { + TickerBatch batch = TickerBatch.parseFrom(payload); + + List rows = new ArrayList<>(batch.getTickersCount()); + for (TickerStats ts : batch.getTickersList()) { + rows.add(new TickerBatchWrapper.TickerStatsRow( + ts.getTicker(), + ts.getExchangeId(), + ts.getBaseAsset(), + ts.getQuoteAsset(), + ts.getLastPrice(), + ts.getPriceChangePct(), + ts.getQuoteVolume24H(), + ts.getTimestamp(), + ts.hasBidPrice() ? ts.getBidPrice() : null, + ts.hasAskPrice() ? ts.getAskPrice() : null, + ts.hasOpen24H() ? ts.getOpen24H() : null, + ts.hasHigh24H() ? ts.getHigh24H() : null, + ts.hasLow24H() ? ts.getLow24H() : null, + ts.hasVolume24H() ? ts.getVolume24H() : null, + ts.hasNumTrades() ? ts.getNumTrades() : null + )); + } + + return new TickerBatchWrapper( + batch.getExchangeId(), rows, batch.getFetchedAt(), + batch.hasClientId() ? batch.getClientId() : "", + batch.hasRequestId() ? batch.getRequestId() : ""); + } + + @Override + public boolean isEndOfStream(TickerBatchWrapper nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(TickerBatchWrapper.class); + } +} diff --git a/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchWrapper.java b/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchWrapper.java new file mode 100644 index 00000000..38dff8a6 --- /dev/null +++ b/flink/src/main/java/com/dexorder/flink/quotes/TickerBatchWrapper.java @@ -0,0 +1,86 @@ +package com.dexorder.flink.quotes; + +import java.io.Serializable; +import java.util.List; + +/** + * POJO wrapper for TickerBatch Kafka messages from market-ticker topic. + * Unwraps the protobuf into plain Java fields for Flink processing. + */ +public class TickerBatchWrapper implements Serializable { + private static final long serialVersionUID = 1L; + + private final String exchangeId; + private final List tickers; + private final long fetchedAt; // nanoseconds + private final String clientId; // non-empty = client-initiated; "" = scheduled broadcast + private final String requestId; // echoed for tracing + + public TickerBatchWrapper(String exchangeId, List tickers, long fetchedAt, + String clientId, String requestId) { + this.exchangeId = exchangeId; + this.tickers = tickers; + this.fetchedAt = fetchedAt; + this.clientId = clientId != null ? clientId : ""; + this.requestId = requestId != null ? requestId : ""; + } + + public String getExchangeId() { return exchangeId; } + public List getTickers() { return tickers; } + public long getFetchedAt() { return fetchedAt; } + public String getClientId() { return clientId; } + public String getRequestId() { return requestId; } + public int getTickerCount() { return tickers != null ? tickers.size() : 0; } + + @Override + public String toString() { + return "TickerBatchWrapper{exchangeId='" + exchangeId + "', count=" + getTickerCount() + '}'; + } + + /** + * Single ticker stats row. Optional fields are null when the exchange did not provide them. + */ + public static class TickerStatsRow implements Serializable { + private static final long serialVersionUID = 1L; + + public final String ticker; + public final String exchangeId; + public final String baseAsset; + public final String quoteAsset; + public final double lastPrice; + public final double priceChangePct; + public final double quoteVolume24h; + public final long timestamp; // nanoseconds + // Optional fields — null if not provided by exchange + public final Double bidPrice; + public final Double askPrice; + public final Double open24h; + public final Double high24h; + public final Double low24h; + public final Double volume24h; + public final Integer numTrades; + + public TickerStatsRow( + String ticker, String exchangeId, String baseAsset, String quoteAsset, + double lastPrice, double priceChangePct, double quoteVolume24h, long timestamp, + Double bidPrice, Double askPrice, + Double open24h, Double high24h, Double low24h, Double volume24h, + Integer numTrades) { + this.ticker = ticker; + this.exchangeId = exchangeId; + this.baseAsset = baseAsset; + this.quoteAsset = quoteAsset; + this.lastPrice = lastPrice; + this.priceChangePct = priceChangePct; + this.quoteVolume24h = quoteVolume24h; + this.timestamp = timestamp; + this.bidPrice = bidPrice; + this.askPrice = askPrice; + this.open24h = open24h; + this.high24h = high24h; + this.low24h = low24h; + this.volume24h = volume24h; + this.numTrades = numTrades; + } + } +} diff --git a/flink/src/main/resources/topics.yaml b/flink/src/main/resources/topics.yaml index ad190ea2..0ee349a0 100644 --- a/flink/src/main/resources/topics.yaml +++ b/flink/src/main/resources/topics.yaml @@ -28,6 +28,16 @@ topics: compression.type: snappy cleanup.policy: delete + # 24-hour rolling ticker snapshots for all symbols on an exchange. + # Written by ingestors on TICKER_SNAPSHOT requests; consumed by Ticker24hConsumer. + - name: market-ticker + partitions: 3 + replication: 2 + config: + retention.ms: 7200000 # 2 hours (hourly refresh; keep one backup) + compression.type: snappy + cleanup.policy: delete + # Symbol metadata from ingestors - name: symbol-metadata partitions: 3 diff --git a/gateway/knowledge/api-reference.md b/gateway/knowledge/api-reference.md index 9b1c97c9..8d72f285 100644 --- a/gateway/knowledge/api-reference.md +++ b/gateway/knowledge/api-reference.md @@ -247,6 +247,63 @@ class DataAPI(ABC): """ pass + @abstractmethod + async def get_ticker_24h( + self, + exchange: str, + limit: Optional[int] = None, + min_std_quote_volume: Optional[float] = None, + market_type: Optional[str] = None, + base_asset_contains: Optional[str] = None, + ) -> pd.DataFrame: + """ + Retrieve 24h rolling market stats for all symbols on an exchange. + + Data is refreshed hourly by the ingestor pipeline. Use this to build a + pre-filtered symbol universe before running a scanner — it avoids requesting + per-symbol OHLC data for thousands of symbols. + + Args: + exchange: Exchange name (e.g., "BINANCE", "COINBASE", "KRAKEN") + limit: If set, return only the Top N symbols By Volume. None = return all. + min_std_quote_volume: Exclude symbols with USD volume below this threshold. + market_type: Filter by market type: "spot" or "perp". None = return all. + base_asset_contains: Filter to symbols whose base asset contains this string + (case-insensitive). E.g., "BTC" matches "BTC/USDT". + + Returns: + DataFrame sorted by std_quote_volume descending (NULLs last). Columns: + - ticker: Full ticker (e.g., "BTC/USDT.BINANCE") + - exchange_id: Exchange name + - base_asset: Base currency (e.g., "BTC") + - quote_asset: Quote currency (e.g., "USDT") + - last_price: Last traded price in quote currency + - price_change_pct: 24h price change as percentage (e.g. 2.5 = +2.5%) + - quote_volume_24h: Raw 24h volume in quote asset + - std_quote_volume: quote_volume_24h normalized to USD (NaN if conversion unknown) + - bid_price, ask_price: Current best bid/ask (NaN if not provided by exchange) + - open_24h, high_24h, low_24h: 24h OHLC prices (NaN if not provided) + - volume_24h: Base-asset volume (NaN if not provided) + - num_trades: 24h trade count (NaN if not provided) + - timestamp_ms: Snapshot timestamp in milliseconds + + Returns empty DataFrame if no data is available (e.g., not yet fetched). + + Examples: + # Top 50 most liquid Binance spot symbols + df = await api.data.get_ticker_24h("BINANCE", limit=50, market_type="spot") + + # All BTC pairs with at least $10M daily volume + df = await api.data.get_ticker_24h("BINANCE", + base_asset_contains="BTC", + min_std_quote_volume=10_000_000) + + # Build a scanner universe: all Binance symbols, sorted by volume + universe = await api.data.get_ticker_24h("BINANCE") + top_100 = universe.head(100)["ticker"].tolist() + """ + pass + ``` diff --git a/gateway/knowledge/usage-examples.md b/gateway/knowledge/usage-examples.md index 562d5261..6d52e29f 100644 --- a/gateway/knowledge/usage-examples.md +++ b/gateway/knowledge/usage-examples.md @@ -92,6 +92,61 @@ All columns below are fully populated for Binance data. Other exchanges provide - `"ticker"` - Market identifier - `"period_seconds"` - Period in seconds +## Building a Scanner Universe with get_ticker_24h + +**Always pre-filter symbols before fetching OHLC data for scanners.** Fetching OHLC for all ~1800 Binance symbols would exhaust the 2M-bar per-script budget instantly. Use `get_ticker_24h` to get a ranked list of all symbols for free (no OHLC budget cost), then run per-symbol analysis only on the filtered set. + +```python +from dexorder.api import get_api +import asyncio + +api = get_api() + +# Get top 50 most liquid Binance spot symbols (no OHLC budget used) +universe = asyncio.run(api.data.get_ticker_24h( + "BINANCE", + limit=50, + market_type="spot", + min_std_quote_volume=10_000_000 # $10M+ daily volume +)) +print(f"Universe: {len(universe)} symbols") +print(universe[["ticker", "std_quote_volume", "price_change_pct"]].head(10)) + +# Now fetch OHLC only for these symbols +tickers = universe["ticker"].tolist() +results = {} +for ticker in tickers: + df = asyncio.run(api.data.historical_ohlc( + ticker=ticker, + period_seconds=3600, + start_time="2024-01-01", + end_time="2025-01-01", + extra_columns=["volume"] + )) + print(f"[Data] {ticker}: {len(df)} bars") + results[ticker] = df +``` + +### get_ticker_24h filter parameters + +```python +# All BTC pairs on Binance (spot + perp) +df = asyncio.run(api.data.get_ticker_24h("BINANCE", base_asset_contains="BTC")) + +# Top 100 perp markets with at least $50M daily volume +df = asyncio.run(api.data.get_ticker_24h( + "BINANCE", + limit=100, + market_type="perp", + min_std_quote_volume=50_000_000 +)) + +# All Coinbase symbols (for a cross-exchange scan) +df = asyncio.run(api.data.get_ticker_24h("COINBASE")) +``` + +The returned DataFrame is sorted by `std_quote_volume` (USD-normalized volume) descending. Symbols without a USD conversion path have `std_quote_volume = NaN` and appear last. Columns: `ticker`, `exchange_id`, `base_asset`, `quote_asset`, `last_price`, `price_change_pct`, `quote_volume_24h`, `std_quote_volume`, `bid_price`, `ask_price`, `open_24h`, `high_24h`, `low_24h`, `volume_24h`, `num_trades`, `timestamp_ms`. + ## Using the Charting API The charting API provides styled financial charts with OHLC candlesticks and technical indicators. diff --git a/gateway/prompt/agent-main.md b/gateway/prompt/agent-main.md index 10379a62..2933d83b 100644 --- a/gateway/prompt/agent-main.md +++ b/gateway/prompt/agent-main.md @@ -89,7 +89,7 @@ After patching, confirm the change to the user. ## Symbol Resolution -Always use `SymbolLookup` to resolve tickers before passing them to research or chart tools. Symbols must be in `SYMBOL.EXCHANGE` format (e.g., `BTC/USDT.BINANCE`). If the user says "ETHUSDT", "ETH", or any ambiguous ticker, resolve it first. If not specified by the user, prefer to use the most prominent exchange available (e.g. BINANCE not KRAKEN) +Always use `SymbolLookup` to resolve tickers before passing them to research or chart tools. Symbols must be in `SYMBOL.EXCHANGE` format (e.g., `BTC/USDT.BINANCE`). If the user says "ETHUSDT", "ETH", or any ambiguous ticker, resolve it first. `SymbolLookup` results are sorted by 24h volume descending — pick the top result when the user hasn't specified an exchange. ## Raw Data Retrieval diff --git a/gateway/prompt/agent-research.md b/gateway/prompt/agent-research.md index 46ca5be3..a68fa9a8 100644 --- a/gateway/prompt/agent-research.md +++ b/gateway/prompt/agent-research.md @@ -66,6 +66,27 @@ Quick reference — approximate bars per resolution at various windows: **When to shorten the window**: only if 5 years at the chosen resolution would far exceed 200,000 bars (e.g., 5m over 5 years ≈ 525k → shorten to ~2 years). Otherwise always use the full 5 years. +## Multi-Symbol Analysis + +When scanning many symbols, scale the per-symbol time window so total bars stay within the **2,000,000-bar script limit**. The API enforces this — exceeding it raises a `ValueError` with the limit number and suggestions. + +Budget rule: `bars_per_symbol ≈ 2,000,000 / num_symbols` (never exceed 200,000 per symbol) + +| Symbol count | Recommended period | Approx max window | +|---|---|---| +| ≤ 10 | any | 5 years | +| 10–100 | 1h or coarser | scale to budget | +| 100–500 | 1d (86400s) | ~1–2 years | +| 500+ | 1d (86400s) | ≤ 1 year | + +**Strategy for large symbol lists**: +1. **Filter first**: scan all symbols with a short window (90–180 days, daily bars) to rank/screen candidates +2. **Zoom in**: fetch full history only for the top N (≤ 20) finalists +3. **Never use intraday periods for > 50 symbols** in one script +4. **Print progress** every 50 symbols so the output log shows the script is alive + +If you hit a `ValueError` about the bar budget, read the limit and suggestions in the error message, then adjust the period or window accordingly. + ## Tool Behavior Notes - **`PythonWrite` / `PythonEdit` for research**: auto-executes the script and returns all output (stdout, stderr) and captured images. **Do not call `ExecuteResearch` afterward** — the script has already run. @@ -89,6 +110,30 @@ The API provides two main components: See the knowledge base sections below for complete API documentation, examples, and the full pandas-ta indicator reference. +### Scanner Pre-filtering with get_ticker_24h + +**Before fetching OHLC data for multiple symbols, always build a pre-filtered universe first.** + +Scanners must not blindly fetch OHLC for all symbols on an exchange — Binance has ~1800 symbols and the script budget is 2M bars total. Use `api.data.get_ticker_24h()` to get a ranked, filterable list of all symbols without consuming any OHLC budget: + +```python +# Get top 50 most liquid Binance spot symbols by USD volume +universe = asyncio.run(api.data.get_ticker_24h( + "BINANCE", + limit=50, + market_type="spot", + min_std_quote_volume=10_000_000 # $10M+ daily volume +)) +tickers = universe["ticker"].tolist() +print(f"Universe: {len(tickers)} symbols") + +# Now fetch OHLC only for these symbols +for ticker in tickers: + df = asyncio.run(api.data.historical_ohlc(ticker, period_seconds=3600, ...)) +``` + +`get_ticker_24h` returns a DataFrame sorted by `std_quote_volume` (USD-normalized) descending, with columns: `ticker`, `exchange_id`, `base_asset`, `quote_asset`, `last_price`, `price_change_pct`, `quote_volume_24h`, `std_quote_volume`, `bid_price`, `ask_price`, `open_24h`, `high_24h`, `low_24h`, `volume_24h`, `num_trades`, `timestamp_ms`. See the full docstring in the knowledge base `api-reference.md`. + ## Technical Indicators — pandas-ta Use `import pandas_ta as ta` for all indicator calculations. Never write manual rolling/ewm implementations. The full indicator catalog, calling conventions, column naming patterns, and default parameters are in the pandas-ta-reference section of your knowledge base. diff --git a/gateway/prompt/tools.md b/gateway/prompt/tools.md index 0ba0c81f..07192548 100644 --- a/gateway/prompt/tools.md +++ b/gateway/prompt/tools.md @@ -22,6 +22,7 @@ Available to all agents: | `PythonList` | List existing scripts by category (`strategy`, `indicator`, or `research`) | | `SymbolLookup` | Resolve a ticker to the correct `SYMBOL.EXCHANGE` format | | `GetChartData` | Fetch raw OHLC data (casual retrieval only — use `Spawn` research for analysis) | +| `GetTicker24h` | Fetch 24h market stats for all symbols on an exchange, sorted by USD volume — use this to build scanner universes without burning OHLC bar budget | | `WebSearch` | Search the web (Tavily) | | `FetchPage` | Fetch and read a web page or PDF | | `ArxivSearch` | Search arXiv for academic papers | diff --git a/gateway/src/channels/websocket-handler.ts b/gateway/src/channels/websocket-handler.ts index 1f5e311e..67532bde 100644 --- a/gateway/src/channels/websocket-handler.ts +++ b/gateway/src/channels/websocket-handler.ts @@ -57,6 +57,7 @@ interface BarSubscription { ticker: string; periodSeconds: number; callback: BarUpdateCallback; + openBars: boolean; } export class WebSocketHandler { @@ -65,6 +66,8 @@ export class WebSocketHandler { private workspaces = new Map(); /** Per-session realtime bar subscriptions for cleanup on disconnect */ private barSubscriptions = new Map(); + /** "sessionId:pandas_ta_name" → active request_id; supersedes stale requests on scroll */ + private activeEvaluations = new Map(); constructor(config: WebSocketHandlerConfig) { this.config = config; @@ -501,13 +504,19 @@ export class WebSocketHandler { const sessionId = authContext.sessionId; const subs = this.barSubscriptions.get(sessionId); if (subs && this.config.ohlcService) { - for (const { ticker, periodSeconds, callback } of subs) { - this.config.ohlcService.unsubscribeFromTicker(ticker, periodSeconds, callback); + for (const { ticker, periodSeconds, callback, openBars } of subs) { + this.config.ohlcService.unsubscribeFromTicker(ticker, periodSeconds, callback, openBars); } this.barSubscriptions.delete(sessionId); logger.info({ sessionId, count: subs.length }, 'Cleaned up realtime bar subscriptions'); } + // Cleanup active indicator evaluations for this session + const evalPrefix = `${sessionId}:`; + for (const key of this.activeEvaluations.keys()) { + if (key.startsWith(evalPrefix)) this.activeEvaluations.delete(key); + } + // Cleanup workspace await workspace!.shutdown(); this.workspaces.delete(authContext.sessionId); @@ -623,18 +632,11 @@ export class WebSocketHandler { case 'search_symbols': { logger.info({ query: payload.query, limit: payload.limit }, 'Handling search_symbols'); - // Use SymbolIndexService if available, otherwise fallback to OHLCService stub const symbolIndexService = this.config.symbolIndexService; - logger.info({ hasSymbolIndexService: !!symbolIndexService }, 'Service check for search'); const results = symbolIndexService ? await symbolIndexService.search(payload.query, payload.limit || 30) - : (ohlcService ? await ohlcService.searchSymbols( - payload.query, - payload.symbol_type, - payload.exchange, - payload.limit || 30 - ) : []); + : []; logger.info({ resultsCount: results.length }, 'Search complete'); socket.send( @@ -649,13 +651,11 @@ export class WebSocketHandler { case 'resolve_symbol': { logger.info({ symbol: payload.symbol }, 'Handling resolve_symbol'); - // Use SymbolIndexService if available, otherwise fallback to OHLCService stub const symbolIndexService = this.config.symbolIndexService; - logger.info({ hasSymbolIndexService: !!symbolIndexService }, 'Service check for resolve'); const symbolInfo = symbolIndexService ? await symbolIndexService.resolveSymbol(payload.symbol) - : (ohlcService ? await ohlcService.resolveSymbol(payload.symbol) : null); + : null; logger.info({ found: !!symbolInfo }, 'Symbol resolution complete'); @@ -723,6 +723,8 @@ export class WebSocketHandler { const subTicker: string = payload.symbol; const subPeriod: number = payload.period_seconds ?? payload.resolution ?? 60; + // 'open' = in-progress bar snapshots every tick (chart); 'closed' = completed bars only (strategies) + const openBars: boolean = (payload.bar_type ?? 'open') === 'open'; const sessionId = authContext.sessionId; // Create a per-subscription callback that forwards bars to this socket @@ -733,6 +735,7 @@ export class WebSocketHandler { subscription_id: payload.subscription_id, ticker: bar.ticker, period_seconds: bar.periodSeconds, + is_closed: bar.isClosed, bar: { // Convert nanoseconds → seconds for client compatibility time: Number(bar.timestamp / 1_000_000_000n), @@ -745,7 +748,7 @@ export class WebSocketHandler { })); }; - ohlcService.subscribeToTicker(subTicker, subPeriod, barCallback); + ohlcService.subscribeToTicker(subTicker, subPeriod, barCallback, openBars); // Track for cleanup on disconnect if (!this.barSubscriptions.has(sessionId)) { @@ -755,6 +758,7 @@ export class WebSocketHandler { ticker: subTicker, periodSeconds: subPeriod, callback: barCallback, + openBars, }); logger.info({ sessionId, ticker: subTicker, period: subPeriod }, 'Subscribed to realtime bars'); @@ -782,7 +786,7 @@ export class WebSocketHandler { ); if (idx >= 0) { const [removed] = subs.splice(idx, 1); - ohlcService.unsubscribeFromTicker(unsubTicker, unsubPeriod, removed.callback); + ohlcService.unsubscribeFromTicker(unsubTicker, unsubPeriod, removed.callback, removed.openBars); logger.info({ sessionId, ticker: unsubTicker, period: unsubPeriod }, 'Unsubscribed from realtime bars'); } } @@ -807,6 +811,19 @@ export class WebSocketHandler { })); break; } + + // Supersede any in-flight request for the same indicator (e.g. rapid scrolling) + const evalKey = `${authContext.sessionId}:${payload.pandas_ta_name}`; + const prevRequestId = this.activeEvaluations.get(evalKey); + if (prevRequestId) { + socket.send(JSON.stringify({ + type: 'evaluate_indicator_result', + request_id: prevRequestId, + error: 'superseded', + })); + } + this.activeEvaluations.set(evalKey, requestId); + try { const mcpResult = await harness.callMcpTool('EvaluateIndicator', { symbol: payload.symbol, @@ -816,6 +833,11 @@ export class WebSocketHandler { pandas_ta_name: payload.pandas_ta_name, parameters: payload.parameters ?? {}, }) as any; + + // Discard result if a newer request arrived while we were awaiting + if (this.activeEvaluations.get(evalKey) !== requestId) break; + this.activeEvaluations.delete(evalKey); + // MCP returns { content: [{type: 'text', text: '...json...'}] } // When the tool raises an exception, the MCP framework sets isError: true // and puts the raw exception text in content[0].text (not JSON-wrapped). @@ -849,6 +871,9 @@ export class WebSocketHandler { ...data, })); } catch (err: any) { + if (this.activeEvaluations.get(evalKey) === requestId) { + this.activeEvaluations.delete(evalKey); + } logger.error({ err: err?.message, pandas_ta_name: payload.pandas_ta_name }, 'evaluate_indicator handler error'); socket.send(JSON.stringify({ type: 'evaluate_indicator_result', diff --git a/gateway/src/clients/zmq-protocol.ts b/gateway/src/clients/zmq-protocol.ts index 32b9fc9c..393565fd 100644 --- a/gateway/src/clients/zmq-protocol.ts +++ b/gateway/src/clients/zmq-protocol.ts @@ -20,11 +20,15 @@ import type { NotificationStatus, } from '../types/ohlc.js'; -export const OHLC_BAR_TOPIC_PATTERN = /^(.+)\|ohlc:(\d+)$/; +// Matches both "{ticker}|ohlc:{period}" (closed) and "{ticker}|ohlc:{period}:open" (open bar) +export const OHLC_BAR_TOPIC_PATTERN = /^(.+)\|ohlc:(\d+)(:open)?$/; + +// Matches "{exchange_id}|ticker24h" (e.g., "BINANCE|ticker24h") +export const TICKER24H_TOPIC_PATTERN = /^([A-Z0-9]+)\|ticker24h$/; /** Decoded realtime OHLC bar received from the XPUB market data stream */ export interface RealtimeBar { - topic: string; // e.g., "BTC/USDT.BINANCE|ohlc:60" + topic: string; // e.g., "BTC/USDT.BINANCE|ohlc:60" or "BTC/USDT.BINANCE|ohlc:60:open" ticker: string; // e.g., "BTC/USDT.BINANCE" periodSeconds: number; /** Window open time in nanoseconds since epoch */ @@ -34,6 +38,35 @@ export interface RealtimeBar { low: number; close: number; volume: number; + /** True if this bar's time window has fully closed (strategies); false if still accumulating (chart). */ + isClosed: boolean; +} + +/** Single ticker 24h stats from a Ticker24h snapshot */ +export interface Ticker24hStats { + ticker: string; + exchange_id: string; + base_asset: string; + quote_asset: string; + last_price: number; + price_change_pct: number; + quote_volume_24h: number; + timestamp_ms: number; // milliseconds (converted from nanoseconds) + bid_price?: number; + ask_price?: number; + open_24h?: number; + high_24h?: number; + low_24h?: number; + volume_24h?: number; + std_quote_volume?: number; // null if conversion unknown + num_trades?: number; +} + +/** Decoded Ticker24h snapshot received from the XPUB market data stream */ +export interface Ticker24hSnapshot { + exchange_id: string; + tickers: Ticker24hStats[]; + generated_at_ms: number; // milliseconds (converted from nanoseconds) } const __filename = fileURLToPath(import.meta.url); @@ -58,14 +91,17 @@ const root = new protobuf.Root(); // Load proto files const ingestorProto = readFileSync(join(protoDir, 'ingestor.proto'), 'utf8'); const ohlcProto = readFileSync(join(protoDir, 'ohlc.proto'), 'utf8'); +const ticker24hProto = readFileSync(join(protoDir, 'ticker24h.proto'), 'utf8'); protobuf.parse(ingestorProto, root); protobuf.parse(ohlcProto, root); +protobuf.parse(ticker24hProto, root); // Export message types const SubmitHistoricalRequestType = root.lookupType('SubmitHistoricalRequest'); const SubmitResponseType = root.lookupType('SubmitResponse'); const HistoryReadyNotificationType = root.lookupType('HistoryReadyNotification'); const OHLCType = root.lookupType('OHLC'); +const Ticker24hType = root.lookupType('Ticker24h'); /** * Encode SubmitHistoricalRequest to ZMQ frames @@ -198,6 +234,62 @@ export function decodeHistoryReadyNotification(frames: Buffer[]): HistoryReadyNo }; } +/** + * Decode a Ticker24h snapshot from ZMQ SUB frames. + * Frame layout: [topic][version][0x0D Ticker24h type + Ticker24h protobuf bytes] + * + * Returns null if the topic doesn't match the ticker24h pattern or decoding fails. + */ +export function decodeTicker24h(frames: Buffer[]): Ticker24hSnapshot | null { + if (frames.length < 3) return null; + + const topic = frames[0].toString(); + const match = TICKER24H_TOPIC_PATTERN.exec(topic); + if (!match) return null; + + const messageFrame = frames[2]; + if (messageFrame[0] !== 0x0D) return null; // Must be TICKER_24H type + + try { + const payloadBuffer = messageFrame.slice(1); + const decoded = Ticker24hType.decode(payloadBuffer); + const snapshot = Ticker24hType.toObject(decoded, { + longs: String, + defaults: false, + }); + + const tickers: Ticker24hStats[] = (snapshot.tickers ?? []).map((ts: any) => { + const row: Ticker24hStats = { + ticker: ts.ticker ?? '', + exchange_id: ts.exchangeId ?? '', + base_asset: ts.baseAsset ?? '', + quote_asset: ts.quoteAsset ?? '', + last_price: Number(ts.lastPrice ?? 0), + price_change_pct: Number(ts.priceChangePct ?? 0), + quote_volume_24h: Number(ts.quoteVolume24H ?? 0), + timestamp_ms: Math.round(Number(BigInt(ts.timestamp ?? '0')) / 1e6), + }; + if (ts.bidPrice != null) row.bid_price = Number(ts.bidPrice); + if (ts.askPrice != null) row.ask_price = Number(ts.askPrice); + if (ts.open24H != null) row.open_24h = Number(ts.open24H); + if (ts.high24H != null) row.high_24h = Number(ts.high24H); + if (ts.low24H != null) row.low_24h = Number(ts.low24H); + if (ts.volume24H != null) row.volume_24h = Number(ts.volume24H); + if (ts.stdQuoteVolume != null) row.std_quote_volume = Number(ts.stdQuoteVolume); + if (ts.numTrades != null) row.num_trades = Number(ts.numTrades); + return row; + }); + + return { + exchange_id: snapshot.exchangeId ?? match[1], + tickers, + generated_at_ms: Math.round(Number(BigInt(snapshot.generatedAt ?? '0')) / 1e6), + }; + } catch (e) { + return null; + } +} + /** * Decode a realtime OHLC bar from ZMQ SUB frames. * Frame layout: [topic][version][0x04 OHLC type + OHLC protobuf bytes] @@ -213,6 +305,7 @@ export function decodeRealtimeBar(frames: Buffer[]): RealtimeBar | null { const ticker = match[1]; const periodSeconds = parseInt(match[2], 10); + const isClosed = !match[3]; // ":open" suffix absent → closed bar const messageFrame = frames[2]; if (messageFrame[0] !== 0x04) return null; // Must be OHLC type @@ -231,5 +324,6 @@ export function decodeRealtimeBar(frames: Buffer[]): RealtimeBar | null { low: Number(ohlc.low ?? 0), close: Number(ohlc.close ?? 0), volume: Number(ohlc.volume ?? 0), + isClosed, }; } diff --git a/gateway/src/clients/zmq-relay-client.ts b/gateway/src/clients/zmq-relay-client.ts index 0b211de3..49e9984c 100644 --- a/gateway/src/clients/zmq-relay-client.ts +++ b/gateway/src/clients/zmq-relay-client.ts @@ -18,8 +18,11 @@ import { decodeSubmitResponse, decodeHistoryReadyNotification, decodeRealtimeBar, + decodeTicker24h, OHLC_BAR_TOPIC_PATTERN, + TICKER24H_TOPIC_PATTERN, type RealtimeBar, + type Ticker24hSnapshot, } from './zmq-protocol.js'; import type { SubmitHistoricalRequest, @@ -31,7 +34,8 @@ import { } from '../types/ohlc.js'; export type BarUpdateCallback = (bar: RealtimeBar) => void; -export type { RealtimeBar }; +export type Ticker24hCallback = (snapshot: Ticker24hSnapshot) => void; +export type { RealtimeBar, Ticker24hSnapshot }; export interface ZMQRelayConfig { relayRequestEndpoint: string; // e.g., "tcp://relay:5559" @@ -39,6 +43,7 @@ export interface ZMQRelayConfig { clientId?: string; // Optional client ID, will generate if not provided requestTimeout?: number; // Request timeout in ms (default: 120000) onMetadataUpdate?: () => Promise; // Callback when symbol metadata updates + onTicker24h?: (snapshot: Ticker24hSnapshot) => void; // Callback when Ticker24h snapshot arrives } interface PendingRequest { @@ -79,6 +84,7 @@ export class ZMQRelayClient { clientId: config.clientId || `gateway-${randomUUID().slice(0, 8)}`, requestTimeout: config.requestTimeout || 120000, onMetadataUpdate: config.onMetadataUpdate || (async () => {}), + onTicker24h: config.onTicker24h || (() => {}), }; this.logger = logger; this.notificationTopic = `RESPONSE:${this.config.clientId}`; @@ -275,6 +281,17 @@ export class ZMQRelayClient { continue; } + // Handle Ticker24h snapshot updates (topic pattern: "{exchange}|ticker24h") + if (TICKER24H_TOPIC_PATTERN.test(topic)) { + if (this.config.onTicker24h) { + const snapshot = decodeTicker24h(Array.from(frames)); + if (snapshot) { + try { this.config.onTicker24h(snapshot); } catch (e) { /* ignore callback errors */ } + } + } + continue; + } + // Handle realtime OHLC bar updates (topic pattern: "{ticker}|ohlc:{period}") if (OHLC_BAR_TOPIC_PATTERN.test(topic)) { const bar = decodeRealtimeBar(Array.from(frames)); @@ -339,9 +356,12 @@ export class ZMQRelayClient { * This triggers the relay XPUB → Flink subscription detection → ingestor activation. * * @param callback Called whenever a new bar arrives for this topic + * @param openBars If true, subscribe to the ":open" topic (live in-progress bar updates + * for charts). If false (default), subscribe to closed bars only + * (completed candles for strategies/triggers). */ - subscribeToTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void { - const topic = `${ticker}|ohlc:${periodSeconds}`; + subscribeToTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback, openBars = false): void { + const topic = `${ticker}|ohlc:${periodSeconds}${openBars ? ':open' : ''}`; // Register callback if (!this.barCallbacks.has(topic)) { @@ -361,9 +381,10 @@ export class ZMQRelayClient { /** * Unsubscribe a callback from realtime OHLC bars. * ZMQ unsubscribe is only called on the 1→0 transition (last subscriber). + * @param openBars Must match the value used in the corresponding subscribeToTicker call. */ - unsubscribeFromTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void { - const topic = `${ticker}|ohlc:${periodSeconds}`; + unsubscribeFromTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback, openBars = false): void { + const topic = `${ticker}|ohlc:${periodSeconds}${openBars ? ':open' : ''}`; const callbacks = this.barCallbacks.get(topic); if (callbacks) { @@ -385,13 +406,25 @@ export class ZMQRelayClient { } } + /** + * Subscribe to Ticker24h snapshots for an exchange. + * Only calls ZMQ subscribe — receipt is handled via the onTicker24h config callback. + */ + subscribeToTicker24h(exchange: string): void { + const topic = `${exchange.toUpperCase()}|ticker24h`; + if (this.subSocket) { + this.subSocket.subscribe(topic); + this.logger.info({ topic }, 'ZMQ subscribed to Ticker24h topic'); + } + } + /** * Remove all subscriptions for a set of (topic, callback) pairs. * Convenience method for WebSocket disconnect cleanup. */ - cleanupSubscriptions(subscriptions: Array<{ ticker: string; periodSeconds: number; callback: BarUpdateCallback }>): void { - for (const { ticker, periodSeconds, callback } of subscriptions) { - this.unsubscribeFromTicker(ticker, periodSeconds, callback); + cleanupSubscriptions(subscriptions: Array<{ ticker: string; periodSeconds: number; callback: BarUpdateCallback; openBars?: boolean }>): void { + for (const { ticker, periodSeconds, callback, openBars } of subscriptions) { + this.unsubscribeFromTicker(ticker, periodSeconds, callback, openBars ?? false); } } diff --git a/gateway/src/harness/spawn/spawn-service.ts b/gateway/src/harness/spawn/spawn-service.ts index 85b763a2..54b86052 100644 --- a/gateway/src/harness/spawn/spawn-service.ts +++ b/gateway/src/harness/spawn/spawn-service.ts @@ -2,7 +2,7 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models' import { SystemMessage, HumanMessage } from '@langchain/core/messages'; /** All platform tool names available to every subagent. */ -const ALL_PLATFORM_TOOLS = ['SymbolLookup', 'GetChartData', 'WebSearch', 'FetchPage', 'ArxivSearch']; +const ALL_PLATFORM_TOOLS = ['SymbolLookup', 'GetChartData', 'GetTicker24h', 'WebSearch', 'FetchPage', 'ArxivSearch']; import type { FastifyBaseLogger } from 'fastify'; import { createReactAgent } from '@langchain/langgraph/prebuilt'; import type { HarnessEvent, SubagentChunkEvent, SubagentThinkingEvent } from '../harness-events.js'; diff --git a/gateway/src/harness/tool-labels.ts b/gateway/src/harness/tool-labels.ts index 96e1971c..74ed78d6 100644 --- a/gateway/src/harness/tool-labels.ts +++ b/gateway/src/harness/tool-labels.ts @@ -9,6 +9,7 @@ const TOOL_LABELS: Record = { MemoryLookup: 'Checking docs...', memory_lookup: 'Checking docs...', GetChartData: 'Fetching chart data...', + GetTicker24h: 'Fetching market data...', SymbolLookup: 'Searching symbol...', WebSearch: 'Searching the web...', FetchPage: 'Fetching page...', diff --git a/gateway/src/main.ts b/gateway/src/main.ts index 89e01c87..59c853bf 100644 --- a/gateway/src/main.ts +++ b/gateway/src/main.ts @@ -13,7 +13,7 @@ import { WebSocketHandler } from './channels/websocket-handler.js'; import { TelegramHandler } from './channels/telegram-handler.js'; import { KubernetesClient } from './k8s/client.js'; import { ContainerManager } from './k8s/container-manager.js'; -import { ZMQRelayClient } from './clients/zmq-relay-client.js'; +import { ZMQRelayClient, type Ticker24hSnapshot } from './clients/zmq-relay-client.js'; import { IcebergClient } from './clients/iceberg-client.js'; import { ConversationStore } from './harness/memory/conversation-store.js'; import { BlobStore } from './harness/memory/blob-store.js'; @@ -141,10 +141,13 @@ function loadConfig() { conversationsBucket: configData.iceberg?.conversations_bucket || process.env.CONVERSATIONS_S3_BUCKET, }, - // Relay configuration (for historical data) + // Relay configuration (for historical data and market snapshots) relay: { requestEndpoint: configData.relay?.request_endpoint || process.env.RELAY_REQUEST_ENDPOINT || 'tcp://relay:5559', notificationEndpoint: configData.relay?.notification_endpoint || process.env.RELAY_NOTIFICATION_ENDPOINT || 'tcp://relay:5558', + // Exchanges to subscribe for Ticker24h snapshots + supportedExchanges: (configData.relay?.supported_exchanges || process.env.RELAY_SUPPORTED_EXCHANGES || 'BINANCE') + .split(',').map((s: string) => s.trim().toUpperCase()).filter(Boolean) as string[], }, // Kubernetes configuration @@ -258,12 +261,28 @@ const onMetadataUpdate = async () => { } }; -// Initialize ZMQ Relay client (for historical data) -// Pass onMetadataUpdate callback so it's registered before connection +// Ticker24h in-memory cache (primary serving layer; Redis is warm-start only) +const ticker24hCache = new Map(); + +const TICKER24H_REDIS_TTL = 5400; // 90 minutes + +function onTicker24hReceived(snapshot: Ticker24hSnapshot): void { + const exchange = snapshot.exchange_id; + ticker24hCache.set(exchange, snapshot); + app.log.info({ exchange, count: snapshot.tickers.length }, 'Ticker24h snapshot updated in memory'); + + // Write to Redis as JSON (fire-and-forget warm-start cache) + redis.setex(`ticker24h:${exchange}`, TICKER24H_REDIS_TTL, JSON.stringify(snapshot)) + .catch(err => app.log.warn({ exchange, err }, 'Failed to write Ticker24h to Redis')); +} + +// Initialize ZMQ Relay client (for historical data and market snapshots) +// Pass onMetadataUpdate and onTicker24h callbacks so they're registered before connection const zmqRelayClient = new ZMQRelayClient({ relayRequestEndpoint: config.relay.requestEndpoint, relayNotificationEndpoint: config.relay.notificationEndpoint, onMetadataUpdate, + onTicker24h: onTicker24hReceived, }, app.log); app.log.info({ @@ -478,6 +497,24 @@ try { try { await zmqRelayClient.connect(); app.log.info('ZMQ Relay connected'); + + // Subscribe to Ticker24h topics for all configured exchanges + for (const exchange of config.relay.supportedExchanges) { + zmqRelayClient.subscribeToTicker24h(exchange); + } + + // Warm Ticker24h in-memory cache from Redis (best-effort; gateway ZMQ is the primary source) + for (const exchange of config.relay.supportedExchanges) { + try { + const cached = await redis.get(`ticker24h:${exchange}`); + if (cached) { + ticker24hCache.set(exchange, JSON.parse(cached)); + app.log.info({ exchange }, 'Ticker24h warm-started from Redis'); + } + } catch (err) { + app.log.debug({ exchange, err }, 'No Ticker24h cache in Redis (will populate on first ZMQ update)'); + } + } } catch (error) { app.log.warn({ error }, 'ZMQ Relay connection failed - historical data will not be available'); } @@ -490,6 +527,7 @@ try { ohlcService: () => ohlcService, symbolIndexService: () => symbolIndexService, workspaceManager: undefined, // Will be set per-session + ticker24hGetter: (exchange: string) => ticker24hCache.get(exchange), tavilyApiKey: config.tavilyApiKey, }); @@ -497,7 +535,7 @@ try { // Main agent: platform tools + user's general MCP tools toolRegistry.registerAgentTools({ agentName: 'main', - platformTools: ['SymbolLookup', 'GetChartData'], + platformTools: ['SymbolLookup', 'GetChartData', 'GetTicker24h'], mcpTools: ['PythonList', 'PythonDelete', 'BacktestStrategy', 'ListActiveStrategies'], }); @@ -548,6 +586,7 @@ try { const indexService = new SymbolIndexService({ icebergClient, logger: app.log, + getTicker24h: (exchange: string) => ticker24hCache.get(exchange), }); // Assign to module-level variable so onMetadataUpdate callback can use it diff --git a/gateway/src/services/ohlc-service.ts b/gateway/src/services/ohlc-service.ts index 5cbe0f5d..b7e5d1db 100644 --- a/gateway/src/services/ohlc-service.ts +++ b/gateway/src/services/ohlc-service.ts @@ -20,8 +20,6 @@ import type { ZMQRelayClient, BarUpdateCallback } from '../clients/zmq-relay-cli export type { BarUpdateCallback } from '../clients/zmq-relay-client.js'; import type { HistoryResult, - SymbolInfo, - SearchResult, DatafeedConfig, TradingViewBar, } from '../types/ohlc.js'; @@ -58,17 +56,21 @@ export class OHLCService { * Subscribe to realtime OHLC bar updates for a ticker+period. * ZMQ subscribe is issued on the first call for a given topic; subsequent calls * for the same topic only add the callback (no extra ZMQ events). + * + * @param openBars If true, subscribe to in-progress bar snapshots (every tick, for charts). + * If false (default), subscribe to closed bars only (for strategies/triggers). */ - subscribeToTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void { - this.relayClient.subscribeToTicker(ticker, periodSeconds, callback); + subscribeToTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback, openBars = false): void { + this.relayClient.subscribeToTicker(ticker, periodSeconds, callback, openBars); } /** * Unsubscribe a callback from realtime OHLC bar updates. * ZMQ unsubscribe is issued when the last callback for a topic is removed. + * @param openBars Must match the value used in the corresponding subscribeToTicker call. */ - unsubscribeFromTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback): void { - this.relayClient.unsubscribeFromTicker(ticker, periodSeconds, callback); + unsubscribeFromTicker(ticker: string, periodSeconds: number, callback: BarUpdateCallback, openBars = false): void { + this.relayClient.unsubscribeFromTicker(ticker, periodSeconds, callback, openBars); } /** @@ -223,66 +225,4 @@ export class OHLCService { supports_time: false, }; } - - /** - * Search symbols - * - * For now, stub with default symbol - */ - async searchSymbols( - query: string, - type?: string, - exchange?: string, - limit: number = 30 - ): Promise { - this.logger.debug({ query, type, exchange, limit }, 'Searching symbols'); - - // TODO: Implement central symbol registry - // For now, return default symbol if query matches - if (query.toLowerCase().includes('btc') || query.toLowerCase().includes('binance')) { - return [{ - symbol: 'BTC/USDT', - full_name: 'BTC/USDT (BINANCE)', - description: 'Bitcoin / Tether USD', - exchange: 'BINANCE', - ticker: 'BTC/USDT.BINANCE', - type: 'crypto', - }]; - } - - return []; - } - - /** - * Resolve symbol metadata - * - * For now, stub with default symbol - */ - async resolveSymbol(symbol: string): Promise { - this.logger.debug({ symbol }, 'Resolving symbol'); - - // TODO: Implement central symbol registry - // For now, return default symbol info for BTC/USDT.BINANCE - if (symbol === 'BTC/USDT.BINANCE' || symbol === 'BTC/USDT') { - return { - symbol: 'BTC/USDT', - name: 'BTC/USDT', - ticker: 'BTC/USDT.BINANCE', - description: 'Bitcoin / Tether USD', - type: 'crypto', - session: '24x7', - timezone: 'Etc/UTC', - exchange: 'BINANCE', - minmov: 1, - pricescale: 100, - has_intraday: true, - has_daily: true, - has_weekly_and_monthly: true, - supported_resolutions: DEFAULT_SUPPORTED_RESOLUTIONS, - data_status: 'streaming', - }; - } - - throw new Error(`Symbol not found: ${symbol}`); - } } diff --git a/gateway/src/services/symbol-index-service.ts b/gateway/src/services/symbol-index-service.ts index 2ccd3e48..e125e2b2 100644 --- a/gateway/src/services/symbol-index-service.ts +++ b/gateway/src/services/symbol-index-service.ts @@ -7,12 +7,14 @@ import type { FastifyBaseLogger } from 'fastify'; import type { IcebergClient } from '../clients/iceberg-client.js'; +import type { Ticker24hSnapshot } from '../clients/zmq-relay-client.js'; import type { SearchResult, SymbolInfo, SymbolMetadata } from '../types/ohlc.js'; import { DEFAULT_SUPPORTED_RESOLUTIONS } from '../types/ohlc.js'; export interface SymbolIndexServiceConfig { icebergClient: IcebergClient; logger: FastifyBaseLogger; + getTicker24h?: (exchange: string) => Ticker24hSnapshot | undefined; } /** @@ -23,6 +25,7 @@ export interface SymbolIndexServiceConfig { export class SymbolIndexService { private icebergClient: IcebergClient; private logger: FastifyBaseLogger; + private getTicker24h?: (exchange: string) => Ticker24hSnapshot | undefined; private symbols: Map = new Map(); // key: "MARKET_ID.EXCHANGE" (Nautilus format) private initialized: boolean = false; private initPromise: Promise | null = null; @@ -30,6 +33,7 @@ export class SymbolIndexService { constructor(config: SymbolIndexServiceConfig) { this.icebergClient = config.icebergClient; this.logger = config.logger; + this.getTicker24h = config.getTicker24h; } /** @@ -114,10 +118,9 @@ export class SymbolIndexService { } const queryLower = query.toLowerCase(); - const results: SearchResult[] = []; + const matched: SymbolMetadata[] = []; for (const [key, metadata] of this.symbols) { - // Match against various fields const ticker = key; const base = metadata.base_asset || ''; const quote = metadata.quote_asset || ''; @@ -131,15 +134,36 @@ export class SymbolIndexService { desc.toLowerCase().includes(queryLower) || marketId.toLowerCase().includes(queryLower) ) { - results.push(this.metadataToSearchResult(metadata)); + matched.push(metadata); + } + } - if (results.length >= limit) { - break; + // Build a volume lookup from Ticker24h cache keyed by "ticker" field (e.g. "BTC/USDT") + // within each exchange snapshot. Map: "MARKET_ID.EXCHANGE" → std_quote_volume + const volumeMap = new Map(); + if (this.getTicker24h) { + const exchanges = new Set(matched.map(m => m.exchange_id)); + for (const exchange of exchanges) { + const snapshot = this.getTicker24h(exchange); + if (snapshot) { + for (const ts of snapshot.tickers) { + const key = `${ts.ticker}.${exchange}`; + const vol = ts.std_quote_volume ?? ts.quote_volume_24h ?? 0; + volumeMap.set(key, vol); + } } } } - this.logger.debug({ query, count: results.length }, 'Symbol search completed'); + matched.sort((a, b) => { + const ka = `${a.market_id}.${a.exchange_id}`; + const kb = `${b.market_id}.${b.exchange_id}`; + return (volumeMap.get(kb) ?? 0) - (volumeMap.get(ka) ?? 0); + }); + + const results = matched.slice(0, limit).map(m => this.metadataToSearchResult(m)); + + this.logger.debug({ query, matched: matched.length, returned: results.length }, 'Symbol search completed'); return results; } diff --git a/gateway/src/tools/platform/get-ticker24h.tool.ts b/gateway/src/tools/platform/get-ticker24h.tool.ts new file mode 100644 index 00000000..8640d6ad --- /dev/null +++ b/gateway/src/tools/platform/get-ticker24h.tool.ts @@ -0,0 +1,90 @@ +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { z } from 'zod'; +import type { FastifyBaseLogger } from 'fastify'; +import type { Ticker24hSnapshot } from '../../clients/zmq-relay-client.js'; + +export interface GetTicker24hToolConfig { + getTicker24h: (exchange: string) => Ticker24hSnapshot | undefined; + logger: FastifyBaseLogger; +} + +export function createGetTicker24hTool(config: GetTicker24hToolConfig): DynamicStructuredTool { + const { getTicker24h, logger } = config; + + return new DynamicStructuredTool({ + name: 'GetTicker24h', + description: `Retrieve 24h rolling market stats for all symbols on an exchange. Data is refreshed hourly. Returns symbols sorted by USD volume (std_quote_volume) descending. + +Use this to build a pre-filtered symbol universe before fetching OHLC data for a scanner — it avoids requesting per-symbol OHLC data for thousands of symbols and consuming the bar budget. + +Returned columns per symbol: ticker, exchange_id, base_asset, quote_asset, last_price, price_change_pct, quote_volume_24h, std_quote_volume (USD-normalized, null if unknown), bid_price, ask_price, open_24h, high_24h, low_24h, volume_24h, num_trades, timestamp_ms. + +Parameters: +- exchange: Exchange name (BINANCE, COINBASE, or KRAKEN) +- limit (optional): Top N by volume. If omitted, returns all symbols. +- min_std_quote_volume (optional): Minimum USD-normalized 24h volume; excludes symbols below this threshold. +- market_type (optional): "spot" or "perp" (perpetual futures). If omitted, returns all. +- base_asset_contains (optional): Filter to symbols whose base asset contains this string (case-insensitive), e.g. "BTC".`, + schema: z.object({ + exchange: z.string().describe('Exchange name: BINANCE, COINBASE, or KRAKEN'), + limit: z.number().int().optional().describe('Top N By Volume. If omitted, returns all symbols.'), + min_std_quote_volume: z.number().optional().describe('Minimum USD-normalized 24h volume. Excludes symbols below this threshold.'), + market_type: z.enum(['spot', 'perp']).optional().describe('Filter by market type: spot or perp. Omit for all.'), + base_asset_contains: z.string().optional().describe('Filter to symbols whose base asset contains this string (case-insensitive). E.g. "BTC".'), + }), + func: async ({ exchange, limit, min_std_quote_volume, market_type, base_asset_contains }) => { + const exchangeUpper = exchange.toUpperCase(); + logger.debug({ exchange: exchangeUpper, limit, min_std_quote_volume, market_type, base_asset_contains }, 'GetTicker24h called'); + + const snapshot = getTicker24h(exchangeUpper); + if (!snapshot) { + return JSON.stringify({ + exchange: exchangeUpper, + count: 0, + tickers: [], + note: 'No data available. Ticker24h is refreshed hourly — it may not be ready yet.', + }); + } + + let tickers = snapshot.tickers; + + if (market_type === 'spot') { + tickers = tickers.filter(t => !t.ticker.includes(':')); + } else if (market_type === 'perp') { + tickers = tickers.filter(t => t.ticker.includes(':')); + } + + if (base_asset_contains) { + const lower = base_asset_contains.toLowerCase(); + tickers = tickers.filter(t => t.base_asset.toLowerCase().includes(lower)); + } + + if (min_std_quote_volume !== undefined) { + tickers = tickers.filter( + t => t.std_quote_volume !== undefined && !isNaN(t.std_quote_volume) && t.std_quote_volume >= min_std_quote_volume + ); + } + + // Sort by std_quote_volume descending (null/undefined/NaN last) + tickers = [...tickers].sort((a, b) => { + const av = a.std_quote_volume; + const bv = b.std_quote_volume; + if (av === undefined || av === null || isNaN(av)) return 1; + if (bv === undefined || bv === null || isNaN(bv)) return -1; + return bv - av; + }); + + if (limit !== undefined) { + tickers = tickers.slice(0, limit); + } + + logger.info({ exchange: exchangeUpper, returned: tickers.length }, 'GetTicker24h result'); + + return JSON.stringify({ + exchange: exchangeUpper, + count: tickers.length, + tickers, + }); + }, + }); +} diff --git a/gateway/src/tools/tool-registry.ts b/gateway/src/tools/tool-registry.ts index 5cb8f323..2f5670fc 100644 --- a/gateway/src/tools/tool-registry.ts +++ b/gateway/src/tools/tool-registry.ts @@ -4,8 +4,10 @@ import type { MCPClientConnector } from '../harness/mcp-client.js'; import type { OHLCService } from '../services/ohlc-service.js'; import type { SymbolIndexService } from '../services/symbol-index-service.js'; import type { WorkspaceManager } from '../workspace/workspace-manager.js'; +import type { Ticker24hSnapshot } from '../clients/zmq-relay-client.js'; import { createSymbolLookupTool } from './platform/symbol-lookup.tool.js'; import { createGetChartDataTool } from './platform/get-chart-data.tool.js'; +import { createGetTicker24hTool } from './platform/get-ticker24h.tool.js'; import { createWebSearchTool } from './platform/web-search.tool.js'; import { createFetchPageTool } from './platform/fetch-page.tool.js'; import { createArxivSearchTool } from './platform/arxiv-search.tool.js'; @@ -34,6 +36,7 @@ export interface PlatformServices { ohlcService?: OHLCService | (() => OHLCService | undefined); symbolIndexService?: SymbolIndexService | (() => SymbolIndexService | undefined); workspaceManager?: WorkspaceManager | (() => WorkspaceManager | undefined); + ticker24hGetter?: (exchange: string) => Ticker24hSnapshot | undefined; tavilyApiKey?: string; } @@ -185,6 +188,18 @@ export class ToolRegistry { break; } + case 'GetTicker24h': { + if (this.platformServices.ticker24hGetter) { + tool = createGetTicker24hTool({ + getTicker24h: this.platformServices.ticker24hGetter, + logger: this.logger, + }); + } else { + this.logger.warn('ticker24hGetter not configured — GetTicker24h tool unavailable'); + } + break; + } + case 'WebSearch': { if (this.platformServices.tavilyApiKey) { tool = createWebSearchTool({ apiKey: this.platformServices.tavilyApiKey, logger: this.logger }); diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js index 912e70ea..57babf7f 100644 --- a/ingestor/src/ccxt-fetcher.js +++ b/ingestor/src/ccxt-fetcher.js @@ -462,6 +462,80 @@ export class CCXTFetcher { return timeframe; } + /** + * Fetch 24h rolling ticker stats for all symbols on an exchange. + * Uses exchange.fetchTickers() — single API call, very rate-limit efficient. + * Returns an array of TickerStats-compatible objects, or throws if unsupported. + * + * @param {string} exchangeName - lowercase exchange name (e.g. "binance") + * @returns {Promise} Array of TickerStats objects + */ + async fetchAllTickers(exchangeName) { + const exchange = this.getExchange(exchangeName); + const exchangeUpper = exchangeName.toUpperCase(); + + if (!exchange.has['fetchTickers']) { + throw new Error(`Exchange ${exchangeUpper} does not support fetchTickers()`); + } + + this.logger.info({ exchange: exchangeUpper }, 'Fetching all 24h tickers'); + + let rawTickers; + try { + rawTickers = await exchange.fetchTickers(); + } catch (error) { + if (error.constructor?.name === 'RateLimitExceeded') { + const retryAfterMs = extractRetryAfterMs(exchange, error); + this.logger.warn({ exchange: exchangeUpper, retryAfterMs }, 'fetchTickers rate-limited'); + throw new ExchangeRateLimitError(exchangeName, retryAfterMs, error.message); + } + this.logger.error({ error: error.message, exchange: exchangeUpper }, 'Error fetching tickers'); + throw error; + } + + const nowNs = (BigInt(Date.now()) * 1_000_000n).toString(); + const tickers = []; + + for (const [symbol, t] of Object.entries(rawTickers)) { + if (!t || t.last == null) continue; + + // Build Nautilus-format ticker: "BASE/QUOTE.EXCHANGE" + const ticker = `${symbol}.${exchangeUpper}`; + + // Extract base/quote from the CCXT market info + const market = exchange.markets?.[symbol]; + const baseAsset = market?.base ?? symbol.split('/')[0] ?? ''; + const quoteAsset = market?.quote ?? symbol.split('/')[1] ?? ''; + + // protobufjs camelCase: only removes '_' before LETTERS, not digits. + // quote_volume_24h → quoteVolume_24h (underscore before '24' is preserved) + // open_24h → open_24h, high_24h → high_24h, etc. + const stat = { + ticker, + exchangeId: exchangeUpper, + baseAsset, + quoteAsset, + lastPrice: t.last ?? 0, + priceChangePct: t.percentage ?? 0, + 'quoteVolume_24h': t.quoteVolume ?? 0, + timestamp: nowNs, + }; + + if (t.bid != null) stat.bidPrice = t.bid; + if (t.ask != null) stat.askPrice = t.ask; + if (t.open != null) stat['open_24h'] = t.open; + if (t.high != null) stat['high_24h'] = t.high; + if (t.low != null) stat['low_24h'] = t.low; + if (t.baseVolume != null) stat['volume_24h'] = t.baseVolume; + if (t.info?.count != null) stat.numTrades = Number(t.info.count); + + tickers.push(stat); + } + + this.logger.info({ exchange: exchangeUpper, count: tickers.length }, 'Fetched all tickers'); + return tickers; + } + /** * Close all exchange connections */ diff --git a/ingestor/src/index.js b/ingestor/src/index.js index bdb86900..3375982e 100644 --- a/ingestor/src/index.js +++ b/ingestor/src/index.js @@ -57,6 +57,7 @@ function loadConfig() { kafka_brokers: config.kafka_brokers || ['localhost:9092'], kafka_ohlc_topic: config.kafka_ohlc_topic || 'market-ohlc', kafka_tick_topic: config.kafka_tick_topic || 'market-tick', + kafka_ticker_topic: config.kafka_ticker_topic || 'market-ticker', // Worker configuration poll_interval_ms: config.poll_interval_ms || 10000, @@ -316,6 +317,7 @@ class IngestorWorker { const isHistorical = !type || type === 'HISTORICAL_OHLC' || type === 0; const isRealtime = type === 'REALTIME_TICKS' || type === 1; + const isTickerSnapshot = type === 'TICKER_SNAPSHOT' || type === 2; if (isHistorical) { if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) { @@ -331,6 +333,14 @@ class IngestorWorker { return; } this.handleRealtimeRequest(request); + } else if (isTickerSnapshot) { + if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) { + this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {}); + return; + } + this.handleTicker24hRequest(request).catch(err => { + this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in ticker24h handler'); + }); } else { this.logger.warn({ jobId, type }, 'Unknown request type — rejecting'); this.zmqClient.sendReject(jobId, `Unknown request type: ${type}`).catch(() => {}); @@ -430,6 +440,44 @@ class IngestorWorker { this.realtimePoller.startSubscription(jobId, requestId, ticker, this.config.kafka_tick_topic); } + /** + * Fetch all tickers (24h stats) for an exchange and write to Kafka. + * Triggered by TICKER_SNAPSHOT request with sentinel ticker @TICKER24H.{EXCHANGE}. + */ + async handleTicker24hRequest(request) { + const { jobId, requestId, ticker, clientId } = request; + const exchangeId = exchangeOf(ticker); // e.g. "BINANCE" from "@TICKER24H.BINANCE" + const exchangeName = exchangeId.toLowerCase(); + + this.logger.info({ jobId, requestId, ticker, exchangeId }, 'Processing TICKER_SNAPSHOT request'); + + // Immediately ack to reset Flink's dispatch-time timeout clock. + await this.zmqClient.sendHeartbeat(jobId); + + try { + const tickers = await this.ccxtFetcher.fetchAllTickers(exchangeName); + + this.logger.info({ jobId, requestId, exchangeId, count: tickers.length }, 'Fetched tickers from exchange'); + + await this.kafkaProducer.writeTickerBatch(this.config.kafka_ticker_topic, exchangeId, tickers, clientId, requestId); + + this.logger.info({ jobId, requestId, exchangeId }, 'Ticker24h request complete — sending WorkComplete'); + await this.zmqClient.sendComplete(jobId, true); + + } catch (error) { + this.logger.error({ jobId, requestId, exchangeId, error: error.message }, 'Ticker24h request failed'); + + if (error instanceof ExchangeRateLimitError) { + this.pool.reportRateLimit(exchangeId, 'HISTORICAL', error.retryAfterMs); + } + + await this.zmqClient.sendComplete(jobId, false, error.message); + } + + this.pool.releaseSlot(jobId).catch(err => + this.logger.error({ jobId, error: err.message }, 'Failed to release ticker24h slot')); + } + getStatus() { return { activeRealtime: this.activeRealtime.size, diff --git a/ingestor/src/kafka-producer.js b/ingestor/src/kafka-producer.js index 23f79fa2..7fc37524 100644 --- a/ingestor/src/kafka-producer.js +++ b/ingestor/src/kafka-producer.js @@ -1,6 +1,6 @@ // Kafka producer for writing market data import { Kafka } from 'kafkajs'; -import { encodeMessage, MessageTypeId, Tick, OHLC, OHLCBatch, Market } from './proto/messages.js'; +import { encodeMessage, MessageTypeId, Tick, OHLC, OHLCBatch, Market, TickerBatch } from './proto/messages.js'; export class KafkaProducer { constructor(config, logger) { @@ -302,6 +302,39 @@ export class KafkaProducer { ); } + /** + * Write a TickerBatch (all-ticker snapshot for one exchange) to Kafka. + * @param {string} topic - Kafka topic name (e.g. "market-ticker") + * @param {string} exchangeId - Exchange identifier (e.g. "BINANCE") + * @param {Array} tickers - Array of TickerStats-compatible objects + * @param {string} [clientId] - Non-empty = client-initiated; absent = scheduled broadcast + * @param {string} [requestId] - Echoed for tracing + */ + async writeTickerBatch(topic, exchangeId, tickers, clientId, requestId) { + if (!this.isConnected) { + throw new Error('Kafka producer not connected'); + } + + const nowNs = (BigInt(Date.now()) * 1_000_000n).toString(); + const batch = { + exchangeId, + tickers, + fetchedAt: nowNs, + }; + if (clientId) batch.clientId = clientId; + if (requestId) batch.requestId = requestId; + + const [frame1, frame2] = encodeMessage(MessageTypeId.TICKER_BATCH, batch, TickerBatch); + const value = Buffer.concat([frame1, frame2]); + + await this.producer.send({ + topic, + messages: [{ key: exchangeId, value }], + }); + + this.logger.info({ exchange: exchangeId, count: tickers.length, topic }, 'Wrote TickerBatch to Kafka'); + } + /** * Disconnect from Kafka */ diff --git a/protobuf/ingestor.proto b/protobuf/ingestor.proto index 40f4dfb0..73581540 100644 --- a/protobuf/ingestor.proto +++ b/protobuf/ingestor.proto @@ -30,6 +30,7 @@ message DataRequest { enum RequestType { HISTORICAL_OHLC = 0; REALTIME_TICKS = 1; + TICKER_SNAPSHOT = 2; } } diff --git a/protobuf/ticker24h.proto b/protobuf/ticker24h.proto new file mode 100644 index 00000000..c780ddc7 --- /dev/null +++ b/protobuf/ticker24h.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.dexorder.proto"; + +// Exchange rate for a single quote currency → USD conversion +message QuoteCurrencyRate { + string currency = 1; // "BTC", "ETH", "USDT", "USDC" + double usd_rate = 2; // conversion rate to USD (1.0 for USD stablecoins) + string source_ticker = 3; // "BTC/USDT.BINANCE" — source market used for rate + uint64 timestamp = 4; // nanoseconds when rate was observed +} + +// Index of quote currency → USD rates used to compute std_quote_volume. +// Embedded in Ticker24h so consumers have full audit trail. +message QuoteCurrencyIndex { + repeated QuoteCurrencyRate rates = 1; + uint64 generated_at = 2; // nanoseconds +} + +// 24-hour rolling market stats for a single symbol +message TickerStats { + // Required: always present for any valid ticker + string ticker = 1; // "BTC/USDT.BINANCE" + string exchange_id = 2; // "BINANCE" + string base_asset = 3; // "BTC" + string quote_asset = 4; // "USDT" + double last_price = 5; // last traded price in quote currency + double price_change_pct = 6; // 24h price change as percentage (e.g. 2.5 = +2.5%) + double quote_volume_24h = 7; // raw 24h volume in quote asset + uint64 timestamp = 8; // nanoseconds, snapshot time + + // Optional: not all exchanges / markets provide these + optional double bid_price = 9; + optional double ask_price = 10; + optional double open_24h = 11; + optional double high_24h = 12; + optional double low_24h = 13; + optional double volume_24h = 14; // base-asset volume (0 is valid on tiny markets) + optional double std_quote_volume = 15; // quote_volume_24h normalized to USD; null if conversion unknown + optional uint32 num_trades = 16; // 24h trade count (0 is valid on tiny markets) +} + +// Full Ticker24h snapshot for one exchange: all symbols + currency index +// Published via ZMQ XPUB topic: "{exchange_id}|ticker24h" (scheduled) +// or "RESPONSE:{client_id}" (client-initiated) +message Ticker24h { + string exchange_id = 1; + repeated TickerStats tickers = 2; + uint64 generated_at = 3; // nanoseconds + QuoteCurrencyIndex currency_index = 4; +} + +// Kafka message written by ingestor after fetchTickers() call (topic: market-ticker) +message TickerBatch { + string exchange_id = 1; + repeated TickerStats tickers = 2; + uint64 fetched_at = 3; // nanoseconds + optional string client_id = 4; // non-empty = client-initiated; absent = scheduled broadcast + optional string request_id = 5; // echoed for tracing +} diff --git a/sandbox/RESEARCH_API_USAGE.md b/sandbox/RESEARCH_API_USAGE.md index 4f6fee2c..2f97006b 100644 --- a/sandbox/RESEARCH_API_USAGE.md +++ b/sandbox/RESEARCH_API_USAGE.md @@ -77,6 +77,54 @@ print(df.head()) - `"ticker"` - Market identifier - `"period_seconds"` - Period in seconds +## Building a Scanner Universe with get_ticker_24h + +**Always pre-filter symbols before fetching OHLC for multiple tickers.** The per-script bar budget is 2M bars; fetching all ~1800 Binance symbols would exhaust it immediately. Use `get_ticker_24h` to get a ranked, filtered list of symbols with no OHLC budget cost, then run per-symbol analysis only on the filtered set. + +```python +from dexorder.api import get_api +import asyncio + +api = get_api() + +# Get top 50 most liquid Binance spot symbols by USD volume (no bar budget used) +universe = asyncio.run(api.data.get_ticker_24h( + "BINANCE", + limit=50, + market_type="spot", + min_std_quote_volume=10_000_000 # filter to $10M+ daily USD volume +)) +print(f"Universe: {len(universe)} symbols") + +# Now fetch OHLC only for the filtered set +for ticker in universe["ticker"]: + df = asyncio.run(api.data.historical_ohlc( + ticker=ticker, + period_seconds=86400, # daily bars for a scanner + start_time="2024-01-01", + end_time="2025-01-01", + extra_columns=["volume"] + )) + print(f"[Data] {ticker}: {len(df)} bars") +``` + +### Filter parameters + +```python +# All BTC pairs on Binance +df = asyncio.run(api.data.get_ticker_24h("BINANCE", base_asset_contains="BTC")) + +# Top 100 perp markets with $50M+ daily volume +df = asyncio.run(api.data.get_ticker_24h( + "BINANCE", limit=100, market_type="perp", min_std_quote_volume=50_000_000 +)) + +# All symbols on Coinbase +df = asyncio.run(api.data.get_ticker_24h("COINBASE")) +``` + +Returned DataFrame is sorted by `std_quote_volume` (USD-normalized) descending. Symbols without a USD rate have `std_quote_volume = NaN` and appear last. Full column list: `ticker`, `exchange_id`, `base_asset`, `quote_asset`, `last_price`, `price_change_pct`, `quote_volume_24h`, `std_quote_volume`, `bid_price`, `ask_price`, `open_24h`, `high_24h`, `low_24h`, `volume_24h`, `num_trades`, `timestamp_ms`. + ## Using the Charting API The charting API provides styled financial charts with OHLC candlesticks and technical indicators. diff --git a/sandbox/dexorder/api/data_api.py b/sandbox/dexorder/api/data_api.py index f6940dcf..11ba12b8 100644 --- a/sandbox/dexorder/api/data_api.py +++ b/sandbox/dexorder/api/data_api.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional, List +from typing import Optional, List, Any import pandas as pd @@ -160,3 +160,59 @@ class DataAPI(ABC): """ pass + @abstractmethod + async def get_ticker_24h( + self, + exchange: str, + limit: Optional[int] = None, + min_std_quote_volume: Optional[float] = None, + market_type: Optional[str] = None, + base_asset_contains: Optional[str] = None, + ) -> pd.DataFrame: + """ + Retrieve the 24h rolling market stats for all symbols on an exchange. + + Data is refreshed hourly by the ingestor pipeline. Returns all symbols + sorted by std_quote_volume (USD-normalized volume) descending. Symbols + with unknown quote currency conversion are listed last (std_quote_volume = NaN). + + Args: + exchange: Exchange name (e.g., "BINANCE", "COINBASE", "KRAKEN") + limit: If set, return only the Top N symbols By Volume. None = return all. + min_std_quote_volume: Exclude symbols with USD volume below this threshold. + market_type: Filter by market type: "spot" or "perp". None = return all. + base_asset_contains: Filter to symbols whose base asset contains this string + (case-insensitive). E.g., "BTC" matches "BTC/USDT". + + Returns: + DataFrame sorted by std_quote_volume descending (NULLs last). Columns: + - ticker: Full ticker (e.g., "BTC/USDT.BINANCE") + - exchange_id: Exchange name + - base_asset: Base currency (e.g., "BTC") + - quote_asset: Quote currency (e.g., "USDT") + - last_price: Last traded price in quote currency + - price_change_pct: 24h price change as percentage + - quote_volume_24h: Raw 24h volume in quote asset + - std_quote_volume: quote_volume_24h converted to USD (NaN if conversion unknown) + - bid_price, ask_price: Current best bid/ask (NaN if not provided) + - open_24h, high_24h, low_24h: 24h OHLC prices (NaN if not provided) + - volume_24h: Base-asset volume (NaN if not provided) + - num_trades: 24h trade count (NaN if not provided) + - timestamp_ms: Snapshot timestamp in milliseconds + + Returns empty DataFrame if no data is available (e.g., not yet fetched). + + Examples: + # Top 50 most liquid Binance spot symbols + df = await api.get_ticker_24h("BINANCE", limit=50, market_type="spot") + + # All BTC pairs with at least $10M daily volume + df = await api.get_ticker_24h("BINANCE", + base_asset_contains="BTC", + min_std_quote_volume=10_000_000) + + # All Binance symbols (for building a scanner universe) + df = await api.get_ticker_24h("BINANCE") + """ + pass + diff --git a/sandbox/dexorder/impl/data_api_impl.py b/sandbox/dexorder/impl/data_api_impl.py index 73a83ebe..bb88b14f 100644 --- a/sandbox/dexorder/impl/data_api_impl.py +++ b/sandbox/dexorder/impl/data_api_impl.py @@ -8,6 +8,7 @@ import pandas as pd from dexorder.api.data_api import DataAPI from dexorder.ohlc_client import OHLCClient +from dexorder.ticker24h_client import Ticker24hClient from dexorder.utils import TimestampInput, to_nanoseconds log = logging.getLogger(__name__) @@ -33,6 +34,9 @@ VALID_EXTRA_COLUMNS = { } +MAX_BARS_PER_SCRIPT = 2_000_000 + + class DataAPIImpl(DataAPI): """ Implementation of DataAPI using OHLCClient for querying OHLC data. @@ -79,8 +83,10 @@ class DataAPIImpl(DataAPI): s3_secret_key=s3_secret_key, s3_region=s3_region, ) + self.ticker24h_client = Ticker24hClient(relay_endpoint, notification_endpoint) self.request_timeout = request_timeout self._started = False + self._bars_fetched: int = 0 async def start(self): """ @@ -91,6 +97,7 @@ class DataAPIImpl(DataAPI): """ if not self._started: await self.ohlc_client.start() + await self.ticker24h_client.connect() self._started = True async def stop(self): @@ -99,6 +106,7 @@ class DataAPIImpl(DataAPI): """ if self._started: await self.ohlc_client.stop() + await self.ticker24h_client.stop() self._started = False async def historical_ohlc( @@ -121,6 +129,17 @@ class DataAPIImpl(DataAPI): start_nanos = to_nanoseconds(start_time) end_nanos = to_nanoseconds(end_time) + estimated_bars = int((end_nanos - start_nanos) / (period_seconds * 1_000_000_000)) + if self._bars_fetched + estimated_bars > MAX_BARS_PER_SCRIPT: + raise ValueError( + f"Script bar budget exceeded: {self._bars_fetched:,} bars already fetched, " + f"this request would add ~{estimated_bars:,} more " + f"(~{self._bars_fetched + estimated_bars:,} total). " + f"Limit is {MAX_BARS_PER_SCRIPT:,} bars per script. " + f"Reduce the date range, use a coarser period (e.g. 86400s instead of 3600s), " + f"or fetch fewer symbols." + ) + log.debug(f"Fetching OHLC: {ticker}, period={period_seconds}s, " f"start={start_time} ({start_nanos}ns), end={end_time} ({end_nanos}ns)") @@ -154,6 +173,7 @@ class DataAPIImpl(DataAPI): if not df.empty: available_cols = [col for col in columns_to_fetch if col in df.columns] df = df[available_cols] + self._bars_fetched += len(df) return df @@ -173,6 +193,30 @@ class DataAPIImpl(DataAPI): """ raise NotImplementedError("latest_ohlc will be implemented in the future") + async def get_ticker_24h( + self, + exchange: str, + limit: Optional[int] = None, + min_std_quote_volume: Optional[float] = None, + market_type: Optional[str] = None, + base_asset_contains: Optional[str] = None, + ) -> pd.DataFrame: + """ + Retrieve the 24h rolling market stats for all symbols on an exchange. + + See DataAPI.get_ticker_24h for full documentation. + """ + if not self._started: + await self.start() + return await self.ticker24h_client.get_ticker_24h( + exchange=exchange, + limit=limit, + min_std_quote_volume=min_std_quote_volume, + market_type=market_type, + base_asset_contains=base_asset_contains, + request_timeout=self.request_timeout, + ) + async def __aenter__(self): """Support async context manager.""" await self.start() diff --git a/sandbox/dexorder/ticker24h_client.py b/sandbox/dexorder/ticker24h_client.py new file mode 100644 index 00000000..ccbc27b8 --- /dev/null +++ b/sandbox/dexorder/ticker24h_client.py @@ -0,0 +1,242 @@ +""" +ZMQ-based client for fetching Ticker24h snapshots via the relay backbone. + +Sends a SubmitHistoricalRequest with ticker="@TICKER24H.{EXCHANGE}" to the relay. +Flink processes the TICKER_SNAPSHOT job and publishes the result on +"RESPONSE:{client_id}" — a topic only this client subscribes to, preventing +the DoS vector of research scripts triggering gateway-broadcast updates. + +Results are cached in-process. Subsequent calls for the same exchange return +instantly from cache. +""" + +import asyncio +import logging +import struct +import uuid +from typing import Optional, Dict + +import pandas as pd +import zmq +import zmq.asyncio + +try: + from dexorder.generated.ingestor_pb2 import SubmitHistoricalRequest, SubmitResponse + from dexorder.generated.ticker24h_pb2 import Ticker24h +except ImportError: + print("Warning: Protobuf files not found. Run protoc inside the sandbox container.") + raise + +log = logging.getLogger(__name__) + +MSG_TYPE_SUBMIT = 0x10 +MSG_TYPE_TICKER_24H = 0x0D +PROTOCOL_VERSION = 0x01 + +TICKER24H_COLUMNS = [ + "ticker", "exchange_id", "base_asset", "quote_asset", + "last_price", "price_change_pct", "quote_volume_24h", "std_quote_volume", + "bid_price", "ask_price", "open_24h", "high_24h", "low_24h", + "volume_24h", "num_trades", "timestamp_ms", +] + + +class Ticker24hClient: + """ + Client that fetches Ticker24h snapshots via the relay/Flink pipeline. + + Call connect() before use. Each exchange result is cached; re-requests + are only issued when the cache is empty (first call per exchange per session). + The background listener continues running and will update the cache on any + future broadcast subscriptions if the subscription list is expanded. + """ + + def __init__(self, relay_endpoint: str, notification_endpoint: str): + self._relay_endpoint = relay_endpoint + self._notification_endpoint = notification_endpoint + self._client_id = f"t24h-{uuid.uuid4().hex[:8]}" + self._cache: Dict[str, Ticker24h] = {} + self._events: Dict[str, asyncio.Event] = {} + self._context: Optional[zmq.asyncio.Context] = None + self._sub: Optional[zmq.asyncio.Socket] = None + self._listener_task: Optional[asyncio.Task] = None + self._connected = False + + async def connect(self): + """Start the background listener. Safe to call multiple times.""" + if self._connected: + return + + # Clean up stale state from a previous event loop + if self._listener_task is not None and not self._listener_task.done(): + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + if self._context is not None: + self._context.term() + + self._context = zmq.asyncio.Context() + self._sub = self._context.socket(zmq.SUB) + self._sub.connect(self._notification_endpoint) + self._sub.subscribe(f"RESPONSE:{self._client_id}".encode()) + + self._listener_task = asyncio.create_task(self._listen()) + # Let the listener establish its subscription before any request goes out + await asyncio.sleep(0.1) + self._connected = True + log.debug("Ticker24hClient connected: client_id=%s", self._client_id) + + async def stop(self): + """Stop the background listener and close sockets.""" + self._connected = False + if self._listener_task and not self._listener_task.done(): + self._listener_task.cancel() + try: + await self._listener_task + except asyncio.CancelledError: + pass + if self._sub: + self._sub.close() + self._sub = None + if self._context: + self._context.term() + self._context = None + + async def _listen(self): + while True: + try: + frames = await self._sub.recv_multipart() + # Wire: [topic][0x01][0x0D + Ticker24h proto bytes] + if len(frames) < 3: + continue + payload = frames[2] + if not payload or payload[0] != MSG_TYPE_TICKER_24H: + continue + snapshot = Ticker24h() + snapshot.ParseFromString(payload[1:]) + exchange = snapshot.exchange_id + self._cache[exchange] = snapshot + event = self._events.get(exchange) + if event: + event.set() + log.debug("Ticker24h received: exchange=%s tickers=%d", exchange, len(snapshot.tickers)) + except asyncio.CancelledError: + return + except Exception as exc: + log.warning("Ticker24hClient listener error: %s", exc) + + async def _request(self, exchange: str, timeout: float): + """Send a TICKER_SNAPSHOT request and wait for the notification.""" + event = self._events.setdefault(exchange, asyncio.Event()) + event.clear() + + request_id = str(uuid.uuid4()) + req = SubmitHistoricalRequest( + request_id=request_id, + ticker=f"@TICKER24H.{exchange}", + client_id=self._client_id, + ) + + version_frame = struct.pack("B", PROTOCOL_VERSION) + message_frame = struct.pack("B", MSG_TYPE_SUBMIT) + req.SerializeToString() + + # Use a fresh REQ socket per request (matches HistoryClient pattern) + sock = self._context.socket(zmq.REQ) + sock.connect(self._relay_endpoint) + try: + await sock.send(version_frame, zmq.SNDMORE) + await sock.send(message_frame) + # Drain the relay's immediate SubmitResponse ack + while True: + await asyncio.wait_for(sock.recv(), timeout=5.0) + if not sock.get(zmq.RCVMORE): + break + except asyncio.TimeoutError: + log.warning("No ack from relay for ticker24h request exchange=%s", exchange) + finally: + sock.close() + + try: + await asyncio.wait_for(event.wait(), timeout=timeout) + except asyncio.TimeoutError: + raise TimeoutError( + f"Ticker24h request for {exchange} timed out after {timeout}s" + ) + + async def get_ticker_24h( + self, + exchange: str, + limit: Optional[int] = None, + min_std_quote_volume: Optional[float] = None, + market_type: Optional[str] = None, + base_asset_contains: Optional[str] = None, + request_timeout: float = 30.0, + ) -> pd.DataFrame: + """ + Return a DataFrame of 24h stats for all symbols on the exchange. + + First call per exchange triggers a relay request and waits for Flink's + response (up to request_timeout seconds). Subsequent calls return from cache. + + Args: + exchange: Exchange name, e.g. "BINANCE" + limit: Return only top N by std_quote_volume + min_std_quote_volume: Exclude symbols below this USD volume threshold + market_type: "spot" (no ':' in ticker) or "perp" (has ':') + base_asset_contains: Case-insensitive substring filter on base asset + request_timeout: Seconds to wait for first response (default 30) + """ + if self._connected and self._listener_task is not None and self._listener_task.done(): + self._connected = False + + if not self._connected: + await self.connect() + + exchange = exchange.upper() + if exchange not in self._cache: + await self._request(exchange, timeout=request_timeout) + + snapshot = self._cache[exchange] + rows = [] + for ts in snapshot.tickers: + rows.append({ + "ticker": ts.ticker, + "exchange_id": ts.exchange_id, + "base_asset": ts.base_asset, + "quote_asset": ts.quote_asset, + "last_price": ts.last_price, + "price_change_pct": ts.price_change_pct, + "quote_volume_24h": ts.quote_volume_24h, + "std_quote_volume": ts.std_quote_volume if ts.HasField("std_quote_volume") else None, + "bid_price": ts.bid_price if ts.HasField("bid_price") else None, + "ask_price": ts.ask_price if ts.HasField("ask_price") else None, + "open_24h": ts.open_24h if ts.HasField("open_24h") else None, + "high_24h": ts.high_24h if ts.HasField("high_24h") else None, + "low_24h": ts.low_24h if ts.HasField("low_24h") else None, + "volume_24h": ts.volume_24h if ts.HasField("volume_24h") else None, + "num_trades": ts.num_trades if ts.HasField("num_trades") else None, + "timestamp_ms": round(ts.timestamp / 1_000_000), + }) + + df = pd.DataFrame(rows, columns=TICKER24H_COLUMNS) if rows else pd.DataFrame(columns=TICKER24H_COLUMNS) + + if market_type: + mt = market_type.lower() + if mt == "spot": + df = df[~df["ticker"].str.contains(":", na=False)] + elif mt in ("perp", "perpetual"): + df = df[df["ticker"].str.contains(":", na=False)] + + if base_asset_contains: + df = df[df["base_asset"].str.contains(base_asset_contains, case=False, na=False)] + + if min_std_quote_volume is not None: + df = df[df["std_quote_volume"].notna() & (df["std_quote_volume"] >= min_std_quote_volume)] + + df = df.sort_values("std_quote_volume", ascending=False, na_position="last") + if limit: + df = df.head(limit) + + return df.reset_index(drop=True) diff --git a/sandbox/dexorder/tools/evaluate_indicator.py b/sandbox/dexorder/tools/evaluate_indicator.py index 78305255..c81b330a 100644 --- a/sandbox/dexorder/tools/evaluate_indicator.py +++ b/sandbox/dexorder/tools/evaluate_indicator.py @@ -8,6 +8,7 @@ Returns a JSON object with a `values` array of {timestamp, ...} records, where timestamp is a Unix second integer and value fields hold floats (or null for NaN). """ +import asyncio import json import logging from pathlib import Path @@ -212,9 +213,16 @@ async def evaluate_indicator( }))] args.append(df[col]) - # Compute + # Compute — run in a thread so a slow indicator doesn't block the event loop try: - result = fn(*args, **parameters) + result = await asyncio.wait_for( + asyncio.to_thread(fn, *args, **parameters), + timeout=25.0, + ) + except asyncio.TimeoutError: + return [TextContent(type="text", text=json.dumps({ + "error": f"Indicator computation timed out after 25 seconds: {pandas_ta_name}" + }))] except Exception as exc: log.exception("evaluate_indicator: computation failed") return [TextContent(type="text", text=json.dumps({ diff --git a/sandbox/dexorder/tools/python_tools.py b/sandbox/dexorder/tools/python_tools.py index e04b6d0d..be5aa36c 100644 --- a/sandbox/dexorder/tools/python_tools.py +++ b/sandbox/dexorder/tools/python_tools.py @@ -380,6 +380,12 @@ class GitManager: # Custom Indicator Setup # ============================================================================= +# Maps ta_name → git commit hash of implementation.py at last registration. +# Used to detect when an indicator file has been updated so the binding can +# be refreshed without requiring a process restart. +_custom_indicator_revisions: dict[str, str] = {} + + def setup_custom_indicators(data_dir: Path) -> None: """ Register user's custom indicators with pandas-ta. @@ -388,7 +394,8 @@ def setup_custom_indicators(data_dir: Path) -> None: the function as ``ta.custom_{sanitized_name}`` so that evaluate_indicator can call it as ``getattr(ta, "custom_trendflex", None)``. - The binding is idempotent — indicators already registered are skipped. + Re-registers automatically when the implementation file's git revision + changes, so in-process edits are picked up without a restart. Note: pandas-ta's ta.import_dir() requires a category-based directory structure (e.g. tmpdir/momentum/trendflex.py) plus a companion @@ -434,9 +441,29 @@ def setup_custom_indicators(data_dir: Path) -> None: continue seen.add(ta_name) - # Skip if already bound (e.g. called multiple times in a process) + # Get the current git revision of this implementation file so we can + # detect edits and re-register without a process restart. + git_rev = "" + try: + result = subprocess.run( + ["git", "log", "-1", "--format=%H", "--", impl.name], + capture_output=True, text=True, cwd=impl.parent, + ) + git_rev = result.stdout.strip() + except Exception: + pass + if getattr(ta, ta_name, None) is not None: - continue + if _custom_indicator_revisions.get(ta_name) == git_rev: + continue # Same revision — already up to date + # Revision changed — clear old binding so we re-register below + log.info("Re-registering updated custom indicator '%s' (rev %s)", ta_name, git_rev[:8]) + try: + delattr(ta, ta_name) + except AttributeError: + pass + + _custom_indicator_revisions[ta_name] = git_rev try: spec = importlib.util.spec_from_file_location(ta_name, impl) diff --git a/sandbox/main.py b/sandbox/main.py index e2f0b7a0..452a076a 100644 --- a/sandbox/main.py +++ b/sandbox/main.py @@ -1472,6 +1472,7 @@ class UserContainer: self.data_api: Optional[DataAPIImpl] = None self.event_bridge: Optional[StrategyEventBridge] = None self.running = False + self._uvicorn_server: Optional["uvicorn.Server"] = None async def start(self) -> None: """Start all subsystems""" @@ -1642,6 +1643,7 @@ class UserContainer: access_log=True, ) server = uvicorn.Server(config) + self._uvicorn_server = server await server.serve() else: raise ValueError(f"Unknown MCP transport: {self.config.mcp_transport}") @@ -1670,9 +1672,11 @@ async def main(): loop = asyncio.get_event_loop() def handle_signal(sig): - logging.info(f"Received signal {sig}, shutting down...") - asyncio.create_task(container.stop()) - loop.stop() + logging.info(f"Received signal {sig}, initiating graceful shutdown...") + if container._uvicorn_server is not None: + container._uvicorn_server.should_exit = True + else: + loop.stop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, lambda s=sig: handle_signal(s)) diff --git a/web/src/components/CategoryItemList.vue b/web/src/components/CategoryItemList.vue index 5bd3a8b6..d1530322 100644 --- a/web/src/components/CategoryItemList.vue +++ b/web/src/components/CategoryItemList.vue @@ -2,6 +2,8 @@ import { ref } from 'vue' import DetailsEditDialog from './DetailsEditDialog.vue' import ResearchViewDialog from './ResearchViewDialog.vue' +import { useIndicatorStore } from '../stores/indicators' +import { useIndicatorTypesStore } from '../stores/indicatorTypes' const props = defineProps<{ category: 'indicator' | 'strategy' | 'research' @@ -14,6 +16,9 @@ const editingName = ref('') const viewDialogVisible = ref(false) const viewingName = ref('') +const indicatorStore = useIndicatorStore() +const indicatorTypesStore = useIndicatorTypesStore() + function openEdit(name: string) { editingName.value = name dialogVisible.value = true @@ -24,8 +29,28 @@ function openView(name: string) { viewDialogVisible.value = true } +function addToChart(pandasTaName: string, displayName: string) { + const type = indicatorTypesStore.types[pandasTaName] + if (!type) return + const defaultParams: Record = {} + for (const [k, p] of Object.entries(type.metadata.parameters)) { + defaultParams[k] = p.default + } + const now = Math.floor(Date.now() / 1000) + indicatorStore.addIndicator({ + id: `${pandasTaName}_${Date.now()}`, + pandas_ta_name: pandasTaName, + instance_name: displayName, + parameters: defaultParams, + visible: true, + pane: type.metadata.pane, + custom_metadata: type.metadata, + created_at: now, + modified_at: now, + }) +} + function onUpdated(_payload: { category: string; name: string; success: boolean; error?: string }) { - // Hook for handling the details_updated response — add logic here as needed } @@ -35,8 +60,9 @@ function onUpdated(_payload: { category: string; name: string; success: boolean;
{{ row.display_name }} {{ row.description ?? '' }} - - + + +
@@ -138,4 +164,21 @@ function onUpdated(_payload: { category: string; name: string; success: boolean; border-color: #089981; color: #089981; } + +.use-btn { + flex-shrink: 0; + background: none; + border: 1px solid #3d3d3d; + color: #888; + cursor: pointer; + font-size: 11px; + padding: 2px 8px; + border-radius: 3px; + line-height: 18px; +} + +.use-btn:hover { + border-color: #4a9eca; + color: #4a9eca; +} diff --git a/web/src/components/DetailsEditDialog.vue b/web/src/components/DetailsEditDialog.vue index 416419ef..1e7246a4 100644 --- a/web/src/components/DetailsEditDialog.vue +++ b/web/src/components/DetailsEditDialog.vue @@ -1,5 +1,5 @@