diff --git a/bin/init b/bin/init index 959e60f8..33c13cb8 100755 --- a/bin/init +++ b/bin/init @@ -204,32 +204,17 @@ if [ -z "$USER_ID" ]; then fi echo -e "${GREEN}User ID: $USER_ID${NC}" -# Build license JSON based on type -case "$LICENSE_TYPE" in - enterprise) - LICENSE_JSON='{"licenseType":"enterprise","features":{"maxIndicators":200,"maxStrategies":100,"maxBacktestDays":1825,"realtimeData":true,"customExecutors":true,"apiAccess":true},"resourceLimits":{"maxConcurrentSessions":20,"maxMessagesPerDay":10000,"maxTokensPerMessage":32768,"rateLimitPerMinute":300},"k8sResources":{"memoryRequest":"1Gi","memoryLimit":"4Gi","cpuRequest":"500m","cpuLimit":"4000m","storage":"50Gi","tmpSizeLimit":"1Gi","enableIdleShutdown":true,"idleTimeoutMinutes":120},"preferredModel":{"provider":"anthropic","model":"claude-opus-4-6","temperature":0.7}}' - ;; - free) - LICENSE_JSON='{"licenseType":"free","features":{"maxIndicators":10,"maxStrategies":3,"maxBacktestDays":30,"realtimeData":false,"customExecutors":false,"apiAccess":false},"resourceLimits":{"maxConcurrentSessions":1,"maxMessagesPerDay":100,"maxTokensPerMessage":4096,"rateLimitPerMinute":20},"k8sResources":{"memoryRequest":"256Mi","memoryLimit":"512Mi","cpuRequest":"100m","cpuLimit":"500m","storage":"2Gi","tmpSizeLimit":"128Mi","enableIdleShutdown":true,"idleTimeoutMinutes":30},"preferredModel":{"provider":"anthropic","model":"claude-haiku-4-5-20251001","temperature":0.7}}' - ;; - pro|*) - LICENSE_JSON='{"licenseType":"pro","features":{"maxIndicators":50,"maxStrategies":20,"maxBacktestDays":365,"realtimeData":true,"customExecutors":true,"apiAccess":true},"resourceLimits":{"maxConcurrentSessions":5,"maxMessagesPerDay":1000,"maxTokensPerMessage":8192,"rateLimitPerMinute":60},"k8sResources":{"memoryRequest":"512Mi","memoryLimit":"2Gi","cpuRequest":"250m","cpuLimit":"2000m","storage":"10Gi","tmpSizeLimit":"256Mi","enableIdleShutdown":true,"idleTimeoutMinutes":60},"preferredModel":{"provider":"anthropic","model":"claude-sonnet-4-6","temperature":0.7}}' - ;; -esac - -echo -e "${GREEN}→${NC} Creating $LICENSE_TYPE license..." -$KUBECTL exec "$PG_POD" -- psql -U postgres -d iceberg -c " - INSERT INTO user_licenses (user_id, email, license, mcp_server_url) - VALUES ( - '$USER_ID', - '$USER_EMAIL', - '$LICENSE_JSON', - '$MCP_URL' - ) - ON CONFLICT (user_id) DO UPDATE SET - license = EXCLUDED.license, - updated_at = NOW(); -" > /dev/null +echo -e "${GREEN}→${NC} Setting $LICENSE_TYPE license..." +HTTP_CODE=$(curl -s -o /tmp/dexorder-set-tier-response.json -w "%{http_code}" \ + -X POST "$BASE_URL/api/admin/users/$USER_ID/set-tier" \ + -H "Content-Type: application/json" \ + -d "{\"tier\": \"$LICENSE_TYPE\"}") +if [[ "$HTTP_CODE" != "200" ]]; then + echo -e "${RED}✗ Failed to set license tier (HTTP $HTTP_CODE)${NC}" + cat /tmp/dexorder-set-tier-response.json 2>/dev/null + exit 1 +fi +rm -f /tmp/dexorder-set-tier-response.json echo -e "${GREEN}✓ User ready: $USER_EMAIL ($LICENSE_TYPE)${NC}" echo "" diff --git a/deploy/k8s/base/gateway-rbac.yaml b/deploy/k8s/base/gateway-rbac.yaml index f31b49f8..8bd13dcc 100644 --- a/deploy/k8s/base/gateway-rbac.yaml +++ b/deploy/k8s/base/gateway-rbac.yaml @@ -1,6 +1,6 @@ -# RBAC for gateway to CREATE sandbox deployments only -# Principle of least privilege: gateway can ONLY create deployments/services/PVCs -# in the sandbox namespace. Deletion is handled by the lifecycle sidecar. +# RBAC for gateway to manage sandbox deployments +# Principle of least privilege: gateway can create/delete deployments in the +# sandbox namespace. 
PVC deletion is still handled by the lifecycle sidecar. # No pods, secrets, exec, or cross-namespace access. --- apiVersion: v1 @@ -15,10 +15,10 @@ metadata: name: sandbox-creator namespace: sandbox rules: - # Deployments: create and read only (deletion handled by sidecar) + # Deployments: full management (delete used for license tier changes; PVC deletion still via sidecar) - apiGroups: ["apps"] resources: ["deployments"] - verbs: ["create", "get", "list", "watch", "patch", "update"] + verbs: ["create", "get", "list", "watch", "patch", "update", "delete"] # PVCs: create and read (deletion handled by sidecar) - apiGroups: [""] @@ -41,7 +41,6 @@ rules: verbs: ["get"] # Explicitly NOT included: - # - deployments/delete - handled by lifecycle sidecar # - pvc/delete - handled by lifecycle sidecar # - services/delete - handled by lifecycle sidecar # - pods (create/delete) - must go through deployments diff --git a/deploy/k8s/base/gateway.yaml b/deploy/k8s/base/gateway.yaml index 92e1ed44..068a0c21 100644 --- a/deploy/k8s/base/gateway.yaml +++ b/deploy/k8s/base/gateway.yaml @@ -83,10 +83,10 @@ spec: resources: requests: - memory: "256Mi" + memory: "512Mi" cpu: "100m" limits: - memory: "512Mi" + memory: "2Gi" cpu: "500m" livenessProbe: diff --git a/deploy/k8s/base/sandbox-quotas.yaml b/deploy/k8s/base/sandbox-quotas.yaml index d91c4267..cb5c94df 100644 --- a/deploy/k8s/base/sandbox-quotas.yaml +++ b/deploy/k8s/base/sandbox-quotas.yaml @@ -19,8 +19,8 @@ spec: cpu: "100m" # Maximum any single container can request max: - memory: "2Gi" - cpu: "2000m" + memory: "8Gi" + cpu: "4000m" min: memory: "32Mi" cpu: "10m" diff --git a/deploy/k8s/dev/configs/ingestor-config.yaml b/deploy/k8s/dev/configs/ingestor-config.yaml index 6d4bfd9c..6dbd1400 100644 --- a/deploy/k8s/dev/configs/ingestor-config.yaml +++ b/deploy/k8s/dev/configs/ingestor-config.yaml @@ -4,18 +4,32 @@ flink_hostname: flink-jobmanager ingestor_broker_port: 5567 -# Supported exchanges (subscribe to these prefixes) +# Supported exchanges (used for symbol metadata generation) supported_exchanges: - BINANCE - COINBASE - KRAKEN +# Per-exchange work slot capacity. +# Each slot is one concurrent job. historical_slots limits parallel OHLC fetches; +# realtime_slots limits concurrent tick subscriptions. Set based on exchange rate +# limits and connection constraints — these are conservative starting values. +exchange_capacity: + BINANCE: + historical_slots: 1 + realtime_slots: 5 + COINBASE: + historical_slots: 1 + realtime_slots: 4 + KRAKEN: + historical_slots: 1 + realtime_slots: 3 + # Kafka configuration kafka_brokers: - kafka:9092 # Worker configuration -max_concurrent: 10 poll_interval_ms: 10000 # Logging diff --git a/deploy/k8s/dev/sandbox-config.yaml b/deploy/k8s/dev/sandbox-config.yaml index 9fc71f63..137a1ae6 100644 --- a/deploy/k8s/dev/sandbox-config.yaml +++ b/deploy/k8s/dev/sandbox-config.yaml @@ -46,6 +46,11 @@ data: alerts: max_active: 100 + # Memory guard: soft RLIMIT_AS limit as a fraction of the cgroup memory.max. + # Set below 1.0 so Python raises MemoryError before the kernel OOM-kills the pod. 
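+  # Example (illustrative): with memory.max = 2Gi, a limit_fraction of 0.85
+  # caps the process address space at roughly 1.7Gi (0.85 * 2048Mi).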
+ memory: + limit_fraction: 0.85 + # Logging logging: level: "INFO" diff --git a/doc/plan.md b/doc/plan.md new file mode 100644 index 00000000..3102409a --- /dev/null +++ b/doc/plan.md @@ -0,0 +1,10 @@ +# Development Plan + +* Realtime data +* Triggers +* Strategy UI +* Backtesting TV integration +* Paper Trading +* User secrets +* Live Execution +* Sandbox <=> Dexorder auth diff --git a/doc/prod_deployment.md b/doc/prod_deployment.md new file mode 100644 index 00000000..29f9d9d7 --- /dev/null +++ b/doc/prod_deployment.md @@ -0,0 +1,139 @@ +# Production Deployment Guide + +This document describes the full process for deploying the AI platform to the production Kubernetes cluster, including the special steps required when the Iceberg schema has changed. + +## Overview + +The production cluster runs under `kubectl --context prod`, defaulting to the `ai` namespace. The `sandbox` namespace is shared between dev and prod. + +Deployment consists of two parts: + +1. **Standard deploy** — rebuild and push all images, apply k8s manifests, roll out services +2. **Iceberg schema wipe** *(when schema has changed)* — clear both the Iceberg REST catalog (postgres) and the MinIO data warehouse before deploying + +--- + +## Standard Deployment (no schema changes) + +```bash +bin/deploy-all --sandboxes +``` + +This script (hardcoded to `--context=prod`) performs: + +1. Applies base kustomize manifests (`deploy/k8s/prod/`) — namespaces, RBAC, policies +2. Applies `deploy/k8s/prod/infrastructure.yaml` — statefulsets, deployments +3. Runs `bin/config-update prod` — updates ConfigMaps +4. Builds and pushes images for all 7 services: `gateway`, `web`, `sandbox`, `lifecycle-sidecar`, `flink`, `relay`, `ingestor` +5. *(with `--sandboxes`)* Deletes sandbox Deployments and Services in the `sandbox` namespace (PVCs are retained; gateway recreates them on next login) +6. Waits for rollouts on all 6 main deployments + +> **Secrets are NOT updated by this script.** Run `bin/secret-update prod` separately if secrets have changed. + +--- + +## Full Deploy with Iceberg Schema Wipe + +Use this when the Iceberg table schema has changed (e.g. protobuf/column changes in the `trading.ohlc` table). + +### Architecture note + +The Iceberg REST catalog uses **two storage layers** that must both be cleared: + +| Layer | What it stores | How to clear | +|---|---|---| +| PostgreSQL `iceberg` database | Table/namespace metadata (catalog) | Drop and recreate the database | +| MinIO `warehouse` bucket | Parquet data files | `mc rm --recursive --force` | + +**Important:** The gateway also uses the `iceberg` postgres database for its own auth tables (`user`, `user_licenses`, `session`, etc.). Wiping the database removes all user accounts. After the wipe, the schema must be re-applied and users recreated. + +### Step-by-step + +#### 1. Scale down Iceberg consumers + +```bash +kubectl --context prod -n ai scale deployment iceberg-catalog flink-jobmanager flink-taskmanager --replicas=0 +``` + +This prevents in-flight writes during the wipe. + +#### 2. Wipe the Iceberg PostgreSQL catalog + +```bash +kubectl --context prod -n ai exec postgres-0 -- psql -U postgres -c "DROP DATABASE iceberg;" +kubectl --context prod -n ai exec postgres-0 -- psql -U postgres -c "CREATE DATABASE iceberg;" +``` + +#### 3. 
Wipe the MinIO warehouse bucket
+
+Get MinIO credentials from the cluster secret:
+
+```bash
+kubectl --context prod -n ai get secret minio-secret -o jsonpath='{.data.root-user}' | base64 -d
+kubectl --context prod -n ai get secret minio-secret -o jsonpath='{.data.root-password}' | base64 -d
+```
+
+Configure the `mc` client inside the MinIO pod, passing the credentials read above (`mc alias set` requires the access key and secret key), then remove all objects:
+
+```bash
+kubectl --context prod -n ai exec minio-0 -- mc alias set local http://localhost:9000 <root-user> <root-password>
+kubectl --context prod -n ai exec minio-0 -- mc rm --recursive --force local/warehouse/
+```
+
+#### 4. Run the full deploy
+
+```bash
+bin/deploy-all --sandboxes
+```
+
+This rebuilds and redeploys all services, including `iceberg-catalog`, `flink-jobmanager`, and `flink-taskmanager` (which were scaled to zero above — `deploy-all` will restore them to their manifest replica counts).
+
+#### 5. Re-apply the gateway database schema
+
+The gateway does **not** auto-migrate. After the `iceberg` database is recreated, the schema must be applied manually:
+
+```bash
+kubectl --context prod -n ai exec -i postgres-0 -- psql -U postgres -d iceberg < gateway/schema.sql
+```
+
+This creates the `user`, `session`, `user_licenses`, and related tables.
+
+#### 6. Recreate all users
+
+```bash
+bin/create-all-users prod
+```
+
+This registers all alpha test users via the gateway API and assigns their licenses. Users are defined in the script itself (`bin/create-all-users`).
+
+To add or modify users, edit that file or run `bin/create-user prod` interactively.
+
+---
+
+## Verification
+
+```bash
+curl -I https://dexorder.ai/api/health
+```
+
+Check gateway logs for errors:
+
+```bash
+kubectl --context prod -n ai logs deployment/gateway --tail=100
+```
+
+---
+
+## Common Issues
+
+### Login fails after Iceberg wipe
+
+**Symptom:** `Sign in failed` (401) or `User creation failed` (postgres error `42P01: undefined table`)
+
+**Cause:** Dropping the `iceberg` database removes the gateway's auth tables along with the Iceberg catalog metadata — they share the same database.
+
+**Fix:** Re-apply the schema and recreate users (steps 5 and 6 above).
+
+### Gateway shows `42P01` errors but pod is running
+
+The gateway does not auto-migrate on startup. The schema file must be applied manually after any database recreation. A gateway restart alone will not fix this.
diff --git a/doc/protocol.md b/doc/protocol.md
index 68d173c9..f43aa399 100644
--- a/doc/protocol.md
+++ b/doc/protocol.md
@@ -81,18 +81,29 @@ All sockets bind on **Relay** (well-known endpoint). Components connect to relay
 - Relay publishes DataRequest to ingestor work queue
 - No request tracking - relay is stateless

-### 2. Ingestor Work Queue (Relay → Ingestors)
-**Pattern**: PUB/SUB with exchange prefix filtering
-- **Socket Type**: Relay uses PUB (bind), Ingestors use SUB (connect)
-- **Endpoint**: `tcp://*:5555` (Relay binds)
-- **Message Types**: `DataRequest` (historical or realtime)
-- **Topic Prefix**: Market name (e.g., `BTC/USDT.`, `ETH/BTC.`)
-- **Behavior**:
-  - Relay publishes work with exchange prefix from ticker
-  - Ingestors subscribe only to exchanges they support
-  - Multiple ingestors can compete for same exchange
-  - Ingestors write data to Kafka only (no direct response)
-  - Flink processes Kafka → Iceberg → notification
+### 2. Ingestor Work Queue (Flink ↔ Ingestors)
+**Pattern**: ROUTER/DEALER slot-based broker
+- **Socket Type**: Flink `IngestorBroker` uses ROUTER (bind), Ingestors use DEALER (connect)
+- **Endpoint**: `tcp://*:5567` (Flink binds)
+- **Message Types**: `WorkerReady` (slot offer), `DataRequest` (work assignment), `WorkComplete`, `WorkHeartbeat`, `WorkReject`, `WorkStop`
+- **Capacity model**:
+  - Each `WorkerReady` (0x20) is ONE slot offer for one exchange and one job type (`SlotType`: `HISTORICAL=1`, `REALTIME=2`, `ANY=0`)
+  - Ingestors send N `WorkerReady` messages at startup — one per available slot per exchange per type
+  - Flink dispatches a job by matching the slot's exchange and SlotType to the request
+  - The slot is consumed on dispatch; the ingestor re-offers it (new `WorkerReady`) when the job ends
+  - Rate-limit backoff: if the exchange returns a 429, the ingestor delays the re-offer by the `Retry-After` duration from the response header
+- **Historical job lifecycle**:
+  - Flink dispatches `DataRequest` (HISTORICAL_OHLC) → ingestor fetches and writes to Kafka → sends `WorkComplete` (0x21) → sends new `WorkerReady` for that slot
+- **Realtime job lifecycle**:
+  - Flink dispatches `DataRequest` (REALTIME_TICKS) → ingestor polls exchange and writes ticks to Kafka → sends `WorkHeartbeat` (0x22) every 5 s → on `WorkStop` (0x25) from Flink: cancels and sends new `WorkerReady`
+- **Slot configuration** (per ingestor, per exchange):
+  ```yaml
+  exchange_capacity:
+    BINANCE: { historical_slots: 3, realtime_slots: 5 }
+    KRAKEN: { historical_slots: 2, realtime_slots: 3 }
+    COINBASE: { historical_slots: 2, realtime_slots: 4 }
+  ```
+- **Flink restart**: when Flink restarts, its `freeSlots` deque is cleared; all in-flight jobs time out on the ingestor side, releasing their slots, which then re-offer via `WorkerReady`

 ### 3. Market Data Fanout (Relay ↔ Flink ↔ Clients)
 **Pattern**: XPUB/XSUB proxy
diff --git a/doc/test_prompt.md b/doc/test_prompt.md
index 87689530..2d38df10 100644
--- a/doc/test_prompt.md
+++ b/doc/test_prompt.md
@@ -1,4 +1 @@
-what conclusions can you make by analyzing historical data on ETH price direction changes near market session overlaps and market sessions changes on monday and tuesday?
-
----
-
+what conclusions can you make by analyzing historical data on ETH price direction changes near market session overlaps and market sessions changes on monday and tuesday?
\ No newline at end of file
diff --git a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java
index 3074beef..980a4094 100644
--- a/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java
+++ b/flink/src/main/java/com/dexorder/flink/ingestor/IngestorBroker.java
@@ -3,6 +3,7 @@ package com.dexorder.flink.ingestor;
 import com.dexorder.flink.zmq.ZmqChannelManager;
 import com.dexorder.proto.DataRequest;
 import com.dexorder.proto.RealtimeParams;
+import com.dexorder.proto.SlotType;
 import com.dexorder.proto.SubmitHistoricalRequest;
 import com.dexorder.proto.WorkComplete;
 import com.dexorder.proto.WorkHeartbeat;
@@ -17,27 +18,27 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;

 /**
- * LRU-style work broker for ingestors.
+ * Slot-based work broker for ingestors.
  *
- * Ingestors connect via DEALER to the ROUTER socket on port 5567. They register with READY,
- * are dispatched WORK messages, and respond with COMPLETE (historical) or HEARTBEAT (realtime).
- * If a heartbeat times out the job is re-queued and dispatched to another available worker.
+ * Each WorkerReady message from an ingestor represents ONE available slot for a
+ * specific exchange and job type (HISTORICAL or REALTIME). Flink consumes the slot
+ * by dispatching a DataRequest to it. The ingestor re-offers the slot (sends another
+ * WorkerReady) once the job completes, subject to any rate-limit backoff.
  *
- * Also receives SubmitHistoricalRequest messages forwarded by the relay on the PULL socket (5566).
+ * Also receives SubmitHistoricalRequest messages forwarded by the relay on the PULL
+ * socket (5566), and realtime job requests from RealtimeSubscriptionManager.
  *
- * Message type IDs (ZMQ framing, not Kafka):
- *   0x10 SubmitHistoricalRequest (relay → Flink via PULL, same as client wire type)
- *   0x20 WorkerReady (ingestor → Flink)
+ * Message type IDs (ZMQ framing):
+ *   0x10 SubmitHistoricalRequest (relay → Flink via PULL)
+ *   0x20 WorkerReady (ingestor → Flink: one slot offer)
  *   0x21 WorkComplete (ingestor → Flink)
  *   0x22 WorkHeartbeat (ingestor → Flink)
  *   0x23 WorkReject (ingestor → Flink)
@@ -53,7 +54,7 @@ public class IngestorBroker implements AutoCloseable {
     private static final byte MSG_TYPE_WORK_COMPLETE = 0x21;
     private static final byte MSG_TYPE_WORK_HEARTBEAT = 0x22;
     private static final byte MSG_TYPE_WORK_REJECT = 0x23;
-    private static final byte MSG_TYPE_WORK_ASSIGN = 0x01; // DataRequest type on wire
+    private static final byte MSG_TYPE_WORK_ASSIGN = 0x01;
     private static final byte MSG_TYPE_WORK_STOP = 0x25;

     /** Re-queue realtime job if no heartbeat received within this window (ms) */
@@ -65,20 +66,20 @@
     private volatile boolean running;
     private Thread brokerThread;

-    // ── Worker tracking ──────────────────────────────────────────────────────
+    // ── Slot tracking ─────────────────────────────────────────────────────────

-    /** Workers ready to accept a job, in LRU order (head = least recently used) */
-    private final Deque<WorkerInfo> freeWorkers = new ArrayDeque<>();
+    /**
+     * Available slots, in LRU order (head = least recently used).
+     * Each entry is one WorkerReady slot offer from an ingestor.
+     */
+    private final Deque<WorkerSlot> freeSlots = new ArrayDeque<>();

-    /** Jobs waiting for a compatible free worker */
+    /** Jobs waiting for a compatible free slot */
     private final Queue<DataRequest> pendingJobs = new ArrayDeque<>();

-    /** Jobs currently executing on a worker */
+    /** Jobs currently executing on a slot */
     private final Map<String, ActiveJob> activeJobs = new ConcurrentHashMap<>();

-    /** Worker identity → supported exchanges (set once on READY) */
-    private final Map<String, WorkerInfo> knownWorkers = new ConcurrentHashMap<>();
-
     // ── Thread-safe inbound queue from RealtimeSubscriptionManager ───────────
     private final Queue<DataRequest> externalSubmissions = new ConcurrentLinkedQueue<>();
@@ -134,8 +135,7 @@
     /**
      * Stop all realtime jobs for a ticker (called when last subscriber leaves).
-     * Thread-safe — posts a stop marker via externalSubmissions is complex; instead we
-     * directly find and stop active jobs. Protected by ConcurrentHashMap.
+     * Thread-safe via ConcurrentHashMap.
     */
    public void stopRealtimeJobsForTicker(String ticker) {
        List toStop = new ArrayList<>();
@@ -154,7 +154,7 @@
        }
    }

-    // ── Broker loop ──────────────────────────────────────────────────────────
+    // ── Broker loop ───────────────────────────────────────────────────────────

    private void brokerLoop() {
        ZMQ.Socket pullSocket = zmqManager.getSocket(ZmqChannelManager.Channel.CLIENT_REQUEST);
@@ -174,18 +174,15 @@
                    enqueueJob(ext);
                }

-                // Poll sockets (100ms timeout)
                poller.poll(100);

                if (poller.pollin(0)) {
                    handleClientRequest(pullSocket);
                }
-
                if (poller.pollin(1)) {
                    handleWorkerMessage(routerSocket);
                }

-                // Check for heartbeat / completion timeouts
                checkTimeouts();

            } catch (Exception e) {
@@ -235,7 +232,8 @@
                .setClientId(req.hasClientId() ? req.getClientId() : "")
                .build();
            enqueueJob(dataRequest);
-            LOG.info("Received historical request from relay: request_id={}, ticker={}", req.getRequestId(), req.getTicker());
+            LOG.info("Received historical request from relay: request_id={}, ticker={}",
+                    req.getRequestId(), req.getTicker());
        } catch (Exception e) {
            LOG.error("Failed to parse SubmitHistoricalRequest from relay", e);
        }
@@ -277,23 +275,28 @@
        }
    }

+    /**
+     * A WorkerReady message represents ONE slot offer for one exchange and job type.
+     * Add it directly to freeSlots — no deduplication (multiple slots per ingestor are expected).
+     */
    private void handleWorkerReady(byte[] identity, String identityKey, byte[] payload) throws Exception {
        WorkerReady ready = WorkerReady.parseFrom(payload);
-        Set<String> exchanges = new HashSet<>(ready.getExchangesList());
+        SlotType slotType = ready.getJobType();

-        WorkerInfo worker = knownWorkers.computeIfAbsent(identityKey,
-                k -> new WorkerInfo(identity, identityKey, exchanges));
-        worker.exchanges = exchanges; // update in case re-READY with different config
-        worker.identity = identity;
-
-        if (!freeWorkers.contains(worker)) {
-            freeWorkers.addLast(worker);
+        for (String exchange : ready.getExchangesList()) {
+            WorkerSlot slot = new WorkerSlot(identity, identityKey, exchange.toUpperCase(), slotType);
+            freeSlots.addLast(slot);
+            LOG.info("Worker slot READY: id={}, exchange={}, type={}, totalFreeSlots={}",
+                    identityKey, exchange, slotType, freeSlots.size());
        }
-        LOG.info("Ingestor READY: id={}, exchanges={}, freeWorkers={}", identityKey, exchanges, freeWorkers.size());

        dispatchPending();
    }

+    /**
+     * Historical job completed. Remove from activeJobs.
+     * The ingestor will send a new typed WorkerReady to re-offer the slot.
+     */
    private void handleWorkComplete(String identityKey, byte[] payload) throws Exception {
        WorkComplete complete = WorkComplete.parseFrom(payload);
        String jobId = complete.getJobId();
@@ -304,13 +307,7 @@
        } else {
            LOG.info("Job COMPLETE: jobId={}, ticker={}, success={}", jobId, job.ticker, complete.getSuccess());
        }
-
-        // Worker is free again
-        WorkerInfo worker = knownWorkers.get(identityKey);
-        if (worker != null) {
-            freeWorkers.addLast(worker);
-            dispatchPending();
-        }
+        // Slot re-registration is driven by the ingestor via a new WorkerReady.
    }

    private void handleWorkHeartbeat(String identityKey, byte[] payload) throws Exception {
@@ -325,6 +322,10 @@
        }
    }

+    /**
+     * Ingestor rejected the job. Re-queue it with a new ID.
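+     * A fresh job_id lets a different ingestor pick the re-queued job up.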
+     * The ingestor will send a new typed WorkerReady when it's ready again.
+     */
    private void handleWorkReject(String identityKey, byte[] payload) throws Exception {
        WorkReject reject = WorkReject.parseFrom(payload);
        String jobId = reject.getJobId();
@@ -332,31 +333,23 @@
        ActiveJob job = activeJobs.remove(jobId);
        if (job != null) {
-            // Re-queue with fresh job_id so a different ingestor may pick it up
            DataRequest requeued = job.request.toBuilder()
                .setJobId(UUID.randomUUID().toString())
                .build();
            pendingJobs.add(requeued);
        }
-
-        // Worker is still free (it rejected, not crashed)
-        WorkerInfo worker = knownWorkers.get(identityKey);
-        if (worker != null) {
-            freeWorkers.addLast(worker);
-            dispatchPending();
-        }
+        // Slot re-registration is driven by the ingestor via a new WorkerReady.
    }

-    // ── Dispatch ─────────────────────────────────────────────────────────────
+    // ── Dispatch ──────────────────────────────────────────────────────────────

    private void enqueueJob(DataRequest request) {
-        // Check if we can immediately dispatch
-        WorkerInfo worker = findFreeWorker(exchangeOf(request.getTicker()));
-        if (worker != null) {
-            dispatch(worker, request);
+        WorkerSlot slot = findFreeSlot(exchangeOf(request.getTicker()), request.getType());
+        if (slot != null) {
+            dispatch(slot, request);
        } else {
            pendingJobs.add(request);
-            LOG.debug("No free worker for {}, queued (pendingJobs={})", request.getTicker(), pendingJobs.size());
+            LOG.debug("No free slot for {}, queued (pendingJobs={})", request.getTicker(), pendingJobs.size());
        }
    }

@@ -364,9 +357,9 @@
        Queue<DataRequest> remaining = new ArrayDeque<>();
        DataRequest job;
        while ((job = pendingJobs.poll()) != null) {
-            WorkerInfo worker = findFreeWorker(exchangeOf(job.getTicker()));
-            if (worker != null) {
-                dispatch(worker, job);
+            WorkerSlot slot = findFreeSlot(exchangeOf(job.getTicker()), job.getType());
+            if (slot != null) {
+                dispatch(slot, job);
            } else {
                remaining.add(job);
            }
@@ -374,28 +367,30 @@
        pendingJobs.addAll(remaining);
    }

-    private void dispatch(WorkerInfo worker, DataRequest request) {
-        freeWorkers.remove(worker);
-
+    private void dispatch(WorkerSlot slot, DataRequest request) {
        try {
            byte[] protoBytes = request.toByteArray();
-            boolean sent = zmqManager.sendToWorker(worker.identity, PROTOCOL_VERSION, MSG_TYPE_WORK_ASSIGN, protoBytes);
+            boolean sent = zmqManager.sendToWorker(slot.identity, PROTOCOL_VERSION, MSG_TYPE_WORK_ASSIGN, protoBytes);
            if (!sent) {
-                LOG.error("Failed to dispatch job to worker={}, re-queuing", worker.identityKey);
-                freeWorkers.addLast(worker);
+                // ROUTER_MANDATORY: identity is disconnected — purge all stale slots for this
+                // worker and re-queue the job so dispatchPending() can try a live slot.
+ int purged = purgeWorkerSlots(slot.identityKey); + LOG.warn("Worker {} unreachable, purged {} stale free slots, re-queuing job={}", + slot.identityKey, purged, request.getJobId()); pendingJobs.add(request); return; } - ActiveJob active = new ActiveJob(worker.identity, worker.identityKey, + ActiveJob active = new ActiveJob(slot.identity, slot.identityKey, request, request.getTicker(), request.getType()); activeJobs.put(request.getJobId(), active); - LOG.info("Dispatched job: jobId={}, ticker={}, type={}, worker={}", - request.getJobId(), request.getTicker(), request.getType(), worker.identityKey); + LOG.info("Dispatched job: jobId={}, ticker={}, type={}, worker={}, slotType={}", + request.getJobId(), request.getTicker(), request.getType(), + slot.identityKey, slot.slotType); } catch (Exception e) { LOG.error("Error dispatching job", e); - freeWorkers.addLast(worker); + freeSlots.addLast(slot); } } @@ -408,7 +403,7 @@ public class IngestorBroker implements AutoCloseable { } } - // ── Timeout checking ───────────────────────────────────────────────────── + // ── Timeout checking ────────────────────────────────────────────────────── private void checkTimeouts() { long now = System.currentTimeMillis(); @@ -426,10 +421,9 @@ public class IngestorBroker implements AutoCloseable { for (String jobId : timedOut) { ActiveJob job = activeJobs.remove(jobId); if (job == null) continue; - LOG.warn("Job timed out (no heartbeat/completion): jobId={}, ticker={}, type={}, worker={}", + LOG.warn("Job timed out: jobId={}, ticker={}, type={}, worker={}", jobId, job.ticker, job.type, job.workerIdentityKey); - // Re-queue with a new job_id DataRequest requeued = job.request.toBuilder() .setJobId(UUID.randomUUID().toString()) .build(); @@ -438,7 +432,7 @@ public class IngestorBroker implements AutoCloseable { } } - // ── Helpers ────────────────────────────────────────────────────────────── + // ── Helpers ─────────────────────────────────────────────────────────────── /** Extract exchange name from ticker, e.g. "BTC/USDT.BINANCE" → "BINANCE" */ private static String exchangeOf(String ticker) { @@ -446,12 +440,32 @@ public class IngestorBroker implements AutoCloseable { return dot >= 0 ? ticker.substring(dot + 1).toUpperCase() : ""; } - /** Find and remove a free worker that supports the given exchange. */ - private WorkerInfo findFreeWorker(String exchange) { - for (WorkerInfo w : freeWorkers) { - if (exchange.isEmpty() || w.exchanges.contains(exchange)) { - freeWorkers.remove(w); - return w; + /** + * Remove all free slots offered by a given worker identity. + * Called when a dispatch to that identity fails (ROUTER_MANDATORY unreachable). + * Returns the number of slots removed. + */ + private int purgeWorkerSlots(String identityKey) { + int before = freeSlots.size(); + freeSlots.removeIf(slot -> slot.identityKey.equals(identityKey)); + return before - freeSlots.size(); + } + + /** + * Find and remove a free slot that supports the given exchange and request type. + * A slot with SlotType.ANY matches any request type. 
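+     * Returns null when no compatible slot is free; callers then queue the job in pendingJobs.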
+     */
+    private WorkerSlot findFreeSlot(String exchange, DataRequest.RequestType requestType) {
+        for (WorkerSlot slot : freeSlots) {
+            boolean exchangeMatch = exchange.isEmpty() || slot.exchange.equals(exchange);
+            boolean typeMatch = slot.slotType == SlotType.ANY
+                    || (slot.slotType == SlotType.HISTORICAL
+                        && requestType == DataRequest.RequestType.HISTORICAL_OHLC)
+                    || (slot.slotType == SlotType.REALTIME
+                        && requestType == DataRequest.RequestType.REALTIME_TICKS);
+            if (exchangeMatch && typeMatch) {
+                freeSlots.remove(slot);
+                return slot;
            }
        }
        return null;
    }
@@ -468,17 +482,20 @@
        stop();
    }

-    // ── Inner types ──────────────────────────────────────────────────────────
+    // ── Inner types ───────────────────────────────────────────────────────────

-    private static class WorkerInfo {
-        byte[] identity;
+    /** One available work slot offered by an ingestor via WorkerReady. */
+    private static class WorkerSlot {
+        final byte[] identity;
        final String identityKey;
-        Set<String> exchanges;
+        final String exchange;
+        final SlotType slotType;

-        WorkerInfo(byte[] identity, String identityKey, Set<String> exchanges) {
+        WorkerSlot(byte[] identity, String identityKey, String exchange, SlotType slotType) {
            this.identity = identity;
            this.identityKey = identityKey;
-            this.exchanges = exchanges;
+            this.exchange = exchange;
+            this.slotType = slotType;
        }
    }
diff --git a/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java b/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java
index 63dcccdb..a1e879e3 100644
--- a/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java
+++ b/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java
@@ -87,6 +87,11 @@ public class ZmqChannelManager implements Closeable {
        socket.setLinger(1000);
        socket.setSndHWM(10000);
        socket.setRcvHWM(10000);
+        if (socketType == SocketType.ROUTER) {
+            // Return false (EHOSTUNREACH) instead of silently dropping messages to
+            // unknown/disconnected peer identities. Enables immediate stale-slot detection.
+            socket.setRouterMandatory(true);
+        }
        socket.bind(endpoint);
        sockets.put(channel.name(), socket);
        LOG.info("Bound {} to {}", description, endpoint);
diff --git a/gateway/src/channels/websocket-handler.ts b/gateway/src/channels/websocket-handler.ts
index 195825dd..8d4c449f 100644
--- a/gateway/src/channels/websocket-handler.ts
+++ b/gateway/src/channels/websocket-handler.ts
@@ -595,28 +595,28 @@ export class WebSocketHandler {
      case 'get_bars': {
        if (!ohlcService) {
          socket.send(JSON.stringify({
-            type: 'error',
+            type: 'get_bars_response',
            request_id: requestId,
-            error_message: 'OHLC service not available'
+            error: 'OHLC service not available',
          }));
          break;
        }
-        const history = await ohlcService.fetchOHLC(
-          payload.symbol,
-          payload.period_seconds,
-          payload.from_time,
-          payload.to_time,
-          payload.countback
-        );
-        logger.info({ requestId, barCount: history.bars?.length ?? 0, noData: history.noData, socketState: socket.readyState }, 'Sending get_bars_response');
-        socket.send(
-          jsonStringifySafe({
-            type: 'get_bars_response',
-            request_id: requestId,
-            history,
-          })
-        );
-        logger.info({ requestId }, 'get_bars_response sent');
+        try {
+          const history = await ohlcService.fetchOHLC(
+            payload.symbol,
+            payload.period_seconds,
+            payload.from_time,
+            payload.to_time,
+            payload.countback
+          );
+          logger.info({ requestId, barCount: history.bars?.length ?? 
0, noData: history.noData, socketState: socket.readyState }, 'Sending get_bars_response'); + socket.send(jsonStringifySafe({ type: 'get_bars_response', request_id: requestId, history })); + logger.info({ requestId }, 'get_bars_response sent'); + } catch (err: any) { + const errorMessage = err?.message ?? String(err); + logger.error({ requestId, ticker: payload.symbol, errorMessage }, 'get_bars failed'); + socket.send(JSON.stringify({ type: 'get_bars_response', request_id: requestId, error: errorMessage })); + } break; } diff --git a/gateway/src/db/user-service.ts b/gateway/src/db/user-service.ts index 13001a5f..750026d5 100644 --- a/gateway/src/db/user-service.ts +++ b/gateway/src/db/user-service.ts @@ -1,6 +1,6 @@ import { Pool } from 'pg'; -import type { UserLicense } from '../types/user.js'; -import { UserLicenseSchema } from '../types/user.js'; +import type { UserLicense, License, LicenseTier } from '../types/user.js'; +import { UserLicenseSchema, LICENSE_TIER_TEMPLATES } from '../types/user.js'; import type { AuthService } from '../auth/auth-service.js'; export class UserService { @@ -114,6 +114,54 @@ export class UserService { return await this.authService.verifyToken(token); } + /** + * Re-apply the current canonical template for every user's declared licenseType. + * Updates only the DB — does not touch deployments, so running pods are unaffected + * until their next natural restart. + */ + async migrateAllLicenses(): Promise<{ updated: number }> { + const client = await this.pool.connect(); + try { + const rows = await client.query( + `SELECT user_id, license->>'licenseType' AS tier FROM user_licenses` + ); + let updated = 0; + for (const row of rows.rows) { + const tier = row.tier as LicenseTier; + if (!LICENSE_TIER_TEMPLATES[tier]) continue; + await client.query( + `UPDATE user_licenses SET license = $1::jsonb, updated_at = NOW() WHERE user_id = $2`, + [JSON.stringify(LICENSE_TIER_TEMPLATES[tier]), row.user_id] + ); + updated++; + } + return { updated }; + } finally { + client.release(); + } + } + + /** + * Set a user's license to a canonical tier template. + * Overwrites the existing license with the current template for that tier. 
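+   * Does not touch the deployment itself; ContainerManager.applyLicenseTier deletes it so the new limits apply on recreation.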
+   */
+  async setUserLicenseTier(userId: string, tier: LicenseTier): Promise<License> {
+    const license = LICENSE_TIER_TEMPLATES[tier];
+    const client = await this.pool.connect();
+    try {
+      await client.query(
+        `INSERT INTO user_licenses (user_id, license, mcp_server_url, updated_at)
+         VALUES ($1, $2::jsonb, 'pending', NOW())
+         ON CONFLICT (user_id) DO UPDATE
+           SET license = EXCLUDED.license, updated_at = NOW()`,
+        [userId, JSON.stringify(license)]
+      );
+    } finally {
+      client.release();
+    }
+    return license;
+  }
+
  /**
   * Close database pool
   */
diff --git a/gateway/src/harness/agent-harness.ts b/gateway/src/harness/agent-harness.ts
index bad6ea7f..4178a58d 100644
--- a/gateway/src/harness/agent-harness.ts
+++ b/gateway/src/harness/agent-harness.ts
@@ -16,6 +16,7 @@ import type { ResearchSubagent } from './subagents/research/index.js';
 import type { IndicatorSubagent } from './subagents/indicator/index.js';
 import type { WebExploreSubagent } from './subagents/web-explore/index.js';
 import type { StrategySubagent } from './subagents/strategy/index.js';
+import { BaseSubagent } from './subagents/base-subagent.js';
 import type { DynamicStructuredTool } from '@langchain/core/tools';
 import { getToolRegistry } from '../tools/tool-registry.js';
 import type { MCPToolInfo } from '../tools/mcp/mcp-tool-wrapper.js';
@@ -237,12 +238,22 @@
    try {
      const { createResearchSubagent } = await import('./subagents/research/index.js');

-      // Create a model for the research subagent
+      // Path resolution: use the compiled output path
+      const researchSubagentPath = join(__dirname, 'subagents', 'research');
+      this.config.logger.debug({ researchSubagentPath }, 'Using research subagent path');
+
+      // Load the subagent config to get maxTokens — research scripts require more tokens
+      // than the provider default (4096) because python_write arguments include full code bodies
+      const researchSubagentConfig = await BaseSubagent.loadConfig(researchSubagentPath);
+
+      // Create a model for the research subagent — always use the complex model
+      // since research tasks involve data analysis, charting, and code generation
      const { model } = await this.modelRouter.route(
-        'research analysis', // dummy query
+        'analyze and backtest research data', // triggers complex routing
        this.config.license,
        RoutingStrategy.COMPLEXITY,
-        this.config.userId
+        this.config.userId,
+        researchSubagentConfig.maxTokens // honour the subagent's maxTokens (e.g. 
8192) ); // Get tools for research subagent from registry @@ -274,10 +285,6 @@ export class AgentHarness { })); } - // Path resolution: use the compiled output path - const researchSubagentPath = join(__dirname, 'subagents', 'research'); - this.config.logger.debug({ researchSubagentPath }, 'Using research subagent path'); - this.researchSubagent = await createResearchSubagent( model, this.config.logger, @@ -535,10 +542,12 @@ export class AgentHarness { const stream = await model.stream(messagesCopy, { signal }); for await (const chunk of stream) { if (typeof chunk.content === 'string' && chunk.content.length > 0) { + this.config.logger.trace({ content: chunk.content }, 'raw chunk'); yield { type: 'chunk', content: chunk.content }; } else if (Array.isArray(chunk.content)) { for (const block of chunk.content) { if (block.type === 'text' && block.text) { + this.config.logger.trace({ content: block.text }, 'raw chunk'); yield { type: 'chunk', content: block.text }; } } diff --git a/gateway/src/harness/prompts/system-prompt.md b/gateway/src/harness/prompts/system-prompt.md index 81070f6d..dbe14a9f 100644 --- a/gateway/src/harness/prompts/system-prompt.md +++ b/gateway/src/harness/prompts/system-prompt.md @@ -18,8 +18,11 @@ Dexorder trading platform provides OHLC data at a 1-minute resolution and suppor Dexorder does not support: * tick-by-tick trading or high-frequency strategies. -* long-running computations like paramater optimizations or training machine learning models. +* long-running computations like parameter optimizations or training machine learning models during live execution. * portfolio optimization or trading strategies that require a large number of symbols. +* LLM calls inside strategy scripts — strategies must be deterministic and lightweight for backtesting to be reliable and repeatable. LLMs are slow, expensive, and introduce temperature-based non-determinism that breaks backtesting. (Walk-forward LLM integration via timer/data triggers is planned but not yet available.) +* TradFi data (equities, forex, bonds, options, etc.) — only crypto pricing data is available. +* Alternative data sources such as news feeds, Twitter/social sentiment, on-chain data, or economic calendars — these are not yet available. Dexorder does support: * backtesting strategies against historical data. @@ -33,6 +36,27 @@ If the user asks for a capability not provided by Dexorder, decline and explain # Important Instructions +## Switching Chart Symbol or Timeframe + +**IMPORTANT: When the user asks to switch, change, or update the chart symbol or timeframe, you MUST call `workspace_patch` directly. Do NOT use web_explore, do NOT delegate to the indicator tool.** + +Call `workspace_patch` with `store_name = "chartState"` and the appropriate JSON patch: + +To switch symbol only: +```json +[{ "op": "replace", "path": "/symbol", "value": "SOL/USDT.BINANCE" }] +``` + +To switch symbol and period (period is seconds: 60=1m, 300=5m, 900=15m, 3600=1h, 86400=1D): +```json +[ + { "op": "replace", "path": "/symbol", "value": "SOL/USDT.BINANCE" }, + { "op": "replace", "path": "/period", "value": 900 } +] +``` + +You already know this format — do not search for it. After patching, confirm the change to the user. + ## Investment Advice **NEVER** recommend any specific ticker, trade, or position. You may suggest mechanical adjustments or improvements to strategies, but you must **NEVER** offer an opinion on a specific trade or position. You are **NOT** a registered investment advisor. 
diff --git a/gateway/src/harness/prompts/welcome.md b/gateway/src/harness/prompts/welcome.md
index 673d1c84..55ce31cf 100644
--- a/gateway/src/harness/prompts/welcome.md
+++ b/gateway/src/harness/prompts/welcome.md
@@ -1 +1 @@
-This is your first chat with a new user. Welcome them to Dexorder and describe who are you and what can you do.
+This is your first chat with a new user. Welcome them to Dexorder, and describe who you are and what you can do.
\ No newline at end of file
diff --git a/gateway/src/harness/subagents/strategy/system-prompt.md b/gateway/src/harness/subagents/strategy/system-prompt.md
index 1f754740..5b3a2000 100644
--- a/gateway/src/harness/subagents/strategy/system-prompt.md
+++ b/gateway/src/harness/subagents/strategy/system-prompt.md
@@ -83,6 +83,15 @@ self.config.initial_capital  # starting capital in quote currency
 | `sell_vol` | float | Sell-side volume (taker sells) |
 | `open_interest` | float | Open interest (futures only; NaN for spot) |

+### Available data — crypto only
+
+Strategies have access **only** to crypto OHLC feeds with volume, buy/sell volume split, and open interest. The following are **not available** and must never be referenced in a strategy:
+
+- **TradFi data** — equities, forex, bonds, futures spreads, options, macro indicators, interest rates, etc.
+- **Alternative data** — news feeds, social sentiment (Twitter/Reddit), on-chain metrics, economic calendars, earnings, etc.
+
+If a user requests a strategy that depends on unavailable data, explain the limitation and offer a crypto-native alternative (e.g. use order-flow imbalance instead of news sentiment).
+
 ---

 ## Section B — Strategy Metadata
@@ -355,3 +364,16 @@
 - 4h bars: 100k bars ≈ 45 years → cap at 5 years (≈ 10,950 bars)

 7. **Never `import` from `dexorder` inside `evaluate()`** — the strategy file is exec'd in a sandbox with PandasStrategy and pandas_ta pre-loaded. Standard library and pandas/numpy/pandas_ta are available.
+
+8. **No LLM calls inside strategies** — strategies must be fully deterministic. LLM invocations are prohibited because:
+   - They are slow and expensive, making backtesting impractical.
+   - Any temperature > 0 produces non-repeatable outputs, breaking backtest reproducibility.
+   - The correct model is: the LLM *writes* the strategy; the strategy runs without LLM involvement.
+   - Walk-forward LLM integration (via timer or data triggers) is a planned feature but is **not yet implemented**. Do not attempt to approximate it now.
+
+9. **`evaluate()` must be fast, lightweight, and deterministic** — it is called on every bar during backtesting across potentially hundreds of thousands of bars. Specifically:
+   - **No heavy computation at runtime**: model inference, large matrix operations, file I/O, network calls, or database queries are forbidden inside `evaluate()`.
+   - **ML is allowed with restrictions**: a model may be trained offline (e.g. in `__init__` using warm-up data), but inference in `evaluate()` must be fast (microseconds, not milliseconds). If training is compute-intensive, note this clearly in the strategy description.
+   - **No randomness**: do not use `random`, `np.random`, or any non-seeded stochastic operation. All outputs given the same data must be identical across runs.
+
+10. **Data scope** — strategies may only use data available in the `dfs` feeds. Do not attempt to fetch external data, call APIs, read files, or access anything outside the provided DataFrames. 
Crypto OHLCV + buy/sell volume + open interest is what is available; nothing else.
diff --git a/gateway/src/k8s/client.ts b/gateway/src/k8s/client.ts
index 275dd2ae..2c26e729 100644
--- a/gateway/src/k8s/client.ts
+++ b/gateway/src/k8s/client.ts
@@ -306,6 +306,25 @@
    }
  }

+  /**
+   * Delete only the Deployment, preserving PVC (user data) and Service (stable DNS).
+   * Used when applying a license tier change — next ensureContainerRunning recreates
+   * the deployment with updated resource limits.
+   */
+  async deleteDeploymentOnly(userId: string): Promise<void> {
+    const deploymentName = KubernetesClient.getDeploymentName(userId);
+    try {
+      await this.appsApi.deleteNamespacedDeployment({
+        name: deploymentName,
+        namespace: this.config.namespace
+      });
+      this.config.logger.info({ deploymentName }, 'Deleted deployment (tier change)');
+    } catch (error: any) {
+      const is404 = error.code === 404 || error.response?.statusCode === 404 || error.statusCode === 404;
+      if (!is404) throw error;
+    }
+  }
+
  /**
   * Delete deployment and associated resources
   * (Used for cleanup/testing - normally handled by lifecycle sidecar)
diff --git a/gateway/src/k8s/container-manager.ts b/gateway/src/k8s/container-manager.ts
index 7414a56c..d1aaa105 100644
--- a/gateway/src/k8s/container-manager.ts
+++ b/gateway/src/k8s/container-manager.ts
@@ -1,9 +1,11 @@
 import type { FastifyBaseLogger } from 'fastify';
 import { KubernetesClient, type DeploymentSpec } from './client.js';
-import type { License } from '../types/user.js';
+import type { License, LicenseTier } from '../types/user.js';
+import type { UserService } from '../db/user-service.js';

 export interface ContainerManagerConfig {
   k8sClient: KubernetesClient;
+  userService: UserService;
   sandboxImage: string;
   sidecarImage: string;
   storageClass: string;
@@ -139,6 +141,17 @@
    return { exists: true, ready, mcpEndpoint };
  }

+  /**
+   * Apply a canonical license tier to a user: updates DB and deletes the deployment
+   * so it is recreated with the new resource limits on next connect.
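+   * The PVC (user data) and Service (stable DNS) are preserved throughout.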
+   */
+  async applyLicenseTier(userId: string, tier: LicenseTier): Promise<License> {
+    const license = await this.config.userService.setUserLicenseTier(userId, tier);
+    await this.config.k8sClient.deleteDeploymentOnly(userId);
+    this.config.logger.info({ userId, tier }, 'License tier applied; deployment will recreate on next connect');
+    return license;
+  }
+
  /**
   * Delete container (for cleanup/testing)
   */
diff --git a/gateway/src/llm/router.ts b/gateway/src/llm/router.ts
index 971f4a5a..6f1cc3f1 100644
--- a/gateway/src/llm/router.ts
+++ b/gateway/src/llm/router.ts
@@ -42,7 +42,8 @@ export class ModelRouter {
     message: string,
     license: License,
     strategy: RoutingStrategy = RoutingStrategy.USER_PREFERENCE,
-    userId?: string
+    userId?: string,
+    maxTokens?: number
   ): Promise<{ model: BaseChatModel; middleware: ModelMiddleware }> {
     let modelConfig: ModelConfig;

@@ -67,12 +68,17 @@
       modelConfig = this.defaultModel;
     }

+    if (maxTokens !== undefined) {
+      modelConfig = { ...modelConfig, maxTokens };
+    }
+
     this.logger.info(
       {
         userId,
         strategy,
         provider: modelConfig.provider,
         model: modelConfig.model,
+        maxTokens: modelConfig.maxTokens,
       },
       'Routing to model'
     );
diff --git a/gateway/src/main.ts b/gateway/src/main.ts
index 415f52fa..187b1547 100644
--- a/gateway/src/main.ts
+++ b/gateway/src/main.ts
@@ -22,6 +22,7 @@ import { AgentHarness, type HarnessSessionConfig } from './harness/agent-harness
 import { OHLCService } from './services/ohlc-service.js';
 import { SymbolIndexService } from './services/symbol-index-service.js';
 import { SymbolRoutes } from './routes/symbol-routes.js';
+import { AdminRoutes } from './routes/admin-routes.js';

 // Catch unhandled promise rejections for better debugging
 process.on('unhandledRejection', (reason: any, promise) => {
@@ -309,6 +310,7 @@ const k8sClient = new KubernetesClient({
 const containerManager = new ContainerManager({
   k8sClient,
+  userService,
   sandboxImage: config.kubernetes.sandboxImage,
   sidecarImage: config.kubernetes.sidecarImage,
   storageClass: config.kubernetes.storageClass,
@@ -439,6 +441,9 @@
 const getSymbolService = () => symbolIndexService;
 const symbolRoutes = new SymbolRoutes({ getSymbolIndexService: getSymbolService });
 symbolRoutes.register(app);

+// Register admin routes
+new AdminRoutes(containerManager, userService).register(app);
+
 app.log.debug('All routes registered');

 // Health check
@@ -715,7 +720,6 @@ try {
      icebergClient,
      logger: app.log,
    });
-    await indexService.initialize();

    // Assign to module-level variable so onMetadataUpdate callback can use it
    symbolIndexService = indexService;
@@ -723,7 +727,17 @@
    // Update websocket handler's config so it can use the service
    (websocketHandler as any).config.symbolIndexService = indexService;

-    app.log.info({ stats: symbolIndexService.getStats() }, 'Symbol index service initialized');
+    // Retry until we get at least some symbol metadata
+    while (true) {
+      await indexService.initialize();
+      const stats = indexService.getStats();
+      if (stats.symbolCount > 0) {
+        app.log.info({ stats }, 'Symbol index service initialized');
+        break;
+      }
+      app.log.warn('Symbol index has no metadata yet, retrying in 5 seconds...');
+      await new Promise(resolve => setTimeout(resolve, 5000));
+    }
  } catch (error) {
    app.log.warn({ error }, 'Failed to initialize symbol index service - symbol search will not be available');
  }
diff --git a/gateway/src/routes/admin-routes.ts b/gateway/src/routes/admin-routes.ts
new file mode 100644
index 00000000..79835913
--- /dev/null
+++ 
b/gateway/src/routes/admin-routes.ts @@ -0,0 +1,35 @@ +import type { FastifyInstance } from 'fastify'; +import type { ContainerManager } from '../k8s/container-manager.js'; +import type { UserService } from '../db/user-service.js'; +import type { LicenseTier } from '../types/user.js'; + +const VALID_TIERS: LicenseTier[] = ['free', 'pro', 'enterprise']; + +export class AdminRoutes { + private containerManager: ContainerManager; + private userService: UserService; + + constructor(containerManager: ContainerManager, userService: UserService) { + this.containerManager = containerManager; + this.userService = userService; + } + + register(app: FastifyInstance): void { + app.post<{ Params: { userId: string }; Body: { tier: string } }>( + '/admin/users/:userId/set-tier', + async (req, reply) => { + const { userId } = req.params; + const { tier } = req.body; + if (!VALID_TIERS.includes(tier as LicenseTier)) { + return reply.code(400).send({ error: `Invalid tier. Must be one of: ${VALID_TIERS.join(', ')}` }); + } + const license = await this.containerManager.applyLicenseTier(userId, tier as LicenseTier); + return { userId, tier, license }; + } + ); + + app.post('/admin/migrate-licenses', async () => { + return await this.userService.migrateAllLicenses(); + }); + } +} diff --git a/gateway/src/services/ohlc-service.ts b/gateway/src/services/ohlc-service.ts index ee32328f..0590affb 100644 --- a/gateway/src/services/ohlc-service.ts +++ b/gateway/src/services/ohlc-service.ts @@ -167,11 +167,7 @@ export class OHLCService { period_seconds, }, 'Failed to fetch historical data'); - // Return empty result on error - return { - bars: [], - noData: true, - }; + throw error; } } diff --git a/gateway/src/test-deepinfra-chunks.ts b/gateway/src/test-deepinfra-chunks.ts new file mode 100644 index 00000000..3edfb450 --- /dev/null +++ b/gateway/src/test-deepinfra-chunks.ts @@ -0,0 +1,87 @@ +/** + * Direct DeepInfra streaming test — bypasses LangChain entirely. + * Logs each delta.content with JSON.stringify so spaces are unambiguous. + * + * Usage: + * DEEPINFRA_API_KEY=$(op read "op://Private/DeepInfra/credential") npx tsx src/test-deepinfra-chunks.ts + */ + +export {}; + +const DEEP_INFRA_URL = 'https://api.deepinfra.com/v1/openai/chat/completions'; +const MODEL = 'zai-org/GLM-5'; + +const apiKey = process.env.DEEPINFRA_API_KEY; +if (!apiKey) { + console.error('DEEPINFRA_API_KEY is not set'); + process.exit(1); +} + +const res = await fetch(DEEP_INFRA_URL, { + method: 'POST', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + model: MODEL, + stream: true, + messages: [ + { role: 'user', content: 'Write two sentences about ETH price analysis.' 
},
+    ],
+  }),
+});
+
+if (!res.ok || !res.body) {
+  console.error(`HTTP ${res.status}: ${await res.text()}`);
+  process.exit(1);
+}
+
+const reader = res.body.getReader();
+const decoder = new TextDecoder();
+let chunkIndex = 0;
+let assembled = '';
+
+console.log(`Testing model: ${MODEL}`);
+console.log('--- chunks ---');
+
+while (true) {
+  const { value, done } = await reader.read();
+  if (done) break;
+
+  const text = decoder.decode(value, { stream: true });
+
+  for (const line of text.split('\n')) {
+    const trimmed = line.trim();
+    if (!trimmed.startsWith('data:')) continue;
+    const data = trimmed.slice(5).trimStart();
+    if (data === '[DONE]') break;
+
+    let parsed: unknown;
+    try {
+      parsed = JSON.parse(data);
+    } catch {
+      continue;
+    }
+
+    const choice = (parsed as { choices?: Array<{ delta?: Record<string, unknown> }> })
+      ?.choices?.[0];
+    const delta = choice?.delta;
+    const content = delta?.content as string | undefined;
+
+    if (content !== undefined) {
+      const endsSpace = content.endsWith(' ');
+      const startsSpace = content.startsWith(' ');
+      // Log full delta so we can see all available fields (logprobs, token_ids, etc.)
+      console.log(
+        `chunk[${chunkIndex++}]: ${JSON.stringify(content)} ` +
+        `(len=${content.length}, startsSpace=${startsSpace}, endsSpace=${endsSpace}) ` +
+        `delta=${JSON.stringify(delta)}`,
+      );
+      assembled += content;
+    }
+  }
+}
+
+console.log('--- assembled ---');
+console.log(assembled);
diff --git a/gateway/src/tools/platform/indicator-agent.tool.ts b/gateway/src/tools/platform/indicator-agent.tool.ts
index 0ba796e5..d489c468 100644
--- a/gateway/src/tools/platform/indicator-agent.tool.ts
+++ b/gateway/src/tools/platform/indicator-agent.tool.ts
@@ -42,7 +42,8 @@ Use this tool for:
 - Recommending indicators for a given strategy or analysis goal

 ALWAYS use this tool for any request about the chart's indicators.
-NEVER modify the indicators workspace store directly.`,
+NEVER modify the indicators workspace store directly.
+NEVER use this tool to switch the chart symbol or timeframe — that is done via workspace_patch on chartState.`,
  schema: z.object({
    instruction: z.string().describe(
      'The indicator task to perform. Be specific about which indicators, parameters, ' +
diff --git a/gateway/src/tools/platform/web-explore-agent.tool.ts b/gateway/src/tools/platform/web-explore-agent.tool.ts
index 6a95d63e..29a3e94b 100644
--- a/gateway/src/tools/platform/web-explore-agent.tool.ts
+++ b/gateway/src/tools/platform/web-explore-agent.tool.ts
@@ -30,13 +30,18 @@ export function createWebExploreAgentTool(config: WebExploreAgentToolConfig): Dy
  const tool = new DynamicStructuredTool({
    name: 'web_explore',
-    description: `Search the web or academic databases and return a summarized answer.
+    description: `Search the EXTERNAL web or academic databases and return a summarized answer.

-Use this tool when the user asks about:
+Use this tool ONLY for external, public information:
 - Current events, news, or real-time information
-- Documentation, tutorials, or how-to guides
+- External documentation, tutorials, or how-to guides for third-party libraries/tools
 - Academic papers, research findings, or scientific topics
-- Any topic that benefits from external sources
+- Any topic requiring external sources
+
+NEVER use this tool for:
+- Questions about the Dexorder platform itself (workspace tools, chartState, indicators, strategies)
+- Internal API usage (workspace_patch, workspace_read, etc.) 
— consult the system prompt instead
+- Anything that can be answered from the context already available

 The subagent will search the web (or arXiv for academic queries), fetch relevant content,
 and return a markdown summary with cited sources.`,
  schema: z.object({
diff --git a/gateway/src/types/user.ts b/gateway/src/types/user.ts
index f57c07e6..7f3a83dc 100644
--- a/gateway/src/types/user.ts
+++ b/gateway/src/types/user.ts
@@ -76,7 +76,7 @@ export const LICENSE_TIER_TEMPLATES: Record<LicenseTier, License> = {
      maxTokensPerMessage: 4096, rateLimitPerMinute: 10,
    },
    k8sResources: {
-      memoryRequest: '256Mi', memoryLimit: '512Mi',
+      memoryRequest: '256Mi', memoryLimit: '8Gi',
      cpuRequest: '100m', cpuLimit: '500m',
      storage: '1Gi', tmpSizeLimit: '128Mi',
      enableIdleShutdown: true, idleTimeoutMinutes: 15,
@@ -93,7 +93,7 @@ export const LICENSE_TIER_TEMPLATES: Record<LicenseTier, License> = {
      maxTokensPerMessage: 8192, rateLimitPerMinute: 60,
    },
    k8sResources: {
-      memoryRequest: '512Mi', memoryLimit: '2Gi',
+      memoryRequest: '512Mi', memoryLimit: '8Gi',
      cpuRequest: '250m', cpuLimit: '2000m',
      storage: '10Gi', tmpSizeLimit: '256Mi',
      enableIdleShutdown: false, idleTimeoutMinutes: 0,
@@ -110,7 +110,7 @@ export const LICENSE_TIER_TEMPLATES: Record<LicenseTier, License> = {
      maxTokensPerMessage: 32768, rateLimitPerMinute: 300,
    },
    k8sResources: {
-      memoryRequest: '1Gi', memoryLimit: '4Gi',
+      memoryRequest: '1Gi', memoryLimit: '8Gi',
      cpuRequest: '500m', cpuLimit: '4000m',
      storage: '50Gi', tmpSizeLimit: '512Mi',
      enableIdleShutdown: false, idleTimeoutMinutes: 0,
diff --git a/gateway/src/workspace/types.ts b/gateway/src/workspace/types.ts
index bd532b8c..f8ba10ac 100644
--- a/gateway/src/workspace/types.ts
+++ b/gateway/src/workspace/types.ts
@@ -79,12 +79,12 @@ export interface StoreConfig {
 export const DEFAULT_STORES: StoreConfig[] = [
  {
    name: 'chartState',
-    persistent: false,
+    persistent: true,
    initialState: () => ({
      symbol: 'BTC/USDT.BINANCE',
      start_time: null,
      end_time: null,
-      period: '15',
+      period: 900,
      selected_shapes: [],
    }),
  },
diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js
index d7fff751..10234e90 100644
--- a/ingestor/src/ccxt-fetcher.js
+++ b/ingestor/src/ccxt-fetcher.js
@@ -1,6 +1,37 @@
 // CCXT data fetcher for historical OHLC and realtime ticks
 import ccxt from 'ccxt';

+/**
+ * Thrown when an exchange returns a 429 rate-limit response.
+ * retryAfterMs is derived from the exchange's Retry-After header when available.
+ */
+export class ExchangeRateLimitError extends Error {
+  constructor(exchange, retryAfterMs, originalMessage) {
+    super(`Rate limit on ${exchange}: retry after ${retryAfterMs}ms (${originalMessage})`);
+    this.name = 'ExchangeRateLimitError';
+    this.exchange = exchange.toUpperCase();
+    this.retryAfterMs = retryAfterMs;
+  }
+}
+
+/**
+ * Extract retry-after duration in milliseconds from a CCXT RateLimitExceeded error.
+ * Priority: Retry-After header → error message numeric → 30s fallback.
+ */
+function extractRetryAfterMs(exchange, error) {
+  const header = exchange.last_response_headers?.['retry-after'];
+  if (header) {
+    const secs = parseFloat(header);
+    if (!isNaN(secs)) return Math.ceil(secs * 1000);
+  }
+  // Some exchanges embed the delay in the message (e.g. 
"retry after 5000 ms") + const msMatch = error.message?.match(/(\d+)\s*ms/i); + if (msMatch) return parseInt(msMatch[1], 10); + const secMatch = error.message?.match(/(\d+(?:\.\d+)?)\s*s(?:ec|econds?)?/i); + if (secMatch) return Math.ceil(parseFloat(secMatch[1]) * 1000); + return 30_000; +} + export class CCXTFetcher { constructor(config, logger, metadataGenerator = null) { this.config = config; @@ -135,9 +166,12 @@ export class CCXTFetcher { break; } catch (error) { lastError = error; - const isRetryable = error.constructor?.name === 'NetworkError' || + const isRateLimit = error.constructor?.name === 'RateLimitExceeded'; + const isRetryable = !isRateLimit && ( + error.constructor?.name === 'NetworkError' || error.constructor?.name === 'RequestTimeout' || - error.constructor?.name === 'ExchangeNotAvailable'; + error.constructor?.name === 'ExchangeNotAvailable' + ); this.logger.warn( { errorType: error.constructor?.name, @@ -146,15 +180,21 @@ export class CCXTFetcher { ticker, since, attempt, - retryable: isRetryable + retryable: isRetryable, + rateLimit: isRateLimit }, 'OHLC fetch attempt failed' ); - if (!isRetryable || attempt === FETCH_RETRIES) break; + if (isRateLimit || !isRetryable || attempt === FETCH_RETRIES) break; await exchange.sleep(FETCH_RETRY_DELAY_MS * attempt); } } if (lastError) { + if (lastError.constructor?.name === 'RateLimitExceeded') { + const retryAfterMs = extractRetryAfterMs(exchange, lastError); + this.logger.warn({ ticker, retryAfterMs }, 'OHLC fetch rate-limited by exchange'); + throw new ExchangeRateLimitError(exchangeName, retryAfterMs, lastError.message); + } this.logger.error( { errorType: lastError.constructor?.name, @@ -278,6 +318,11 @@ export class CCXTFetcher { // Convert to our Tick format return trades.map(trade => this.convertToTick(trade, ticker, metadata)); } catch (error) { + if (error.constructor?.name === 'RateLimitExceeded') { + const retryAfterMs = extractRetryAfterMs(exchange, error); + this.logger.warn({ ticker, retryAfterMs }, 'Trades fetch rate-limited by exchange'); + throw new ExchangeRateLimitError(exchangeName, retryAfterMs, error.message); + } this.logger.error( { error: error.message, ticker }, 'Error fetching trades' diff --git a/ingestor/src/index.js b/ingestor/src/index.js index 185c5f6d..5bbb8efb 100644 --- a/ingestor/src/index.js +++ b/ingestor/src/index.js @@ -6,9 +6,10 @@ import { parse as parseYaml } from 'yaml'; import pino from 'pino'; import { ZmqClient } from './zmq-client.js'; import { KafkaProducer } from './kafka-producer.js'; -import { CCXTFetcher } from './ccxt-fetcher.js'; +import { CCXTFetcher, ExchangeRateLimitError } from './ccxt-fetcher.js'; import { RealtimePoller } from './realtime-poller.js'; import { SymbolMetadataGenerator } from './symbol-metadata-generator.js'; +import { SlotType } from './proto/messages.js'; // Logger setup const logger = pino({ @@ -64,10 +65,162 @@ function loadConfig() { supported_exchanges: config.supported_exchanges || ['binance', 'coinbase', 'kraken'], symbol_metadata_interval_ms: config.symbol_metadata_interval_ms || 6 * 60 * 60 * 1000, + // Per-exchange slot capacity + exchange_capacity: config.exchange_capacity || { + BINANCE: { historical_slots: 3, realtime_slots: 5 }, + KRAKEN: { historical_slots: 2, realtime_slots: 3 }, + COINBASE: { historical_slots: 2, realtime_slots: 4 } + }, + ...secrets }; } +/** + * Manages work slots per exchange per job type. + * + * Each slot corresponds to one WorkerReady message sent to Flink. Flink consumes + * a slot when it dispatches a job. 
The slot is re-offered (via another WorkerReady) + * once the job completes, subject to any rate-limit backoff dictated by the exchange. + */ +class SlotPool { + constructor(exchangeCapacity, zmqClient, logger) { + this.zmqClient = zmqClient; + this.logger = logger; + + // Key: 'EXCHANGE|TYPE' (e.g. 'BINANCE|HISTORICAL') + // Value: { max, active: Set, backoffUntil: ms timestamp } + this.slots = new Map(); + + for (const [exchange, cap] of Object.entries(exchangeCapacity)) { + const ex = exchange.toUpperCase(); + this.slots.set(`${ex}|HISTORICAL`, { + max: cap.historical_slots ?? 2, + active: new Set(), + backoffUntil: 0 + }); + this.slots.set(`${ex}|REALTIME`, { + max: cap.realtime_slots ?? 3, + active: new Set(), + backoffUntil: 0 + }); + } + + // jobId → { exchange, type } for release tracking + this.jobMap = new Map(); + } + + /** + * Register the onConnected callback so slot offers are sent on every + * TCP (re)connect rather than once at startup. Handles both the initial + * connection race (Flink ROUTER not yet ready) and Flink restarts. + */ + init() { + this.zmqClient.onConnected = () => this._offerAllFreeSlots(); + this.logger.info( + { slots: [...this.slots.entries()].map(([k, v]) => `${k}:${v.max}`) }, + 'Slot pool initialized — will offer slots on connect' + ); + } + + /** + * Re-offer all currently-free slots. Called on every TCP (re)connect. + * Sends (max - active) WorkerReady messages per exchange+type key. + */ + async _offerAllFreeSlots() { + const summary = []; + for (const [key, slot] of this.slots) { + const [exchange, type] = key.split('|'); + const freeCount = slot.max - slot.active.size; + for (let i = 0; i < freeCount; i++) { + await this.zmqClient.sendTypedReady(exchange, SlotType[type]); + } + summary.push(`${key}:${freeCount}/${slot.max}`); + } + this.logger.info({ offered: summary }, 'Re-offered all free slots on connect'); + } + + /** + * Record a slot as occupied by jobId. + * @param {string} jobId + * @param {string} exchange - e.g. 'BINANCE' + * @param {string} type - 'HISTORICAL' | 'REALTIME' + */ + consumeSlot(jobId, exchange, type) { + const key = `${exchange.toUpperCase()}|${type}`; + const slot = this.slots.get(key); + if (slot) { + if (slot.active.size >= slot.max) { + this.logger.warn({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot capacity exceeded — rejecting job'); + return false; + } + slot.active.add(jobId); + this.jobMap.set(jobId, { exchange: exchange.toUpperCase(), type }); + this.logger.debug({ jobId, key, active: slot.active.size, max: slot.max }, 'Slot consumed'); + return true; + } + this.logger.warn({ jobId, key }, 'No slot config for this exchange+type'); + return false; + } + + /** + * Release the slot occupied by jobId and re-offer it to Flink (after any backoff). + */ + async releaseSlot(jobId) { + const info = this.jobMap.get(jobId); + if (!info) { + this.logger.warn({ jobId }, 'releaseSlot called for unknown jobId'); + return; + } + this.jobMap.delete(jobId); + const key = `${info.exchange}|${info.type}`; + const slot = this.slots.get(key); + if (slot) { + slot.active.delete(jobId); + await this._offerSlot(info.exchange, info.type, slot); + } + } + + /** + * Record a rate limit from the exchange. Delays slot re-offer by retryAfterMs. 
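+   * e.g. a 429 with Retry-After: 30 delays re-offers for that exchange+type
+   * key by ~30s; _offerSlot() sends the WorkerReady once the backoff expires.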
+ * @param {string} exchange + * @param {string} type - 'HISTORICAL' | 'REALTIME' + * @param {number} retryAfterMs + */ + reportRateLimit(exchange, type, retryAfterMs) { + const key = `${exchange.toUpperCase()}|${type}`; + const slot = this.slots.get(key); + if (slot) { + slot.backoffUntil = Math.max(slot.backoffUntil, Date.now() + retryAfterMs); + this.logger.warn({ exchange, type, retryAfterMs }, 'Rate limit backoff set for slot'); + } + } + + async _offerSlot(exchange, type, slot) { + const now = Date.now(); + if (now < slot.backoffUntil) { + const delay = slot.backoffUntil - now; + this.logger.info({ exchange, type, delayMs: delay }, 'Slot in backoff — scheduling re-offer'); + setTimeout(() => this._offerSlot(exchange, type, slot), delay); + return; + } + try { + await this.zmqClient.sendTypedReady(exchange, SlotType[type]); + this.logger.debug({ exchange, type }, 'Slot re-offered to Flink'); + } catch (err) { + this.logger.error({ exchange, type, error: err.message }, 'Failed to re-offer slot'); + } + } + + shutdown() {} +} + +/** Extract exchange name from ticker string, e.g. "BTC/USDT.BINANCE" → "BINANCE" */ +function exchangeOf(ticker) { + const lastDot = ticker?.lastIndexOf('.'); + return (lastDot >= 0) ? ticker.slice(lastDot + 1).toUpperCase() : 'UNKNOWN'; +} + class IngestorWorker { constructor(config, logger) { this.config = config; @@ -92,7 +245,22 @@ class IngestorWorker { logger.child({ component: 'poller' }) ); - // jobId → active realtime subscription (for stop handling) + this.pool = new SlotPool( + config.exchange_capacity, + this.zmqClient, + logger.child({ component: 'pool' }) + ); + + // When realtime poller terminates a subscription due to repeated errors, release its slot. + this.realtimePoller.onJobComplete = (jobId, error) => { + if (error instanceof ExchangeRateLimitError) { + this.pool.reportRateLimit(error.exchange, 'REALTIME', error.retryAfterMs); + } + this.pool.releaseSlot(jobId).catch(err => + this.logger.error({ jobId, error: err.message }, 'Failed to release slot after realtime error')); + }; + + // jobId set for active realtime subscriptions this.activeRealtime = new Set(); this.isShutdown = false; @@ -108,7 +276,10 @@ class IngestorWorker { this.zmqClient.onWorkAssign = req => this.handleWorkAssign(req); this.zmqClient.onWorkStop = jobId => this.handleWorkStop(jobId); - await this.zmqClient.connect(); // also sends WorkerReady + // Register slot offer callback before connecting so we don't miss the event + this.pool.init(); + + await this.zmqClient.connect(); // Generate symbol metadata on startup this.logger.info('Generating initial symbol metadata'); @@ -139,18 +310,26 @@ class IngestorWorker { */ handleWorkAssign(request) { const { jobId, requestId, type, ticker } = request; + const exchange = exchangeOf(ticker); - this.logger.info({ jobId, requestId, type, ticker }, 'Received WorkAssign'); + this.logger.info({ jobId, requestId, type, ticker, exchange }, 'Received WorkAssign'); - // HISTORICAL_OHLC = 0 (proto3 default, may appear as undefined or 'HISTORICAL_OHLC') const isHistorical = !type || type === 'HISTORICAL_OHLC' || type === 0; const isRealtime = type === 'REALTIME_TICKS' || type === 1; if (isHistorical) { + if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) { + this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {}); + return; + } this.handleHistoricalRequest(request).catch(err => { this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in historical handler'); }); } else if (isRealtime) { + 
if (!this.pool.consumeSlot(jobId, exchange, 'REALTIME')) {
+        this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {});
+        return;
+      }
       this.handleRealtimeRequest(request);
     } else {
       this.logger.warn({ jobId, type }, 'Unknown request type — rejecting');
@@ -165,7 +344,9 @@ class IngestorWorker {
     this.logger.info({ jobId }, 'Received WorkStop — cancelling realtime subscription');
     this.realtimePoller.cancelSubscription(jobId);
     this.activeRealtime.delete(jobId);
-    // No WorkComplete needed — Flink sent the stop, it already knows
+    this.pool.releaseSlot(jobId).catch(err =>
+      this.logger.warn({ jobId, error: err.message }, 'Failed to release slot after WorkStop'));
+    // No WorkComplete needed — Flink sent the stop, it already knows.
   }
 
   /**
@@ -174,10 +355,14 @@
    */
   async handleHistoricalRequest(request) {
     const { jobId, requestId, ticker, historical, clientId: client_id } = request;
+    const exchange = exchangeOf(ticker);
     const { startTime: start_time, endTime: end_time, periodSeconds: period_seconds, limit } = historical || {};
 
     this.logger.info({ jobId, requestId, ticker, period_seconds }, 'Processing historical OHLC request');
 
+    // Immediately ack to reset Flink's dispatch-time timeout clock.
+    // Keep it non-fatal so a failed ack cannot leak the slot consumed above.
+    await this.zmqClient.sendHeartbeat(jobId).catch(err =>
+      this.logger.warn({ jobId, error: err.message }, 'Failed to send dispatch ack'));
+
     try {
       const candles = await this.ccxtFetcher.fetchHistoricalOHLC(
         ticker, start_time, end_time, period_seconds, limit
       );
@@ -193,7 +378,10 @@
           const isLastPage = (i + PAGE_SIZE) >= candles.length;
           await this.kafkaProducer.writeOHLCs(this.config.kafka_ohlc_topic, page, metadata, isLastPage);
         }
-        this.logger.info({ jobId, requestId, ticker, count: candles.length, pages: Math.ceil(candles.length / PAGE_SIZE) }, 'Wrote all pages to Kafka');
+        this.logger.info(
+          { jobId, requestId, ticker, count: candles.length, pages: Math.ceil(candles.length / PAGE_SIZE) },
+          'Wrote all pages to Kafka'
+        );
       } else {
         await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
           request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
@@ -207,6 +395,10 @@
     } catch (error) {
       this.logger.error({ jobId, requestId, ticker, error: error.message }, 'Historical request failed');
 
+      if (error instanceof ExchangeRateLimitError) {
+        this.pool.reportRateLimit(exchange, 'HISTORICAL', error.retryAfterMs);
+      }
+
       try {
         await this.kafkaProducer.writeMarker(this.config.kafka_ohlc_topic, {
           request_id: requestId, client_id, ticker, period_seconds, start_time, end_time,
@@ -218,11 +410,14 @@
       await this.zmqClient.sendComplete(jobId, false, error.message);
     }
+
+    // Release slot regardless of success or failure
+    this.pool.releaseSlot(jobId).catch(err =>
+      this.logger.error({ jobId, error: err.message }, 'Failed to release historical slot'));
   }
 
   /**
    * Start realtime tick polling for a job dispatched by Flink.
-   * Ticks flow: exchange → Kafka market-tick → Flink → OHLC bars → clients.
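+   * The REALTIME slot stays consumed until a WorkStop arrives or the poller
+   * aborts the subscription; both paths release it back through SlotPool.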
*/ handleRealtimeRequest(request) { const { jobId, requestId, ticker } = request; @@ -247,6 +442,7 @@ class IngestorWorker { if (this.metadataInterval) clearInterval(this.metadataInterval); + this.pool.shutdown(); this.realtimePoller.shutdown(); await this.ccxtFetcher.close(); await this.metadataGenerator.close(); diff --git a/ingestor/src/realtime-poller.js b/ingestor/src/realtime-poller.js index a8eea1ce..02867f33 100644 --- a/ingestor/src/realtime-poller.js +++ b/ingestor/src/realtime-poller.js @@ -18,6 +18,10 @@ export class RealtimePoller { this.pollingLoop = null; this.heartbeatLoop = null; + + // Called with (jobId, error) when a subscription terminates abnormally. + // Set by IngestorWorker to release the slot in SlotPool. + this.onJobComplete = null; } /** @@ -147,6 +151,7 @@ export class RealtimePoller { } catch (zmqErr) { this.logger.error({ jobId, error: zmqErr.message }, 'Failed to send WorkComplete after error'); } + if (this.onJobComplete) this.onJobComplete(jobId, error); } } } diff --git a/ingestor/src/zmq-client.js b/ingestor/src/zmq-client.js index 16dbdbe4..b589cb8b 100644 --- a/ingestor/src/zmq-client.js +++ b/ingestor/src/zmq-client.js @@ -28,63 +28,61 @@ export class ZmqClient { this.dealerSocket = null; this.isShutdown = false; - this.activeJobId = null; - this._idleHeartbeatInterval = null; - this.supportedExchanges = (config.supported_exchanges || ['BINANCE', 'COINBASE']) - .map(e => e.toUpperCase()); - - // Callbacks set by IngestorWorker - this.onWorkAssign = null; // (DataRequest) => void - this.onWorkStop = null; // (jobId) => void + // Callbacks set by IngestorWorker / SlotPool + this.onWorkAssign = null; // (DataRequest) => void + this.onWorkStop = null; // (jobId) => void + this.onConnected = null; // async () => void — fires on initial connect AND reconnect } /** * Connect DEALER socket to Flink IngestorBroker (ROUTER). - * Sends WorkerReady immediately so Flink knows this worker is available. + * Fires onConnected on every TCP (re)connect so SlotPool can re-offer slots. */ async connect() { const { flink_hostname, ingestor_broker_port = 5567 } = this.config; this.dealerSocket = new zmq.Dealer(); - const endpoint = `tcp://${flink_hostname}:${ingestor_broker_port}`; - await this.dealerSocket.connect(endpoint); - this.logger.info(`Connected DEALER to Flink IngestorBroker at ${endpoint}`); - // Register as available - await this.sendReady(); - - // Periodically re-send WorkerReady when idle, to recover from missed initial registration - this._idleHeartbeatInterval = setInterval(() => { - if (this.activeJobId === null && !this.isShutdown) { - this.sendReady().catch(err => - this.logger.warn({ error: err.message }, 'Failed to re-send WorkerReady')); + // Subscribe to connection events BEFORE calling connect() so we catch the + // initial establishment. The 'connect' event fires on initial TCP handshake + // and again after every ZMQ reconnect (e.g. Flink restart). + this.dealerSocket.events.on('connect', ({ address }) => { + this.logger.info({ address }, 'DEALER connected to broker'); + if (this.onConnected) { + this.onConnected().catch(err => + this.logger.error({ error: err.message }, 'onConnected callback failed')); } - }, 30_000); + }); + + const endpoint = `tcp://${flink_hostname}:${ingestor_broker_port}`; + this.dealerSocket.connect(endpoint); + this.logger.info(`Connecting DEALER to Flink IngestorBroker at ${endpoint}`); // Start receiving work in background this._receiveLoop(); } /** - * Send WorkerReady — called on connect and after each COMPLETE. 
+ * Send one typed WorkerReady slot offer. + * @param {string} exchange - Exchange name (e.g. 'BINANCE') + * @param {number} slotType - SlotType enum value (0=ANY, 1=HISTORICAL, 2=REALTIME) */ - async sendReady() { + async sendTypedReady(exchange, slotType) { const frames = encodeBrokerMessage( MessageTypeId.WORKER_READY, - { exchanges: this.supportedExchanges }, + { exchanges: [exchange], jobType: slotType }, WorkerReady ); await this.dealerSocket.send(frames); - this.logger.info({ exchanges: this.supportedExchanges }, 'Sent WorkerReady'); + this.logger.debug({ exchange, slotType }, 'Sent WorkerReady slot offer'); } /** * Send WorkComplete after a historical job finishes. - * Automatically sends WorkerReady so Flink returns us to the free pool. + * Slot re-registration is handled by SlotPool after this call. */ async sendComplete(jobId, success, errorMessage) { - this.activeJobId = null; const frames = encodeBrokerMessage( MessageTypeId.WORK_COMPLETE, { @@ -96,9 +94,6 @@ export class ZmqClient { ); await this.dealerSocket.send(frames); this.logger.info({ jobId, success }, 'Sent WorkComplete'); - - // Return to free pool - await this.sendReady(); } /** @@ -153,12 +148,10 @@ export class ZmqClient { const payload = frames[2].slice(1); if (typeId === MessageTypeId.WORK_ASSIGN) { - // DataRequest protobuf const request = DataRequest.decode(payload); const req = DataRequest.toObject(request, { longs: String, enums: String, bytes: Buffer }); - this.activeJobId = req.jobId; this.logger.info( { jobId: req.jobId, requestId: req.requestId, type: req.type, ticker: req.ticker }, 'Received WorkAssign from broker' @@ -192,10 +185,6 @@ export class ZmqClient { async shutdown() { this.isShutdown = true; - if (this._idleHeartbeatInterval) { - clearInterval(this._idleHeartbeatInterval); - this._idleHeartbeatInterval = null; - } this.logger.info('Shutting down ZMQ DEALER connection'); if (this.dealerSocket) { this.dealerSocket.close(); diff --git a/protobuf/ingestor.proto b/protobuf/ingestor.proto index 056683ab..40f4dfb0 100644 --- a/protobuf/ingestor.proto +++ b/protobuf/ingestor.proto @@ -333,12 +333,27 @@ message FieldValue { // ─── Ingestor Broker Protocol (Flink ROUTER ↔ Ingestor DEALER, port 5567) ─── // Message type IDs 0x20–0x25 +// +// Capacity model: each WorkerReady is ONE slot offer for a specific exchange +// and job type. The ingestor sends N WorkerReady messages at startup (one per +// available slot) and re-sends one after each job completes, subject to any +// rate-limit backoff. -// Ingestor → Flink: register as available (type 0x20) -// Sent on DEALER connect and after every COMPLETE. +// Job type for a slot offer or assignment. +enum SlotType { + ANY = 0; // accepts any job type + HISTORICAL = 1; // historical OHLC fetch slot + REALTIME = 2; // realtime tick subscription slot +} + +// Ingestor → Flink: offer one work slot (type 0x20) +// Sent once per available slot at startup and after each job completes. +// One WorkerReady = one slot for one exchange and one job type. message WorkerReady { - // Exchanges this ingestor supports (e.g. ["BINANCE", "COINBASE"]) + // Exchange this slot handles (single entry, e.g. 
["BINANCE"]) repeated string exchanges = 1; + // Job type this slot accepts + SlotType job_type = 2; } // Ingestor → Flink: historical job finished (type 0x21) diff --git a/sandbox/dexorder/conda_manager.py b/sandbox/dexorder/conda_manager.py index 04e60b5c..9dcfaa35 100644 --- a/sandbox/dexorder/conda_manager.py +++ b/sandbox/dexorder/conda_manager.py @@ -510,3 +510,44 @@ def sync_packages(data_dir: Path, environment_yml: Optional[Path] = None) -> dic log.info(f"Conda package sync complete: {len(result['removed'])} packages removed") return result + + +# ============================================================================= +# Async wrappers — non-blocking equivalents for use from asyncio contexts +# ============================================================================= + +import asyncio as _asyncio + + +async def get_installed_packages_async() -> Set[str]: + """Non-blocking wrapper around get_installed_packages().""" + return await _asyncio.to_thread(get_installed_packages) + + +async def install_packages_async( + packages: list[str], + data_dir: Optional[Path] = None, +) -> dict: + """Non-blocking wrapper around install_packages().""" + return await _asyncio.to_thread(install_packages, packages, data_dir) + + +async def remove_packages_async(packages: list[str]) -> dict: + """Non-blocking wrapper around remove_packages().""" + return await _asyncio.to_thread(remove_packages, packages) + + +async def cleanup_extra_packages_async( + data_dir: Path, + environment_yml: Optional[Path] = None, +) -> dict: + """Non-blocking wrapper around cleanup_extra_packages().""" + return await _asyncio.to_thread(cleanup_extra_packages, data_dir, environment_yml) + + +async def sync_packages_async( + data_dir: Path, + environment_yml: Optional[Path] = None, +) -> dict: + """Non-blocking wrapper around sync_packages().""" + return await _asyncio.to_thread(sync_packages, data_dir, environment_yml) diff --git a/sandbox/dexorder/event_loop.py b/sandbox/dexorder/event_loop.py new file mode 100644 index 00000000..00e4f7ab --- /dev/null +++ b/sandbox/dexorder/event_loop.py @@ -0,0 +1,54 @@ +""" +Thread-safe asyncio.run() for the sandbox. + +Installs a global replacement for asyncio.run() that, when called from a +non-async thread while uvicorn's event loop is running, dispatches the +coroutine to that loop via run_coroutine_threadsafe(). The calling thread +blocks on future.result() — releasing the GIL — so uvicorn's loop runs +freely (health checks, MCP requests, etc.). + +Usage: + from dexorder.event_loop import install_thread_safe_asyncio_run + install_thread_safe_asyncio_run(asyncio.get_running_loop()) # call once at startup +""" + +import asyncio +import logging + +log = logging.getLogger(__name__) + +_main_loop: asyncio.AbstractEventLoop | None = None +_original_asyncio_run = asyncio.run + + +def install_thread_safe_asyncio_run(loop: asyncio.AbstractEventLoop) -> None: + """ + Patch asyncio.run globally to cooperate with uvicorn's event loop. + Call once from the lifespan startup (main thread, loop already running). + """ + global _main_loop + _main_loop = loop + + def _thread_safe_run(coro, *, debug=None): + # Detect if we're in a thread (no running loop in this thread) + try: + asyncio.get_running_loop() + # We're already inside an async context — asyncio.run() is not + # valid here regardless; let it raise the normal error. + raise RuntimeError( + "asyncio.run() cannot be called when another event loop is running " + "in the same thread." 
+ ) + except RuntimeError as exc: + if "cannot be called" in str(exc): + raise + # No running loop in this thread — safe to dispatch to main loop. + if _main_loop is not None and _main_loop.is_running(): + log.debug("asyncio.run() from thread → run_coroutine_threadsafe") + return asyncio.run_coroutine_threadsafe(coro, _main_loop).result() + + # Fallback: main loop not available (e.g., called before startup or in tests) + return _original_asyncio_run(coro, debug=debug) + + asyncio.run = _thread_safe_run + log.info("Installed thread-safe asyncio.run()") diff --git a/sandbox/dexorder/iceberg_client.py b/sandbox/dexorder/iceberg_client.py index 03f2d411..6c762d4d 100644 --- a/sandbox/dexorder/iceberg_client.py +++ b/sandbox/dexorder/iceberg_client.py @@ -5,6 +5,8 @@ Tickers use Nautilus format: "BTC/USDT.BINANCE" All timestamps are nanoseconds since epoch. """ +import tracemalloc +from pathlib import Path from typing import Optional, List, Tuple import pandas as pd import logging @@ -19,6 +21,19 @@ from pyiceberg.expressions import ( log = logging.getLogger(__name__) +def _rss_mb() -> str: + """Return current VmRSS and VmPeak from /proc/self/status as a short string.""" + try: + info = {} + for line in Path("/proc/self/status").read_text().splitlines(): + for key in ("VmRSS", "VmPeak", "VmSize"): + if line.startswith(f"{key}:"): + info[key] = int(line.split()[1]) // 1024 # kB → MB + return f"RSS={info.get('VmRSS','?')}MB peak={info.get('VmPeak','?')}MB virt={info.get('VmSize','?')}MB" + except Exception: + return "?" + + class IcebergClient: """ Client for querying OHLC data from Iceberg warehouse (Iceberg 1.10.1). @@ -114,8 +129,21 @@ class IcebergClient: if fetch_columns is not None: scan = scan.select(*fetch_columns) + if not tracemalloc.is_tracing(): + tracemalloc.start() + tm_before = tracemalloc.take_snapshot() + log.info("MEM before scan.to_pandas(): %s", _rss_mb()) + df = scan.to_pandas() + log.info("MEM after scan.to_pandas(): %s | rows=%d cols=%s mem=%dMB", + _rss_mb(), len(df), list(df.columns), + df.memory_usage(deep=True).sum() // (1024 * 1024)) + tm_after = tracemalloc.take_snapshot() + top = tm_after.compare_to(tm_before, "lineno") + for stat in top[:5]: + log.info("TRACEMALLOC: %s", stat) + if not df.empty: # Deduplicate: keep the most-recently-ingested row per timestamp. if "ingested_at" in df.columns: @@ -123,6 +151,7 @@ class IcebergClient: df.sort_values("ingested_at", ascending=False) .drop_duplicates(subset=["timestamp"]) ) + log.info("MEM after dedup: %s | rows=%d", _rss_mb(), len(df)) # Drop ingested_at if the caller did not ask for it if columns is not None and "ingested_at" not in columns and "ingested_at" in df.columns: df = df.drop(columns=["ingested_at"]) diff --git a/sandbox/dexorder/memory_guard.py b/sandbox/dexorder/memory_guard.py new file mode 100644 index 00000000..baf68ee7 --- /dev/null +++ b/sandbox/dexorder/memory_guard.py @@ -0,0 +1,85 @@ +""" +Memory guard for sandbox containers. + +Sets a soft RLIMIT_AS limit derived from the cgroup memory limit at a +configurable fraction, so Python raises MemoryError before the kernel's +OOM killer fires. The MCP session survives; only the tool call fails. +""" + +import gc +import logging +import resource +from pathlib import Path + +log = logging.getLogger(__name__) + + +def _read_cgroup_limit_bytes() -> int | None: + """Read container memory.max from cgroup v2. 
Returns bytes or None.""" + try: + val = Path("/sys/fs/cgroup/memory.max").read_text().strip() + if val == "max": + return None + return int(val) + except Exception: + return None + + +def setup_memory_limit(fraction: float) -> None: + """ + Set RLIMIT_AS soft limit to baseline VmSize + allowed growth. + + RLIMIT_AS caps total virtual address space, which includes shared libraries + and memory-mapped files that don't consume physical RAM. The baseline VmSize + at startup can be 3+ GB even when RSS is only ~200 MB. Setting the limit to + a flat cgroup fraction would crash immediately. + + Instead: limit = current VmSize + (cgroup_limit * fraction) + This allows `fraction` worth of new allocations (numpy arrays, pandas + dataframes, etc.) above the startup baseline before raising MemoryError. + + Args: + fraction: Proportion of cgroup memory.max to allow as new growth, e.g. 0.85. + """ + cgroup_bytes = _read_cgroup_limit_bytes() + + # Read baseline VmSize (total virtual address space at startup) + vmsize_bytes: int | None = None + try: + for line in Path("/proc/self/status").read_text().splitlines(): + if line.startswith("VmSize:"): + vmsize_bytes = int(line.split()[1]) * 1024 # kB → bytes + log.info("Memory baseline: %s", line.strip()) + elif line.startswith("VmRSS:"): + log.info("Memory baseline: %s", line.strip()) + except Exception: + pass + + if cgroup_bytes is None: + log.warning("cgroup memory.max is unlimited; RLIMIT_AS not set") + return + + allowed_growth_bytes = int(cgroup_bytes * fraction) + baseline = vmsize_bytes or 0 + limit_bytes = baseline + allowed_growth_bytes + + _, hard = resource.getrlimit(resource.RLIMIT_AS) + resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, hard)) + log.info( + "RLIMIT_AS soft limit set to %d MB (baseline %d MB + allowed growth %d MB, %.0f%% of cgroup %d MB)", + limit_bytes // (1024 * 1024), + baseline // (1024 * 1024), + allowed_growth_bytes // (1024 * 1024), + fraction * 100, + cgroup_bytes // (1024 * 1024), + ) + + +def cleanup_memory() -> None: + """ + Called after a MemoryError is caught in a tool execution thread. + Runs gc.collect() to free objects held by the failed script. + Hook here for future recovery strategies (cache eviction, etc.). + """ + log.warning("MemoryError in tool thread — running gc.collect()") + gc.collect() diff --git a/sandbox/dexorder/tools/backtest_harness.py b/sandbox/dexorder/tools/backtest_harness.py new file mode 100644 index 00000000..4e624340 --- /dev/null +++ b/sandbox/dexorder/tools/backtest_harness.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +backtest_harness — runs a strategy backtest as a subprocess. + +Reads a JSON config from stdin: +{ + "strategy_name": str, + "feeds": [{"symbol": str, "period_seconds": int}, ...], + "from_time": ..., + "to_time": ..., + "initial_capital": float, + "paper": bool +} + +Outputs JSON to stdout on success: +{ + "strategy_name": str, + "feeds": [...], + "initial_capital": float, + "paper": bool, + "total_candles": int, + ... 
(metrics from run_backtest) +} + +On error: +{"error": str} +""" + +import asyncio +import json +import os +import sys +import traceback +from pathlib import Path + +# Ensure dexorder package is importable when run as a subprocess +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +_OHLC_EXTRA_COLUMNS = [ + "volume", "buy_vol", "sell_vol", + "open_time", "high_time", "low_time", "close_time", + "open_interest", +] + + +async def _run(cfg: dict) -> dict: + strategy_name = cfg["strategy_name"] + feeds = cfg["feeds"] + from_time = cfg.get("from_time") + to_time = cfg.get("to_time") + initial_capital = float(cfg.get("initial_capital", 10_000.0)) + paper = bool(cfg.get("paper", True)) + + # ------------------------------------------------------------------------- + # Initialize API + # ------------------------------------------------------------------------- + try: + import yaml + + config_path = os.environ.get("CONFIG_PATH", "/app/config/config.yaml") + secrets_path = os.environ.get("SECRETS_PATH", "/app/config/secrets.yaml") + + config_data = {} + secrets_data = {} + if Path(config_path).exists(): + with open(config_path) as f: + config_data = yaml.safe_load(f) or {} + if Path(secrets_path).exists(): + with open(secrets_path) as f: + secrets_data = yaml.safe_load(f) or {} + + data_cfg = config_data.get("data", {}) + iceberg_cfg = data_cfg.get("iceberg", {}) + relay_cfg = data_cfg.get("relay", {}) + + from dexorder.api import set_api, API + from dexorder.impl.charting_api_impl import ChartingAPIImpl + from dexorder.impl.data_api_impl import DataAPIImpl + + data_api = DataAPIImpl( + iceberg_catalog_uri=iceberg_cfg.get("catalog_uri", "http://iceberg-catalog:8181"), + relay_endpoint=relay_cfg.get("endpoint", "tcp://relay:5559"), + notification_endpoint=relay_cfg.get("notification_endpoint", "tcp://relay:5558"), + namespace=iceberg_cfg.get("namespace", "trading"), + s3_endpoint=iceberg_cfg.get("s3_endpoint") or secrets_data.get("s3_endpoint"), + s3_access_key=iceberg_cfg.get("s3_access_key") or secrets_data.get("s3_access_key"), + s3_secret_key=iceberg_cfg.get("s3_secret_key") or secrets_data.get("s3_secret_key"), + s3_region=iceberg_cfg.get("s3_region") or secrets_data.get("s3_region"), + request_timeout=240.0, + ) + set_api(API(charting=ChartingAPIImpl(), data=data_api)) + except Exception as e: + return {"error": f"API initialization failed: {e}"} + + # ------------------------------------------------------------------------- + # Locate strategy + # ------------------------------------------------------------------------- + data_dir = Path(os.environ.get("DATA_DIR", "/app/data")) + try: + from dexorder.tools.python_tools import get_category_manager, sanitize_name + category_manager = get_category_manager(data_dir) + safe_name = sanitize_name(strategy_name) + impl_path = category_manager.src_dir / "strategy" / safe_name / "implementation.py" + if not impl_path.exists(): + return {"error": f"Strategy '{strategy_name}' not found (looked at {impl_path})"} + except Exception as exc: + return {"error": f"Failed to locate strategy: {exc}"} + + # ------------------------------------------------------------------------- + # Register custom indicators and load strategy class + # ------------------------------------------------------------------------- + try: + from dexorder.nautilus.backtest_runner import _setup_custom_indicators + _setup_custom_indicators(category_manager.src_dir) + except Exception as exc: + sys.stderr.write(f"WARNING: custom indicator setup failed: {exc}\n") + + 
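+    # Load failures are returned as structured JSON rather than raised, so the
+    # parent backtest_strategy tool can relay them to the agent verbatim.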
try:
+        from dexorder.nautilus.backtest_runner import _load_strategy_class
+        strategy_class = _load_strategy_class(impl_path)
+    except Exception:
+        return {"error": f"Strategy load failed:\n{traceback.format_exc()}"}
+
+    # -------------------------------------------------------------------------
+    # Fetch OHLC data
+    # -------------------------------------------------------------------------
+    from dexorder.api import get_api
+    from dexorder.nautilus.pandas_strategy import make_feed_key
+
+    api = get_api()
+    # Default period mirrors the old in-process tool; a bare f["period_seconds"]
+    # would KeyError when the caller omits the field.
+    parsed_feeds = [(f["symbol"], int(f.get("period_seconds", 3600))) for f in feeds]
+    ohlc_dfs = {}
+    total_candles = 0
+
+    for ticker, period_seconds in parsed_feeds:
+        feed_key = make_feed_key(ticker, period_seconds)
+        try:
+            df = await api.data.historical_ohlc(
+                ticker=ticker,
+                period_seconds=period_seconds,
+                start_time=from_time,
+                end_time=to_time,
+                extra_columns=_OHLC_EXTRA_COLUMNS,
+            )
+        except Exception as exc:
+            return {"error": f"OHLC fetch failed for {feed_key}: {exc}"}
+
+        if df.empty:
+            return {"error": f"No OHLC data for {feed_key} in the requested range"}
+
+        ohlc_dfs[feed_key] = df
+        total_candles += len(df)
+
+    # -------------------------------------------------------------------------
+    # Run backtest (synchronous)
+    # -------------------------------------------------------------------------
+    try:
+        from dexorder.nautilus.backtest_runner import run_backtest
+        metrics = run_backtest(
+            strategy_class=strategy_class,
+            feeds=parsed_feeds,
+            ohlc_dfs=ohlc_dfs,
+            initial_capital=initial_capital,
+            paper=paper,
+        )
+    except Exception:
+        return {"error": f"Backtest failed:\n{traceback.format_exc()}"}
+
+    return {
+        "strategy_name": strategy_name,
+        "feeds": [{"symbol": t, "period_seconds": p} for t, p in parsed_feeds],
+        "initial_capital": initial_capital,
+        "paper": paper,
+        "total_candles": total_candles,
+        **metrics,
+    }
+
+
+def main():
+    cfg = json.loads(sys.stdin.read())
+    result = asyncio.run(_run(cfg))
+    print(json.dumps(result))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/sandbox/dexorder/tools/backtest_strategy.py b/sandbox/dexorder/tools/backtest_strategy.py
index 9ea619cd..b8c7ff87 100644
--- a/sandbox/dexorder/tools/backtest_strategy.py
+++ b/sandbox/dexorder/tools/backtest_strategy.py
@@ -1,25 +1,21 @@
 """
 backtest_strategy — run a PandasStrategy against historical OHLC data.
 
-Called directly from the MCP server's async handle_tool_call.
-
-Returns a JSON payload with backtest metrics and equity curve, following the
-same pattern as evaluate_indicator.py.
+Spawns backtest_harness.py as a subprocess so user strategy code is isolated
+from the MCP server process. The harness handles API init, data fetch, and
+the synchronous BacktestEngine internally.
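+
+Illustrative stdin config for the harness (symbol and values are examples
+only; the full schema is documented in backtest_harness.py):
+
+    {"strategy_name": "my_strategy",
+     "feeds": [{"symbol": "BTC/USDT.BINANCE", "period_seconds": 3600}],
+     "from_time": null, "to_time": null,
+     "initial_capital": 10000.0, "paper": true}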
""" +import asyncio import json import logging +import sys from pathlib import Path from typing import Any log = logging.getLogger(__name__) -# All OHLC+ columns to request from the DataAPI -_OHLC_EXTRA_COLUMNS = [ - "volume", "buy_vol", "sell_vol", - "open_time", "high_time", "low_time", "close_time", - "open_interest", -] +_BACKTEST_HARNESS = Path(__file__).parent / "backtest_harness.py" async def backtest_strategy( @@ -42,23 +38,8 @@ async def backtest_strategy( paper: Always True for historical backtest (flag reserved for forward testing) Returns: - list[TextContent] with JSON payload: - { - "strategy_name": str, - "feeds": [...], - "initial_capital": float, - "paper": bool, - "total_candles": int, - "total_return": float, # fractional (0.15 = +15%) - "sharpe_ratio": float, - "max_drawdown": float, # fractional (0.10 = 10% drawdown) - "win_rate": float, - "trade_count": int, - "equity_curve": [{"timestamp": int, "equity": float}, ...] - } - - On error: - {"error": str} + list[TextContent] with JSON payload containing backtest metrics. + On error: [TextContent] with {"error": str} """ from mcp.types import TextContent @@ -66,102 +47,52 @@ async def backtest_strategy( log.error("backtest_strategy '%s': %s", strategy_name, msg) return [TextContent(type="text", text=json.dumps({"error": msg}))] - # --- 1. Validate feeds input --- if not feeds: return _err("feeds list is empty — provide at least one {symbol, period_seconds} entry") - parsed_feeds: list[tuple[str, int]] = [] for f in feeds: - sym = f.get("symbol", "") - ps = f.get("period_seconds", 3600) - if not sym: + if not f.get("symbol"): return _err(f"Feed entry missing 'symbol': {f}") - parsed_feeds.append((sym, int(ps))) - # --- 2. Resolve strategy implementation file --- - try: - from dexorder.tools.python_tools import get_category_manager, sanitize_name - category_manager = get_category_manager() - safe_name = sanitize_name(strategy_name) - impl_path = category_manager.src_dir / "strategy" / safe_name / "implementation.py" - if not impl_path.exists(): - return _err(f"Strategy '{strategy_name}' not found (looked at {impl_path})") - except Exception as exc: - return _err(f"Failed to locate strategy: {exc}") - - # --- 3. Register custom indicators with pandas-ta --- - try: - from dexorder.nautilus.backtest_runner import _setup_custom_indicators - _setup_custom_indicators(category_manager.src_dir) - except Exception as exc: - log.warning("backtest_strategy: custom indicator setup failed: %s", exc) - - # --- 4. Load strategy class --- - try: - from dexorder.nautilus.backtest_runner import _load_strategy_class - strategy_class = _load_strategy_class(impl_path) - except Exception as exc: - log.exception("backtest_strategy: strategy load failed") - return _err(f"Strategy load failed: {exc}") - - # --- 5. 
Fetch OHLC+ data for each feed ---
-    try:
-        from dexorder.api import get_api
-        api = get_api()
-    except Exception as exc:
-        return _err(f"API not available: {exc}")
-
-    ohlc_dfs: dict[str, Any] = {}
-    total_candles = 0
-
-    for ticker, period_seconds in parsed_feeds:
-        from dexorder.nautilus.pandas_strategy import make_feed_key
-        feed_key = make_feed_key(ticker, period_seconds)
-        try:
-            df = await api.data.historical_ohlc(
-                ticker=ticker,
-                period_seconds=period_seconds,
-                start_time=from_time,
-                end_time=to_time,
-                extra_columns=_OHLC_EXTRA_COLUMNS,
-            )
-        except Exception as exc:
-            log.exception("backtest_strategy: OHLC fetch failed for %s", feed_key)
-            return _err(f"OHLC fetch failed for {feed_key}: {exc}")
-
-        if df.empty:
-            return _err(f"No OHLC data for {feed_key} in the requested range")
-
-        ohlc_dfs[feed_key] = df
-        total_candles += len(df)
-
-    # --- 6. Run backtest in thread executor (BacktestEngine is synchronous) ---
-    try:
-        import asyncio
-        from dexorder.nautilus.backtest_runner import run_backtest
-
-        loop = asyncio.get_event_loop()
-        metrics = await loop.run_in_executor(
-            None,
-            lambda: run_backtest(
-                strategy_class=strategy_class,
-                feeds=parsed_feeds,
-                ohlc_dfs=ohlc_dfs,
-                initial_capital=initial_capital,
-                paper=paper,
-            ),
-        )
-    except Exception as exc:
-        log.exception("backtest_strategy: backtest run failed")
-        return _err(f"Backtest failed: {exc}")
-
-    # --- 7. Return results ---
-    payload = {
-        "strategy_name": strategy_name,
-        "feeds": [{"symbol": t, "period_seconds": p} for t, p in parsed_feeds],
+    cfg = {
+        "strategy_name": strategy_name,
+        "feeds": feeds,
+        "from_time": from_time,
+        "to_time": to_time,
         "initial_capital": initial_capital,
-        "paper": paper,
-        "total_candles": total_candles,
-        **metrics,  # keys: summary, statistics, trades, equity_curve
+        "paper": paper,
     }
+
+    try:
+        proc = await asyncio.create_subprocess_exec(
+            sys.executable, str(_BACKTEST_HARNESS),
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
+        stdout, stderr = await asyncio.wait_for(
+            proc.communicate(json.dumps(cfg).encode()),
+            timeout=600,
+        )
+    except asyncio.TimeoutError:
+        proc.kill()  # don't leave an orphaned harness process running
+        return _err("Backtest timed out (10 minutes)")
+    except Exception as exc:
+        return _err(f"Failed to launch backtest harness: {exc}")
+
+    if proc.returncode != 0:
+        err_text = stderr.decode(errors="replace")
+        log.error("backtest_strategy '%s': harness exited %d: %s", strategy_name, proc.returncode, err_text[:500])
+        return _err(f"Backtest harness failed:\n{err_text}")
+
+    if stderr:
+        log.warning("backtest_strategy '%s' stderr: %s", strategy_name, stderr.decode(errors="replace")[:500])
+
+    try:
+        payload = json.loads(stdout.decode())
+    except json.JSONDecodeError:
+        return _err(f"Harness produced invalid JSON: {stdout.decode(errors='replace')[:200]}")
+
+    if "error" in payload:
+        return _err(payload["error"])
+
     return [TextContent(type="text", text=json.dumps(payload))]
diff --git a/sandbox/dexorder/tools/python_tools.py b/sandbox/dexorder/tools/python_tools.py
index 8d15d973..fb9390cb 100644
--- a/sandbox/dexorder/tools/python_tools.py
+++ b/sandbox/dexorder/tools/python_tools.py
@@ -18,51 +18,32 @@
 After write/edit operations, a category-specific test harness runs to validate
 the code and capture errors/output for agent feedback.
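 
 Validation execution model (see the per-category validators below): strategy
 and research harnesses run as separate subprocesses via subprocess_runner;
 indicator tests stay in-process on a worker thread.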
""" -import concurrent.futures import json import logging import re import subprocess import sys -import traceback from dataclasses import dataclass, asdict from enum import Enum from pathlib import Path from typing import Any, Optional +from dexorder.tools.subprocess_runner import run_subprocess_argv, run_in_thread + log = logging.getLogger(__name__) - -def _run_inprocess(fn, *args, timeout: int) -> dict: - """ - Run fn(*args) in a one-shot thread and return its result dict. - - Uses a thread so the calling coroutine is not blocked and the calling - process does not fork a new Python interpreter. All already-loaded - libraries (numpy, pandas, matplotlib, etc.) are shared with the thread. - - On timeout returns a dict with _timeout=True. On unexpected exception - returns a dict with error=True and the traceback in stderr. - """ - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(fn, *args) - try: - return future.result(timeout=timeout) - except concurrent.futures.TimeoutError: - return {"_timeout": True, "error": True, - "stdout": "", "stderr": "", "images": []} - except Exception: - return {"error": True, "stdout": "", - "stderr": traceback.format_exc(), "images": []} +# Paths to harness scripts run as subprocesses +_RESEARCH_HARNESS = Path(__file__).parent / "research_harness.py" +_STRATEGY_HARNESS = Path(__file__).parent / "strategy_harness.py" # Import conda manager for package installation and tracking try: - from dexorder.conda_manager import install_packages, cleanup_extra_packages + from dexorder.conda_manager import install_packages_async, cleanup_extra_packages_async except ImportError: log.warning("conda_manager not available - package installation disabled") - install_packages = None - cleanup_extra_packages = None + install_packages_async = None + cleanup_extra_packages_async = None # ============================================================================= @@ -355,6 +336,39 @@ class GitManager: except Exception: pass + # ------------------------------------------------------------------ + # Async variants — delegates to sync methods via asyncio.to_thread + # so the event loop stays responsive during git operations. 
+ # ------------------------------------------------------------------ + + async def commit_async(self, message: str) -> Optional[str]: + import asyncio + return await asyncio.to_thread(self.commit, message) + + async def log_async(self, path: Optional[Path] = None, n: int = 20) -> list[dict]: + import asyncio + return await asyncio.to_thread(self.log, path, n) + + async def restore_async(self, revision: str, path: Optional[Path] = None) -> Optional[str]: + import asyncio + return await asyncio.to_thread(self.restore, revision, path) + + async def head_short_hash_async(self) -> str: + import asyncio + return await asyncio.to_thread(self.head_short_hash) + + async def create_worktree_async(self, worktree_path: Path, revision: str = "HEAD") -> str: + import asyncio + return await asyncio.to_thread(self.create_worktree, worktree_path, revision) + + async def remove_worktree_async(self, worktree_path: Path) -> None: + import asyncio + return await asyncio.to_thread(self.remove_worktree, worktree_path) + + async def prune_worktrees_async(self) -> None: + import asyncio + return await asyncio.to_thread(self.prune_worktrees) + # ============================================================================= # Custom Indicator Setup @@ -484,7 +498,7 @@ class CategoryFileManager: """Root of the versioned category code (git repo root).""" return self.data_dir / "src" - def write( + async def write( self, category: str, name: str, @@ -547,7 +561,7 @@ class CategoryFileManager: return {"success": False, "error": f"Failed to write metadata: {e}"} # Run validation harness - validation = self._validate(cat, item_dir) + validation = await self._validate(cat, item_dir) result = { "success": validation["success"], @@ -559,19 +573,19 @@ class CategoryFileManager: if validation["success"]: if cat == Category.RESEARCH: log.info(f"Auto-executing research script: {name}") - result["execution"] = self.execute_research(name) + result["execution"] = await self.execute_research(name) elif cat == Category.INDICATOR: log.info(f"Auto-executing indicator test: {name}") - result["execution"] = self._execute_indicator(item_dir) + result["execution"] = await self._execute_indicator(item_dir) # Commit to git - commit_hash = self.git.commit(f"create({category}): {name}") + commit_hash = await self.git.commit_async(f"create({category}): {name}") if commit_hash: result["revision"] = commit_hash return result - def edit( + async def edit( self, category: str, name: str, @@ -671,7 +685,7 @@ class CategoryFileManager: # Run validation harness if code was updated validation = None if code is not None: - validation = self._validate(cat, item_dir) + validation = await self._validate(cat, item_dir) result = { "success": True, @@ -685,15 +699,15 @@ class CategoryFileManager: if code is not None and result["success"]: if cat == Category.RESEARCH: log.info(f"Auto-executing research script after edit: {name}") - result["execution"] = self.execute_research(name) + result["execution"] = await self.execute_research(name) elif cat == Category.INDICATOR: log.info(f"Auto-executing indicator test after edit: {name}") - result["execution"] = self._execute_indicator(item_dir) + result["execution"] = await self._execute_indicator(item_dir) # Commit to git if code changed if code is not None and result["success"]: action = "patch" if patches is not None else "edit" - commit_hash = self.git.commit(f"{action}({category}): {name}") + commit_hash = await self.git.commit_async(f"{action}({category}): {name}") if commit_hash: result["revision"] = 
commit_hash @@ -776,7 +790,7 @@ class CategoryFileManager: return {"items": items} - def _validate(self, category: Category, item_dir: Path) -> dict[str, Any]: + async def _validate(self, category: Category, item_dir: Path) -> dict[str, Any]: """ Run category-specific validation harness. @@ -793,13 +807,13 @@ class CategoryFileManager: # Install required packages before validation packages_installed = [] - if install_packages and meta_path.exists(): + if install_packages_async and meta_path.exists(): try: metadata = json.loads(meta_path.read_text()) conda_packages = metadata.get("conda_packages", []) if conda_packages: log.info(f"Installing packages for validation: {conda_packages}") - install_result = install_packages(conda_packages, data_dir=self.data_dir) + install_result = await install_packages_async(conda_packages, data_dir=self.data_dir) if install_result.get("success"): packages_installed = install_result.get("installed", []) if packages_installed: @@ -811,11 +825,11 @@ class CategoryFileManager: # Run validation if category == Category.STRATEGY: - result = self._validate_strategy(impl_path) + result = await self._validate_strategy(impl_path) elif category == Category.INDICATOR: - result = self._validate_indicator(impl_path) + result = await self._validate_indicator(impl_path) elif category == Category.RESEARCH: - result = self._validate_research(impl_path, item_dir) + result = await self._validate_research(impl_path, item_dir) else: result = {"success": False, "error": f"No validator for category {category}"} @@ -825,19 +839,18 @@ class CategoryFileManager: return result - def _validate_strategy(self, impl_path: Path) -> dict[str, Any]: + async def _validate_strategy(self, impl_path: Path) -> dict[str, Any]: """ Validate a strategy by running it against synthetic OHLC data. - Runs strategy_harness.py in-process via a thread. Catches import errors, + Runs strategy_harness.py as a subprocess. Catches import errors, runtime errors in evaluate(), and wrong class hierarchy — not just syntax. """ - meta_path = impl_path.parent / "metadata.json" - return self._execute_strategy(impl_path.parent, timeout=45) + return await self._execute_strategy(impl_path.parent, timeout=45) - def _execute_strategy(self, item_dir: Path, timeout: int = 45) -> dict[str, Any]: + async def _execute_strategy(self, item_dir: Path, timeout: int = 45) -> dict[str, Any]: """ - Run a strategy against synthetic OHLC data in-process via a thread. + Run a strategy against synthetic OHLC data via strategy_harness.py subprocess. Returns: dict with success, output (human-readable summary), trade_count, error @@ -850,24 +863,26 @@ class CategoryFileManager: if not meta_path.exists(): return {"success": False, "error": "metadata.json not found"} - from dexorder.tools.strategy_harness import run as _strategy_run - result = _run_inprocess(_strategy_run, impl_path, meta_path, timeout=timeout) - - if result.get("_timeout"): + data = await run_subprocess_argv( + sys.executable, str(_STRATEGY_HARNESS), str(impl_path), str(meta_path), + timeout=timeout, + ) + if data.get("_timeout"): return {"success": False, "error": f"Strategy test timed out after {timeout}s"} - return result + if data.get("error") and not data.get("success"): + return {"success": False, "error": data.get("stderr") or "Harness failed"} + return data - def _validate_indicator(self, impl_path: Path) -> dict[str, Any]: + async def _validate_indicator(self, impl_path: Path) -> dict[str, Any]: """ Validate an indicator by running it against synthetic OHLC data. 
- Runs indicator_harness.py in-process via a thread. Catches import errors, - runtime errors, and wrong return types — not just syntax. + Runs indicator_harness.py in-process via a thread (main proc). Catches + import errors, runtime errors, and wrong return types — not just syntax. """ - meta_path = impl_path.parent / "metadata.json" - return self._execute_indicator(impl_path.parent, timeout=30) + return await self._execute_indicator(impl_path.parent, timeout=30) - def _execute_indicator(self, item_dir: Path, timeout: int = 30) -> dict[str, Any]: + async def _execute_indicator(self, item_dir: Path, timeout: int = 30) -> dict[str, Any]: """ Run an indicator against synthetic OHLC data in-process via a thread. @@ -883,29 +898,32 @@ class CategoryFileManager: return {"success": False, "error": "metadata.json not found"} from dexorder.tools.indicator_harness import run as _indicator_run - result = _run_inprocess(_indicator_run, impl_path, meta_path, timeout=timeout) + result = await run_in_thread(_indicator_run, impl_path, meta_path, timeout=timeout) if result.get("_timeout"): return {"success": False, "error": f"Indicator test timed out after {timeout}s"} return result - def _run_research_harness(self, impl_path: Path, item_dir: Path, timeout: int = 300) -> dict[str, Any]: + async def _run_research_harness(self, impl_path: Path, item_dir: Path, timeout: int = 300) -> dict[str, Any]: """ - Run a research script in-process via a thread and return captured results. + Run a research script via research_harness.py subprocess and return captured results. Returns: - dict with stdout, stderr, images, error fields — or an error dict. + dict with stdout, stderr, images, error fields. """ - from dexorder.tools.research_harness import run as _research_run - return _run_inprocess(_research_run, impl_path, item_dir, timeout=timeout) + return await run_subprocess_argv( + sys.executable, str(_RESEARCH_HARNESS), str(impl_path), + timeout=timeout, + cwd=item_dir, + ) - def _validate_research(self, impl_path: Path, item_dir: Path) -> dict[str, Any]: + async def _validate_research(self, impl_path: Path, item_dir: Path) -> dict[str, Any]: """ Validate a research script. Runs the script via the harness and captures output + pyplot images. """ - data = self._run_research_harness(impl_path, item_dir, timeout=300) + data = await self._run_research_harness(impl_path, item_dir, timeout=300) if data.get("_timeout"): return {"success": False, "error": "Research script timeout"} @@ -923,7 +941,7 @@ class CategoryFileManager: "images": data["images"], } - def execute_research(self, name: str) -> dict[str, Any]: + async def execute_research(self, name: str) -> dict[str, Any]: """ Execute a research script and return structured content with images. @@ -944,7 +962,7 @@ class CategoryFileManager: if not impl_path.exists(): return {"error": f"Implementation file not found for '{name}'"} - data = self._run_research_harness(impl_path, item_dir, timeout=300) + data = await self._run_research_harness(impl_path, item_dir, timeout=300) if data.get("_timeout"): log.error(f"execute_research '{name}': timeout") @@ -995,7 +1013,7 @@ class CategoryFileManager: return {"content": content} - def delete(self, category: str, name: str) -> dict[str, Any]: + async def delete(self, category: str, name: str) -> dict[str, Any]: """ Delete a category script directory and commit the removal to git. 
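
For orientation, a minimal sketch of driving the now-async manager API end to
end (the handler name and wiring are illustrative, not part of this diff):

    # Hypothetical MCP handler: shows only the awaited call chain.
    async def handle_delete(manager: CategoryFileManager, category: str, name: str) -> dict:
        result = await manager.delete(category, name)  # removes the dir and commits via git
        if result.get("success") and "revision" in result:
            log.info("deleted %s/%s at revision %s", category, name, result["revision"])
        return result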
@@ -1031,13 +1049,13 @@ class CategoryFileManager: except Exception as e: return {"success": False, "error": f"Failed to delete: {e}"} - commit_hash = self.git.commit(f"delete({category}): {name}") + commit_hash = await self.git.commit_async(f"delete({category}): {name}") result: dict[str, Any] = {"success": True, "category": category, "name": name} if commit_hash: result["revision"] = commit_hash return result - def git_log( + async def git_log( self, category: Optional[str] = None, name: Optional[str] = None, @@ -1061,10 +1079,10 @@ class CategoryFileManager: path = get_category_path(self.src_dir, cat, name) else: path = self.src_dir / cat.value - entries = self.git.log(path=path, n=limit) + entries = await self.git.log_async(path=path, n=limit) return {"success": True, "commits": entries} - def git_revert(self, revision: str, category: str, name: str) -> dict[str, Any]: + async def git_revert(self, revision: str, category: str, name: str) -> dict[str, Any]: """ Restore a category item to a previous git revision (creates a new commit). @@ -1085,11 +1103,11 @@ class CategoryFileManager: return {"success": False, "error": f"Item '{name}' not found in '{category}'"} try: - commit_hash = self.git.restore(revision, path=item_dir) + commit_hash = await self.git.restore_async(revision, path=item_dir) except RuntimeError as e: return {"success": False, "error": str(e)} - validation = self._validate(cat, item_dir) + validation = await self._validate(cat, item_dir) return { "success": validation["success"], "revision": commit_hash, diff --git a/sandbox/dexorder/tools/research_harness.py b/sandbox/dexorder/tools/research_harness.py index 43545a02..731517d5 100644 --- a/sandbox/dexorder/tools/research_harness.py +++ b/sandbox/dexorder/tools/research_harness.py @@ -119,11 +119,39 @@ def run(impl_path: Path, item_dir: Path) -> dict: stdout_buf = io.StringIO() stderr_buf = io.StringIO() + # Eagerly capture figures when user scripts call plt.close() so images are + # not lost even if the script closes figures immediately after savefig(). 
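+    # Close-time captures are later de-duplicated (by base64 payload) against
+    # figures still open at script exit, so nothing is reported twice.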
+    captured_images: list[dict] = []
+
+    def _capture_fig(fig) -> None:
+        buf = io.BytesIO()
+        fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
+        buf.seek(0)
+        captured_images.append({"format": "png", "data": base64.b64encode(buf.read()).decode('utf-8')})
+        buf.close()
+
+    _orig_plt_close = plt.close
+
+    def _patched_close(fig=None):
+        if fig is None:
+            # plt.close() with no argument closes only the current figure, so
+            # mirror that and capture just the current figure (if one exists).
+            if plt.get_fignums():
+                _capture_fig(plt.gcf())
+        elif fig == 'all':
+            for fn in plt.get_fignums():
+                _capture_fig(plt.figure(fn))
+        else:
+            try:
+                _capture_fig(fig if hasattr(fig, 'savefig') else plt.figure(fig))
+            except Exception:
+                pass
+        _orig_plt_close(fig)
+
     error_occurred = False
     old_stdout, old_stderr = sys.stdout, sys.stderr
     old_cwd = os.getcwd()
     sys.stdout = stdout_buf
     sys.stderr = stderr_buf
+    plt.close = _patched_close
 
     try:
         os.chdir(impl_path.parent)
@@ -136,22 +164,26 @@ def run(impl_path: Path, item_dir: Path) -> dict:
         sys.stdout = old_stdout
         sys.stderr = old_stderr
         os.chdir(old_cwd)
+        plt.close = _orig_plt_close
 
     stdout_output = stdout_buf.getvalue()
     stderr_output = stderr_buf.getvalue()
 
     # ---------------------------------------------------------------------------
-    # Capture matplotlib figures
+    # Capture any figures still open after script completion
     # ---------------------------------------------------------------------------
-    images = []
+    images = captured_images
     if not error_occurred:
+        already_seen = {img["data"] for img in images}
         for fig_num in plt.get_fignums():
             fig = plt.figure(fig_num)
             buf = io.BytesIO()
             fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
             buf.seek(0)
-            images.append({"format": "png", "data": base64.b64encode(buf.read()).decode('utf-8')})
+            data = base64.b64encode(buf.read()).decode('utf-8')
             buf.close()
+            if data not in already_seen:
+                images.append({"format": "png", "data": data})
         plt.close('all')
 
     return {
diff --git a/sandbox/dexorder/tools/subprocess_runner.py b/sandbox/dexorder/tools/subprocess_runner.py
new file mode 100644
index 00000000..78b4ca48
--- /dev/null
+++ b/sandbox/dexorder/tools/subprocess_runner.py
@@ -0,0 +1,182 @@
+"""
+subprocess_runner — non-blocking subprocess primitives for the MCP sandbox.
+
+All three entrypoints return the same dict shape as the legacy _run_inprocess():
+    {
+        "error": bool,
+        "stdout": str,
+        "stderr": str,
+        "images": list,      # always [] for non-research invocations
+        "_timeout": bool     # present and True only on timeout
+    }
+
+Callers can therefore pattern-match on {"_timeout", "error", "stdout", "stderr"}
+uniformly regardless of whether the work ran in a subprocess or a thread.
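+
+Illustrative usage (the harness filename here is hypothetical):
+
+    result = await run_subprocess_argv(sys.executable, "harness.py", "impl.py",
+                                       timeout=30)
+    if result.get("_timeout"):
+        ...  # report the timeout to the caller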
+""" + +import asyncio +import json +import traceback +from pathlib import Path +from typing import Any, Callable + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _normalise(data: dict, stderr_fallback: str = "") -> dict: + """Ensure the standard shape keys are present in a harness result dict.""" + data.setdefault("error", False) + data.setdefault("stdout", "") + data.setdefault("stderr", stderr_fallback) + data.setdefault("images", []) + return data + + +def _err_dict(stderr: str = "", stdout: str = "") -> dict: + return {"error": True, "stdout": stdout, "stderr": stderr, "images": []} + + +def _timeout_dict() -> dict: + return {"_timeout": True, "error": True, "stdout": "", "stderr": "", "images": []} + + +# --------------------------------------------------------------------------- +# Primitive 1: run_subprocess_argv +# +# Non-blocking equivalent of: +# subprocess.run([sys.executable, harness, arg1, arg2, ...], +# capture_output=True, text=True, timeout=N, cwd=cwd) +# +# Used by: _execute_strategy, _run_research_harness +# --------------------------------------------------------------------------- + +async def run_subprocess_argv( + *cmd: str, + timeout: int, + cwd: Path | None = None, +) -> dict: + """ + Spawn cmd as a subprocess, await completion, and return a normalised result dict. + + stdout is expected to contain a JSON object written by the harness. It is + decoded and normalised to the standard shape. On JSON decode failure the + raw stdout text is preserved in "stdout" and error is set to True. + """ + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(cwd) if cwd else None, + ) + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + except asyncio.TimeoutError: + return _timeout_dict() + except Exception as exc: + return _err_dict(stderr=f"Harness launch failed: {exc}") + + stdout_text = stdout_bytes.decode(errors="replace") + stderr_text = stderr_bytes.decode(errors="replace") + + if proc.returncode != 0: + return _err_dict( + stderr=f"Harness exited {proc.returncode}:\n{stderr_text}", + stdout=stdout_text, + ) + + try: + data = json.loads(stdout_text) + return _normalise(data, stderr_fallback=stderr_text) + except json.JSONDecodeError: + return {"error": True, "stdout": stdout_text, "stderr": stderr_text, "images": []} + + +# --------------------------------------------------------------------------- +# Primitive 2: run_subprocess_stdin +# +# Non-blocking equivalent of the backtest pattern — JSON config fed via stdin. +# --------------------------------------------------------------------------- + +async def run_subprocess_stdin( + *cmd: str, + stdin_data: bytes, + timeout: int, +) -> dict: + """ + Spawn cmd, write stdin_data to its stdin, await completion. + + Returns the same normalised dict shape as run_subprocess_argv. 
+    """
+    try:
+        proc = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
+        stdout_bytes, stderr_bytes = await asyncio.wait_for(
+            proc.communicate(stdin_data), timeout=timeout
+        )
+    except asyncio.TimeoutError:
+        # Kill the runaway harness so it cannot outlive the timeout.
+        try:
+            proc.kill()
+        except ProcessLookupError:
+            pass
+        return _timeout_dict()
+    except Exception as exc:
+        return _err_dict(stderr=f"Harness launch failed: {exc}")
+
+    stdout_text = stdout_bytes.decode(errors="replace")
+    stderr_text = stderr_bytes.decode(errors="replace")
+
+    if proc.returncode != 0:
+        return _err_dict(
+            stderr=f"Harness exited {proc.returncode}:\n{stderr_text}",
+            stdout=stdout_text,
+        )
+
+    try:
+        data = json.loads(stdout_text)
+        return _normalise(data, stderr_fallback=stderr_text)
+    except json.JSONDecodeError:
+        return {"error": True, "stdout": stdout_text, "stderr": stderr_text, "images": []}
+
+
+# ---------------------------------------------------------------------------
+# Primitive 3: run_in_thread
+#
+# Async wrapper around asyncio.to_thread so the event loop stays responsive
+# while CPU-bound or blocking-IO callables run in a worker thread.
+#
+# Used by: _execute_indicator (in-process indicator harness)
+# ---------------------------------------------------------------------------
+
+async def run_in_thread(
+    fn: Callable,
+    *args: Any,
+    timeout: int,
+) -> dict:
+    """
+    Run fn(*args) in a thread pool worker and yield to the event loop while waiting.
+
+    On timeout the call is abandoned (the worker thread keeps running until fn
+    returns, but its result is discarded) and _timeout_dict() is returned.
+    On MemoryError or unexpected exception a standard error dict is returned.
+    The returned dict is normalised to the standard shape.
+    """
+    from dexorder.memory_guard import cleanup_memory
+
+    try:
+        result = await asyncio.wait_for(
+            asyncio.to_thread(fn, *args),
+            timeout=timeout,
+        )
+        return _normalise(result)
+    except asyncio.TimeoutError:
+        return _timeout_dict()
+    except MemoryError:
+        cleanup_memory()
+        return _err_dict(
+            stderr="Script exceeded memory limit. Try reducing the data range or batch size."
+ ) + except Exception: + return _err_dict(stderr=traceback.format_exc()) diff --git a/sandbox/main.py b/sandbox/main.py index f8f7e1d9..de484864 100644 --- a/sandbox/main.py +++ b/sandbox/main.py @@ -33,7 +33,7 @@ from starlette.routing import Route, Mount from dexorder import EventPublisher, start_lifecycle_manager, get_lifecycle_manager from dexorder.api import set_api, API -from dexorder.conda_manager import sync_packages, install_packages, cleanup_extra_packages +from dexorder.conda_manager import sync_packages_async, install_packages_async, cleanup_extra_packages_async from dexorder.events import EventType, UserEvent, DeliverySpec from dexorder.impl.charting_api_impl import ChartingAPIImpl from dexorder.impl.data_api_impl import DataAPIImpl @@ -893,7 +893,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server arguments.get("patch", []) ) elif name == "python_write": - result = category_manager.write( + result = await category_manager.write( category=arguments.get("category", ""), name=arguments.get("name", ""), description=arguments.get("description", ""), @@ -920,10 +920,10 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server logging.info(f"python_write '{arguments.get('name')}': no execution result (category={arguments.get('category')})") if result.get("success"): _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", "")) - cleanup_extra_packages(get_data_dir(), _get_env_yml()) + await cleanup_extra_packages_async(get_data_dir(), _get_env_yml()) return content elif name == "python_edit": - result = category_manager.edit( + result = await category_manager.edit( category=arguments.get("category", ""), name=arguments.get("name", ""), code=arguments.get("code"), @@ -951,7 +951,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server logging.info(f"python_edit '{arguments.get('name')}': no execution result") if result.get("success"): _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", "")) - cleanup_extra_packages(get_data_dir(), _get_env_yml()) + await cleanup_extra_packages_async(get_data_dir(), _get_env_yml()) return content elif name == "python_read": return category_manager.read( @@ -963,7 +963,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server category=arguments.get("category", "") ) elif name == "python_log": - result = category_manager.git_log( + result = await category_manager.git_log( category=arguments.get("category"), name=arguments.get("name"), limit=int(arguments.get("limit", 20)) @@ -973,7 +973,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server lines.append(f"{c['short_hash']} {c['date'][:10]} {c['message']}") return [TextContent(type="text", text="\n".join(lines))] elif name == "python_revert": - result = category_manager.git_revert( + result = await category_manager.git_revert( revision=arguments.get("revision", ""), category=arguments.get("category", ""), name=arguments.get("name", "") @@ -989,13 +989,13 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", "")) return [TextContent(type="text", text="\n".join(meta_parts))] elif name == "python_delete": - result = category_manager.delete( + result = await category_manager.delete( category=arguments.get("category", ""), 
name=arguments.get("name", "") ) if result.get("success"): _remove_type(workspace_store, arguments.get("category", ""), arguments.get("name", "")) - cleanup_result = cleanup_extra_packages(get_data_dir(), _get_env_yml()) + cleanup_result = await cleanup_extra_packages_async(get_data_dir(), _get_env_yml()) if cleanup_result.get("removed"): result["packages_removed"] = cleanup_result["removed"] parts = [f"success: {result['success']}"] @@ -1004,14 +1004,14 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server parts.append(f"{k}: {result[k]}") return [TextContent(type="text", text="\n".join(parts))] elif name == "conda_sync": - return sync_packages( + return await sync_packages_async( data_dir=get_data_dir(), environment_yml=_get_env_yml() ) elif name == "conda_install": - return install_packages(arguments.get("packages", [])) + return await install_packages_async(arguments.get("packages", [])) elif name == "execute_research": - result = category_manager.execute_research(name=arguments.get("name", "")) + result = await category_manager.execute_research(name=arguments.get("name", "")) if "error" in result: logging.error(f"execute_research '{arguments.get('name')}': {result['error']}") return [TextContent(type="text", text=f"Error: {result['error']}")] @@ -1113,6 +1113,8 @@ def create_streamable_http_app(mcp_server: Server) -> Starlette: @contextlib.asynccontextmanager async def lifespan(app: Starlette): + from dexorder.event_loop import install_thread_safe_asyncio_run + install_thread_safe_asyncio_run(asyncio.get_running_loop()) async with session_manager.run(): yield @@ -1156,6 +1158,14 @@ class UserContainer: # Load configuration self.config.load() + # Python-level memory guard (RLIMIT_AS soft limit) — DISABLED. + # We assume nodes have ample memory (8Gi limits) and will revisit a + # proper RSS-based cgroup monitor later. The implementation is in + # dexorder/memory_guard.py if we want to re-enable. 
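+    # (A future RSS-based monitor could poll the cgroup's memory.current from
+    # a background task instead of relying on RLIMIT_AS.)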
+ # from dexorder.memory_guard import setup_memory_limit + # mem_cfg = self.config.config_data.get("memory", {}) + # setup_memory_limit(fraction=float(mem_cfg.get("limit_fraction", 0.85))) + # Initialize data and charting API data_cfg = self.config.config_data.get("data", {}) iceberg_cfg = data_cfg.get("iceberg", {}) diff --git a/web/src/App.vue b/web/src/App.vue index f6082c41..622f67d7 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -9,6 +9,8 @@ import { useShapeStore } from './stores/shapes' import { useIndicatorStore } from './stores/indicators' import { useIndicatorTypesStore } from './stores/indicatorTypes' import { useChannelStore } from './stores/channel' +import { useResearchTypesStore } from './stores/researchTypes' +import { useStrategyTypesStore } from './stores/strategyTypes' import { useStateSync } from './composables/useStateSync' import { wsManager } from './composables/useWebSocket' import { authService } from './composables/useAuth' @@ -44,9 +46,18 @@ function onHDragMove(e: PointerEvent) { chartWidth.value = Math.max(CHART_MIN_PX, Math.min(maxWidth, hDragStartWidth + delta)) } +// Clamp chartWidth so chart + chat always fit within the window +function clampChartWidth() { + const maxWidth = window.innerWidth - CHAT_MIN_PX - 4 + if (maxWidth >= CHART_MIN_PX) { + chartWidth.value = Math.max(CHART_MIN_PX, Math.min(maxWidth, chartWidth.value)) + } +} + // Check screen width for mobile layout const checkMobile = () => { isMobile.value = window.innerWidth < 768 + if (!isMobile.value) clampChartWidth() } const chartStore = useChartStore() @@ -108,11 +119,15 @@ const initializeApp = async () => { const indicatorStore = useIndicatorStore() const indicatorTypesStore = useIndicatorTypesStore() const channelStore = useChannelStore() + const researchTypesStore = useResearchTypesStore() + const strategyTypesStore = useStrategyTypesStore() const stateSync = useStateSync({ chartState: chartStore, shapes: shapeStore, indicators: indicatorStore, indicator_types: indicatorTypesStore, + research_types: researchTypesStore, + strategy_types: strategyTypesStore, channelState: channelStore }) stateSyncCleanup = stateSync.cleanup @@ -195,7 +210,7 @@ onBeforeUnmount(() => { .chat-panel { flex: 1; - min-width: 0; + min-width: 240px; height: 100%; overflow: hidden; display: flex; diff --git a/web/src/components/BottomTray.vue b/web/src/components/BottomTray.vue index ccf874a0..cd9003be 100644 --- a/web/src/components/BottomTray.vue +++ b/web/src/components/BottomTray.vue @@ -7,6 +7,7 @@ import TabPanels from 'primevue/tabpanels' import TabPanel from 'primevue/tabpanel' import OrdersTab from './tabs/OrdersTab.vue' import PlaceholderTab from './tabs/PlaceholderTab.vue' +import ResearchTab from './tabs/ResearchTab.vue' interface TempTab { id: string @@ -81,9 +82,10 @@ defineExpose({ @@ -682,8 +687,13 @@ onUnmounted(() => { } .workspace-loading-spinner { - font-size: 2rem; - color: #089981; + width: 2rem; + height: 2rem; + animation: workspace-spin 0.8s linear infinite; +} + +@keyframes workspace-spin { + to { transform: rotate(360deg); } } .workspace-loading-message { @@ -721,24 +731,4 @@ onUnmounted(() => { color: var(--p-surface-900); } -.stop-button-container { - position: absolute; - bottom: 80px; - right: 20px; - z-index: 1000; -} - -.stop-button { - box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15); - animation: pulse 2s infinite; -} - -@keyframes pulse { - 0%, 100% { - opacity: 1; - } - 50% { - opacity: 0.8; - } -} diff --git a/web/src/components/tabs/ResearchTab.vue 
b/web/src/components/tabs/ResearchTab.vue
new file mode 100644
index 00000000..3d7890ed
--- /dev/null
+++ b/web/src/components/tabs/ResearchTab.vue
@@ -0,0 +1,108 @@
+
+
+
+
+
diff --git a/web/src/composables/useStateSync.ts b/web/src/composables/useStateSync.ts
index 3bd8fef0..ea34cc6f 100644
--- a/web/src/composables/useStateSync.ts
+++ b/web/src/composables/useStateSync.ts
@@ -60,8 +60,6 @@ export function useStateSync(stores: Record<string, any>) {
         currentSeqs[msg.store] = msg.seq;
         saveStoredSeqs(currentSeqs);
         console.log('[StateSync] Snapshot applied, new seq:', msg.seq);
-      } else {
-        console.warn('[StateSync] Store not found:', msg.store);
       }
     } else if (msg.type === 'patch') {
       console.log('[StateSync] Processing patch for store:', msg.store, 'seq:', msg.seq);
@@ -89,8 +87,6 @@ export function useStateSync(stores: Record<string, any>) {
         currentSeqs[msg.store] = msg.seq;
         saveStoredSeqs(currentSeqs);
         console.log('[StateSync] Patch applied successfully, new seq:', msg.seq);
-      } else {
-        console.warn('[StateSync] Store not found:', msg.store);
       }
     }
   };
diff --git a/web/src/composables/useTradingViewDatafeed.ts b/web/src/composables/useTradingViewDatafeed.ts
index bef50e55..41c0c171 100644
--- a/web/src/composables/useTradingViewDatafeed.ts
+++ b/web/src/composables/useTradingViewDatafeed.ts
@@ -263,7 +263,10 @@ export class WebSocketDatafeed implements IBasicDataFeed {
         throw err
       })
       .then((response) => {
-        if (response.history) {
+        if (response.error) {
+          console.error('[TradingView Datafeed] getBars server error:', response.error)
+          onError(response.error)
+        } else if (response.history) {
           console.log('[TradingView Datafeed] Raw bar sample:', response.history.bars?.[0])
           console.log('[TradingView Datafeed] Denominators:', denoms)
@@ -309,7 +312,7 @@ export class WebSocketDatafeed implements IBasicDataFeed {
     this.sendRequest({
       type: 'subscribe_bars',
       symbol: symbolInfo.ticker || symbolInfo.name,
-      resolution: resolution,
+      period_seconds: intervalToSeconds(resolution),
       subscription_id: listenerGuid
     })
       .then((response) => {
@@ -328,8 +331,10 @@ export class WebSocketDatafeed implements IBasicDataFeed {
   }
 
   unsubscribeBars(listenerGuid: string): void {
+    const sub = this.subscriptions.get(listenerGuid)
     this.sendRequest({
       type: 'unsubscribe_bars',
+      period_seconds: sub ? intervalToSeconds(sub.resolution) : 60,
       subscription_id: listenerGuid
     })
       .then(() => {
diff --git a/web/src/composables/useWebSocket.ts b/web/src/composables/useWebSocket.ts
index ad9f207f..093ff94d 100644
--- a/web/src/composables/useWebSocket.ts
+++ b/web/src/composables/useWebSocket.ts
@@ -30,8 +30,14 @@ class WebSocketManager {
   async connect(token: string): Promise<void> {
     this.token = token
 
-    // Close existing connection if any
+    // Close existing connection if any — null out handlers first so the async
+    // onclose event from the old socket cannot reset sessionStatus after the
+    // new socket has already reached 'ready'.
     if (this.ws) {
+      this.ws.onopen = null
+      this.ws.onmessage = null
+      this.ws.onerror = null
+      this.ws.onclose = null
       this.ws.close()
       this.ws = null
     }
diff --git a/web/src/stores/researchTypes.ts b/web/src/stores/researchTypes.ts
new file mode 100644
index 00000000..d9a95f07
--- /dev/null
+++ b/web/src/stores/researchTypes.ts
@@ -0,0 +1,14 @@
+import { defineStore } from 'pinia'
+import { ref } from 'vue'
+
+export interface ResearchType {
+  display_name: string
+  description?: string
+  created_at: number
+  modified_at: number
+}
+
+export const useResearchTypesStore = defineStore('research_types', () => {
+  const types = ref<Record<string, ResearchType>>({})
+  return { types }
+})
diff --git a/web/src/stores/strategyTypes.ts b/web/src/stores/strategyTypes.ts
new file mode 100644
index 00000000..fc2f65e8
--- /dev/null
+++ b/web/src/stores/strategyTypes.ts
@@ -0,0 +1,14 @@
+import { defineStore } from 'pinia'
+import { ref } from 'vue'
+
+export interface StrategyType {
+  display_name: string
+  description?: string
+  created_at: number
+  modified_at: number
+}
+
+export const useStrategyTypesStore = defineStore('strategy_types', () => {
+  const types = ref<Record<string, StrategyType>>({})
+  return { types }
+})