diff --git a/.aiignore b/.aiignore new file mode 100644 index 00000000..bcd2e76d --- /dev/null +++ b/.aiignore @@ -0,0 +1,6 @@ +ingestor/protobuf +flink/protobuf +relay/protobuf +gateway/protobuf +deploy/k8s/dev/configs/gateway-config.yaml +deploy/k8s/prod/configs/gateway-config.yaml \ No newline at end of file diff --git a/.idea/ai.iml b/.idea/ai.iml index f0cd7df8..7a1f77b7 100644 --- a/.idea/ai.iml +++ b/.idea/ai.iml @@ -20,6 +20,11 @@ + + + + + diff --git a/bin/dev b/bin/dev index 454f316e..bb9e2712 100755 --- a/bin/dev +++ b/bin/dev @@ -20,10 +20,10 @@ usage() { echo " start Start minikube and deploy all services" echo " stop [--keep-data] Stop minikube (deletes PVCs by default)" echo " restart [svc] Rebuild and redeploy all services, or just one (relay|ingestor|flink|gateway|sidecar|web|sandbox)" - echo " deep-restart [svc] Restart StatefulSet(s) and delete their PVCs (kafka|postgres|minio|qdrant|all)" + echo " deep-restart [svc] Restart StatefulSet(s) and delete their PVCs (kafka|postgres|minio|all)" echo " rebuild [svc] Rebuild all custom images, or just one" echo " deploy [svc] Deploy/update all services, or just one" - echo " delete-pvcs [svc] Delete PVCs for specific service or all (kafka|postgres|minio|qdrant|all)" + echo " delete-pvcs [svc] Delete PVCs for specific service or all (kafka|postgres|minio|all)" echo " status Show status of all services" echo " logs Tail logs for a service" echo " shell Open a shell in a service pod" @@ -446,19 +446,15 @@ delete_pvcs() { minio) kubectl delete pvc -l app=minio || true ;; - qdrant) - kubectl delete pvc -l app=qdrant || true - ;; all) echo -e "${YELLOW}Deleting all StatefulSet PVCs...${NC}" kubectl delete pvc -l app=kafka 2>/dev/null || true kubectl delete pvc -l app=postgres 2>/dev/null || true kubectl delete pvc -l app=minio 2>/dev/null || true - kubectl delete pvc -l app=qdrant 2>/dev/null || true ;; *) echo -e "${RED}Error: Unknown service '$service'${NC}" - echo "Valid services: kafka, postgres, minio, qdrant, all" + echo "Valid services: kafka, postgres, minio, all" exit 1 ;; esac @@ -497,15 +493,9 @@ deep_restart() { echo -e "${GREEN}→${NC} Force restarting iceberg-catalog (depends on minio)..." kubectl delete pod -l app=iceberg-catalog 2>/dev/null || true ;; - qdrant) - echo -e "${GREEN}→${NC} Deleting qdrant StatefulSet..." - kubectl delete statefulset qdrant || true - sleep 2 - delete_pvcs qdrant - ;; all) echo -e "${GREEN}→${NC} Deleting all StatefulSets..." - kubectl delete statefulset kafka postgres minio qdrant || true + kubectl delete statefulset kafka postgres minio || true sleep 2 delete_pvcs all # Force restart iceberg-catalog since it depends on postgres and minio @@ -517,7 +507,7 @@ deep_restart() { ;; *) echo -e "${RED}Error: Unknown service '$service'${NC}" - echo "Valid services: kafka, postgres, minio, qdrant, all" + echo "Valid services: kafka, postgres, minio, all" exit 1 ;; esac @@ -642,13 +632,12 @@ case "$COMMAND" in echo -e "${BLUE}Stopping minikube and deleting PVCs...${NC}" # Scale down StatefulSets first to release PVCs echo -e "${GREEN}→${NC} Scaling down StatefulSets..." - kubectl scale statefulset kafka postgres minio qdrant --replicas=0 2>/dev/null || true + kubectl scale statefulset kafka postgres minio --replicas=0 2>/dev/null || true # Wait for pods to terminate echo -e "${GREEN}→${NC} Waiting for pods to terminate..." kubectl wait --for=delete pod -l app=kafka --timeout=60s 2>/dev/null || true kubectl wait --for=delete pod -l app=postgres --timeout=60s 2>/dev/null || true kubectl wait --for=delete pod -l app=minio --timeout=60s 2>/dev/null || true - kubectl wait --for=delete pod -l app=qdrant --timeout=60s 2>/dev/null || true # Now delete PVCs delete_pvcs all # Delete sandbox namespace diff --git a/deploy/k8s/base/gateway.yaml b/deploy/k8s/base/gateway.yaml index ace3adc7..ecc6d521 100644 --- a/deploy/k8s/base/gateway.yaml +++ b/deploy/k8s/base/gateway.yaml @@ -44,9 +44,6 @@ spec: - name: wait-for-dragonfly image: busybox:1.36 command: ['sh', '-c', 'until nc -z dragonfly 6379; do echo waiting for dragonfly; sleep 2; done;'] - - name: wait-for-qdrant - image: busybox:1.36 - command: ['sh', '-c', 'until nc -z qdrant 6333; do echo waiting for qdrant; sleep 2; done;'] - name: wait-for-iceberg-catalog image: busybox:1.36 command: ['sh', '-c', 'until nc -z iceberg-catalog 8181; do echo waiting for iceberg-catalog; sleep 2; done;'] diff --git a/deploy/k8s/dev/configs/gateway-config.yaml.tpl b/deploy/k8s/dev/configs/gateway-config.yaml.tpl index 65d62466..99cd46c6 100644 --- a/deploy/k8s/dev/configs/gateway-config.yaml.tpl +++ b/deploy/k8s/dev/configs/gateway-config.yaml.tpl @@ -27,29 +27,22 @@ data: model_provider: deepinfra model: zai-org/GLM-5 - # License tier model configuration + # License tier model configuration (null = fall back to defaults.model) license_models: - # Free tier models free: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: zai-org/GLM-5 - allowed_models: - - zai-org/GLM-5 + default: ~ + cost_optimized: ~ + complex: ~ - # Pro tier models pro: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: zai-org/GLM-5 - blocked_models: - - Qwen/Qwen3-235B-A22B-Instruct-2507 + default: ~ + cost_optimized: ~ + complex: ~ - # Enterprise tier models enterprise: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: Qwen/Qwen3-235B-A22B-Instruct-2507 + default: ~ + cost_optimized: ~ + complex: ~ # Kubernetes configuration kubernetes: @@ -70,11 +63,6 @@ data: redis: url: redis://dragonfly:6379 - # Qdrant (for RAG vector search) - qdrant: - url: http://qdrant:6333 - collection: gateway_memory - # Iceberg (for durable storage via REST catalog) iceberg: catalog_uri: http://iceberg-catalog:8181 diff --git a/deploy/k8s/dev/infrastructure.yaml b/deploy/k8s/dev/infrastructure.yaml index 5814ca9d..7454c00b 100644 --- a/deploy/k8s/dev/infrastructure.yaml +++ b/deploy/k8s/dev/infrastructure.yaml @@ -45,68 +45,6 @@ spec: memory: "512Mi" cpu: "500m" --- -# Qdrant (Vector database for RAG) -apiVersion: v1 -kind: Service -metadata: - name: qdrant -spec: - selector: - app: qdrant - ports: - - name: http - protocol: TCP - port: 6333 - targetPort: 6333 - - name: grpc - protocol: TCP - port: 6334 - targetPort: 6334 - type: ClusterIP ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: qdrant -spec: - serviceName: qdrant - replicas: 1 - selector: - matchLabels: - app: qdrant - template: - metadata: - labels: - app: qdrant - spec: - containers: - - name: qdrant - image: qdrant/qdrant:latest - ports: - - containerPort: 6333 - name: http - - containerPort: 6334 - name: grpc - resources: - requests: - memory: "512Mi" - cpu: "200m" - limits: - memory: "1Gi" - cpu: "1000m" - volumeMounts: - - name: qdrant-data - mountPath: /qdrant/storage - volumeClaimTemplates: - - metadata: - name: qdrant-data - spec: - accessModes: ["ReadWriteOnce"] - storageClassName: dev-ephemeral - resources: - requests: - storage: 10Gi ---- # Kafka (KRaft mode - no Zookeeper needed) # Using apache/kafka:3.9.0 instead of confluentinc/cp-kafka because: # - cp-kafka's entrypoint script has issues with KRaft configuration diff --git a/deploy/k8s/prod/configs/gateway-config.yaml b/deploy/k8s/prod/configs/gateway-config.yaml index 91619e66..6fedfd94 100644 --- a/deploy/k8s/prod/configs/gateway-config.yaml +++ b/deploy/k8s/prod/configs/gateway-config.yaml @@ -21,30 +21,6 @@ data: model_provider: deepinfra model: zai-org/GLM-5 - # License tier model configuration - license_models: - # Free tier models - free: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: zai-org/GLM-5 - allowed_models: - - zai-org/GLM-5 - - # Pro tier models - pro: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: zai-org/GLM-5 - blocked_models: - - Qwen/Qwen3-235B-A22B-Instruct-2507 - - # Enterprise tier models - enterprise: - default: zai-org/GLM-5 - cost_optimized: zai-org/GLM-5 - complex: Qwen/Qwen3-235B-A22B-Instruct-2507 - # Kubernetes configuration kubernetes: namespace: sandbox @@ -59,11 +35,6 @@ data: redis: url: redis://dragonfly:6379 - # Qdrant (for RAG vector search) - qdrant: - url: http://qdrant:6333 - collection: gateway_memory - # Agent configuration agent: # Number of prior conversation turns loaded as LLM context and flushed to Iceberg at session end diff --git a/deploy/k8s/prod/infrastructure.yaml b/deploy/k8s/prod/infrastructure.yaml index 9e8887fe..6bcc2fad 100644 --- a/deploy/k8s/prod/infrastructure.yaml +++ b/deploy/k8s/prod/infrastructure.yaml @@ -45,67 +45,6 @@ spec: memory: "512Mi" cpu: "500m" --- -# Qdrant (Vector database for RAG) -apiVersion: v1 -kind: Service -metadata: - name: qdrant -spec: - selector: - app: qdrant - ports: - - name: http - protocol: TCP - port: 6333 - targetPort: 6333 - - name: grpc - protocol: TCP - port: 6334 - targetPort: 6334 - type: ClusterIP ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: qdrant -spec: - serviceName: qdrant - replicas: 1 - selector: - matchLabels: - app: qdrant - template: - metadata: - labels: - app: qdrant - spec: - containers: - - name: qdrant - image: qdrant/qdrant:latest - ports: - - containerPort: 6333 - name: http - - containerPort: 6334 - name: grpc - resources: - requests: - memory: "512Mi" - cpu: "200m" - limits: - memory: "1Gi" - cpu: "1000m" - volumeMounts: - - name: qdrant-data - mountPath: /qdrant/storage - volumeClaimTemplates: - - metadata: - name: qdrant-data - spec: - accessModes: ["ReadWriteOnce"] - resources: - requests: - storage: 10Gi ---- # Kafka (KRaft mode - no Zookeeper needed) apiVersion: v1 kind: Service diff --git a/deploy/k8s/prod/kustomization.yaml b/deploy/k8s/prod/kustomization.yaml index c31152aa..2b8ab6c4 100644 --- a/deploy/k8s/prod/kustomization.yaml +++ b/deploy/k8s/prod/kustomization.yaml @@ -11,7 +11,7 @@ resources: - ../base # Add the 'ai' namespace (base only creates 'sandbox') - namespaces.yaml - # Prod infrastructure (postgres, minio, kafka, flink, relay, ingestor, qdrant, dragonfly, iceberg) + # Prod infrastructure (postgres, minio, kafka, flink, relay, ingestor, dragonfly, iceberg) - infrastructure.yaml # Sandbox namespace resources (go to sandbox namespace, not ai) - sandbox-config.yaml diff --git a/deploy/k8s/prod/secrets/gateway-secrets.tpl.yaml b/deploy/k8s/prod/secrets/gateway-secrets.tpl.yaml index 850f5abf..9b0aba77 100644 --- a/deploy/k8s/prod/secrets/gateway-secrets.tpl.yaml +++ b/deploy/k8s/prod/secrets/gateway-secrets.tpl.yaml @@ -19,6 +19,7 @@ stringData: # LLM Provider API Keys llm_providers: deepinfra_api_key: "{{ op://AI Prod/Gateway/deepinfra_api_key }}" + anthropic_api_key: "{{ op://AI Prod/Gateway/anthropic_api_key }}" # Search API Keys search: @@ -36,10 +37,6 @@ stringData: push: service_key: "" - # Qdrant API key (optional, for hosted Qdrant) - qdrant: - api_key: "" - # Iceberg S3 credentials (must match minio-secret) iceberg: s3_access_key: "{{ op://AI Prod/MinIO/access_key }}" diff --git a/doc/CLUSTER_SETUP.md b/doc/CLUSTER_SETUP.md index e096c831..456d4342 100644 --- a/doc/CLUSTER_SETUP.md +++ b/doc/CLUSTER_SETUP.md @@ -10,7 +10,7 @@ The platform runs across two namespaces: | Namespace | Contents | |-----------|----------| -| `ai` | Gateway, web UI, all infrastructure services (postgres, minio, kafka, flink, relay, ingestor, qdrant, dragonfly, iceberg-catalog) | +| `ai` | Gateway, web UI, all infrastructure services (postgres, minio, kafka, flink, relay, ingestor, dragonfly, iceberg-catalog) | | `sandbox` | Per-user sandbox containers (created dynamically by the gateway) | Secrets are managed via 1Password CLI (`op inject`). All `.tpl.yaml` files in `deploy/k8s/prod/secrets/` contain `op://` references and are safe to commit; actual values are never stored in git. @@ -217,7 +217,7 @@ kubectl --context=prod -n ai get configmaps ## Step 7 — Deploy Infrastructure -Infrastructure services (postgres, minio, kafka, iceberg-catalog, dragonfly, qdrant, relay, ingestor, flink) are defined in `deploy/k8s/prod/infrastructure.yaml` and were applied in Step 4. +Infrastructure services (postgres, minio, kafka, iceberg-catalog, dragonfly, relay, ingestor, flink) are defined in `deploy/k8s/prod/infrastructure.yaml` and were applied in Step 4. Wait for the StatefulSets and Deployments to become ready: @@ -225,7 +225,6 @@ Wait for the StatefulSets and Deployments to become ready: kubectl --context=prod -n ai rollout status statefulset/postgres kubectl --context=prod -n ai rollout status statefulset/minio kubectl --context=prod -n ai rollout status statefulset/kafka -kubectl --context=prod -n ai rollout status statefulset/qdrant kubectl --context=prod -n ai rollout status deployment/dragonfly kubectl --context=prod -n ai rollout status deployment/iceberg-catalog kubectl --context=prod -n ai rollout status deployment/relay diff --git a/doc/agent_harness.md b/doc/agent_harness.md index 2d13730f..d7b9587e 100644 --- a/doc/agent_harness.md +++ b/doc/agent_harness.md @@ -22,20 +22,20 @@ The Agent Harness is the core orchestration layer for the Dexorder AI platform, │ ┌──────────────────┼──────────────────┐ │ │ │ │ │ │ │ ┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐ │ -│ │ MCP │ │ LLM │ │ RAG │ │ -│ │ Connector│ │ Router │ │ Retriever│ │ -│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ -│ │ │ │ │ -└─────────┼──────────────────┼──────────────────┼─────────────┘ - │ │ │ - ▼ ▼ ▼ - ┌────────────┐ ┌───────────┐ ┌───────────┐ - │ User's │ │ LLM │ │ Qdrant │ - │ MCP │ │ Providers │ │ (Vectors) │ - │ Container │ │(Anthropic,│ │ │ - │ (k8s pod) │ │ OpenAI, │ │ Global + │ - │ │ │ etc) │ │ User │ - └────────────┘ └───────────┘ └───────────┘ +│ │ MCP │ │ LLM │ │ +│ │ Connector│ │ Router │ │ +│ └────┬─────┘ └────┬─────┘ │ +│ │ │ │ +└─────────┼──────────────────┼─────────────┘ + │ │ + ▼ ▼ + ┌────────────┐ ┌───────────┐ + │ User's │ │ LLM │ + │ MCP │ │ Providers │ + │ Container │ │(Anthropic,│ + │ (k8s pod) │ │ OpenAI, │ + │ │ │ etc) │ + └────────────┘ └───────────┘ ``` ## Message Processing Flow @@ -57,17 +57,11 @@ When a user sends a message: │ - context://workspace-state │ - context://system-prompt │ - ├─→ b. RAGRetriever searches for relevant memories: - │ - Embeds user query - │ - Searches Qdrant: user_id = current_user OR user_id = "0" - │ - Returns user-specific + global platform knowledge - │ - ├─→ c. Build system prompt: + ├─→ b. Build system prompt: │ - Base platform prompt │ - User profile context │ - Workspace state │ - Custom user instructions - │ - Relevant RAG memories │ ├─→ d. ModelRouter selects LLM: │ - Based on license tier @@ -92,11 +86,10 @@ When a user sends a message: ### 1. Agent Harness (`gateway/src/harness/agent-harness.ts`) -**Stateless orchestrator** - all state lives in user's MCP container or RAG. +**Stateless orchestrator** - all state lives in user's MCP container. **Responsibilities:** - Fetch context from user's MCP resources -- Query RAG for relevant memories - Build prompts with full context - Route to appropriate LLM - Handle tool calls (platform vs user) @@ -141,40 +134,12 @@ Routes queries to appropriate LLM based on: - LangGraph checkpoints (1 hour TTL) - Fast reads for active conversations -**Qdrant** (Vector Search) -- Conversation embeddings -- User-specific memories (user_id = actual user ID) -- **Global platform knowledge** (user_id = "0") -- RAG retrieval with cosine similarity -- GDPR-compliant (indexed by user_id for fast deletion) - **Iceberg** (Cold Storage) - Full conversation history (partitioned by user_id, session_id) - Checkpoint snapshots for replay - Analytics and time-travel queries - GDPR-compliant with compaction -#### RAG System: - -**Global Knowledge** (user_id="0"): -- Platform capabilities and architecture -- Trading concepts and fundamentals -- Indicator development guides -- Strategy patterns and examples -- Loaded from `gateway/knowledge/` markdown files - -**User Knowledge** (user_id=specific user): -- Personal conversation history -- Trading preferences and style -- Custom indicators and strategies -- Workspace state and context - -**Query Flow:** -1. User query is embedded using EmbeddingService -2. Qdrant searches: `user_id IN (current_user, "0")` -3. Top-K relevant chunks returned -4. Added to LLM context automatically - ### 5. Skills vs Subagents #### Skills (`gateway/src/harness/skills/`) @@ -290,44 +255,6 @@ User's MCP container provides access to: - Tactical order generators (TWAP, iceberg, etc.) - Smart order routing -## Global Knowledge Management - -### Document Loading - -At gateway startup: -1. DocumentLoader scans `gateway/knowledge/` directory -2. Markdown files chunked by headers (~1000 tokens/chunk) -3. Embeddings generated via EmbeddingService -4. Stored in Qdrant with user_id="0" -5. Content hashing enables incremental updates - -### Directory Structure - -``` -gateway/knowledge/ - ├── platform/ # Platform capabilities - ├── trading/ # Trading fundamentals - ├── indicators/ # Indicator development - └── strategies/ # Strategy patterns -``` - -### Updating Knowledge - -**Development:** -```bash -curl -X POST http://localhost:3000/admin/reload-knowledge -``` - -**Production:** -- Update markdown files -- Deploy new version -- Auto-loaded on startup - -**Monitoring:** -```bash -curl http://localhost:3000/admin/knowledge-stats -``` - ## Container Lifecycle ### User Container Creation @@ -362,7 +289,6 @@ When user connects: ### ✅ Completed - Agent Harness with MCP integration - Model routing with license tiers -- RAG retriever with Qdrant - Document loader for global knowledge - EmbeddingService (Ollama/OpenAI) - Skills and subagents framework @@ -388,5 +314,4 @@ When user connects: - Documentation: `gateway/src/harness/README.md` - Knowledge base: `gateway/knowledge/` - LangGraph: https://langchain-ai.github.io/langgraphjs/ -- Qdrant: https://qdrant.tech/documentation/ - MCP Spec: https://modelcontextprotocol.io/ diff --git a/doc/architecture.md b/doc/architecture.md index 56d33032..1bca60d9 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -19,7 +19,6 @@ Dexorder is an AI-powered trading platform that combines real-time market data p │ • Authentication & session management │ │ • Agent Harness (LangChain/LangGraph orchestration) │ │ - MCP client connector to user containers │ -│ - RAG retriever (Qdrant) │ │ - Model router (LLM selection) │ │ - Skills & subagents framework │ │ • Dynamic user container provisioning │ @@ -30,8 +29,7 @@ Dexorder is an AI-powered trading platform that combines real-time market data p ┌──────────────────┐ ┌──────────────┐ ┌──────────────────────┐ │ User Containers │ │ Relay │ │ Infrastructure │ │ (per-user pods) │ │ (ZMQ Router) │ │ • DragonflyDB (cache)│ -│ │ │ │ │ • Qdrant (vectors) │ -│ • MCP Server │ │ • Market data│ │ • PostgreSQL (meta) │ +│ │ │ │ • MCP Server │ │ • Market data│ │ • PostgreSQL (meta) │ │ • User files: │ │ fanout │ │ • MinIO (S3) │ │ - Indicators │ │ • Work queue │ │ │ │ - Strategies │ │ • Stateless │ │ │ @@ -86,18 +84,16 @@ Dexorder is an AI-powered trading platform that combines real-time market data p - **Agent Harness (LangChain/LangGraph):** ([[agent_harness]]) - Stateless LLM orchestration - MCP client connector to user containers - - RAG retrieval from Qdrant (global + user-specific knowledge) - Model routing based on license tier and complexity - Skills and subagents framework - Workflow state machines with validation loops **Key Features:** -- **Stateless design:** All conversation state lives in user containers or Qdrant +- **Stateless design:** All conversation state lives in user containers - **Multi-channel support:** WebSocket, Telegram (future: mobile, Discord, Slack) - **Kubernetes-native:** Uses k8s API for container management - **Three-tier memory:** - Redis: Hot storage, active sessions, LangGraph checkpoints (1 hour TTL) - - Qdrant: Vector search, RAG, global + user knowledge, GDPR-compliant - Iceberg: Cold storage, full history, analytics, time-travel queries **Infrastructure:** @@ -270,12 +266,6 @@ Exchange API → Ingestor → Kafka → Flink → Iceberg - Redis-compatible in-memory cache - Session state, rate limiting, hot data -#### Qdrant -- Vector database for RAG -- **Global knowledge** (user_id="0"): Platform capabilities, trading concepts, strategy patterns -- **User knowledge** (user_id=specific): Personal conversations, preferences, strategies -- GDPR-compliant (indexed by user_id for fast deletion) - #### PostgreSQL - Iceberg catalog metadata - User accounts and license info (gateway) @@ -458,17 +448,11 @@ The gateway's agent harness (LangChain/LangGraph) orchestrates LLM interactions │ - context://workspace-state │ - context://system-prompt │ - ├─→ b. RAGRetriever searches Qdrant for relevant memories: - │ - Embeds user query - │ - Searches: user_id IN (current_user, "0") - │ - Returns user-specific + global platform knowledge - │ - ├─→ c. Build system prompt: + ├─→ b. Build system prompt: │ - Base platform prompt │ - User profile context │ - Workspace state │ - Custom user instructions - │ - Relevant RAG memories │ ├─→ d. ModelRouter selects LLM: │ - Based on license tier @@ -492,8 +476,6 @@ The gateway's agent harness (LangChain/LangGraph) orchestrates LLM interactions **Key Architecture:** - **Gateway is stateless:** No conversation history stored in gateway - **User context in MCP:** All user-specific data lives in user's container -- **Global knowledge in Qdrant:** Platform documentation loaded from `gateway/knowledge/` -- **RAG at gateway level:** Semantic search combines global + user knowledge - **Skills vs Subagents:** - Skills: Well-defined, single-purpose tasks - Subagents: Complex domain expertise with multi-file context @@ -630,7 +612,6 @@ See [[backend_redesign]] for detailed notes. - Historical backfill service **Phase 3: Agent Features** -- RAG integration (Qdrant) - Strategy backtesting - Risk management tools - Portfolio analytics diff --git a/doc/plan.md b/doc/plan.md index 3d253f89..47c22499 100644 --- a/doc/plan.md +++ b/doc/plan.md @@ -14,3 +14,13 @@ * TradingView indicator import tool * Results persistence: ~~research analysis~~, backtests, strategy performance metrics, etc. * Free tier with token limits and sandbox shutdown +* Performance analysis +* Custom pre-session scanners / summaries +* Saved prompts (Create /presession prompt command for easy re-use) + + +https://github.com/wangzhe3224/awesome-systematic-trading +https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3247865 151 trading strategies +https://vectorbt.dev/ +https://github.com/shiyu-coder/Kronos +https://x.com/RohOnChain/status/2041180375838498950?s=20 combining signals diff --git a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java index c5221a07..acd49d3c 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/RealtimeBarFunction.java @@ -25,7 +25,12 @@ import org.slf4j.LoggerFactory; * - Closed bar (isClosed=true): emitted once when a window boundary is crossed. * Topic: "{ticker}|ohlc:{period}" — consumed by strategies/triggers. * - * Accumulator layout (long[7]): + * Replay protection: ticks whose trade timestamp predates a period's current window start + * are discarded (prevents Kafka replay from contaminating current bars). Open bars are + * additionally suppressed until the first live tick (within LIVE_TICK_THRESHOLD_MS of now) + * is processed, so Kafka catch-up produces a single bar rather than a flood. + * + * Accumulator layout (long[8]): * [0] open * [1] high * [2] low @@ -33,13 +38,18 @@ import org.slf4j.LoggerFactory; * [4] volume (sum of base amount) * [5] windowStartMs (epoch ms) * [6] tickCount + * [7] valid (1 = seeded or fresh window, 0 = mid-window cold start — open bars suppressed) */ public class RealtimeBarFunction extends RichFlatMapFunction { private static final Logger LOG = LoggerFactory.getLogger(RealtimeBarFunction.class); private static final long serialVersionUID = 1L; + // Ticks within this many ms of wall-clock time are considered live (vs. Kafka catch-up). + private static final long LIVE_TICK_THRESHOLD_MS = 30_000L; private final int[] periods; private transient MapState accumState; + // Suppresses open bar emissions during Kafka catch-up; set to true on first live tick. + private transient boolean caughtUp = false; /** * @param periods Period lengths in seconds (e.g., 60, 300, 900, 3600) @@ -63,6 +73,20 @@ public class RealtimeBarFunction extends RichFlatMapFunction 0) { out.collect(toBar(tick.getTicker(), period, accum, true)); LOG.debug("Emitted closed bar: ticker={}, period={}s, windowStart={}, ticks={}", tick.getTicker(), period, accum[5], accum[6]); } - long[] newAccum = openWindow(tick, windowStart); + long[] newAccum = openWindow(tick, windowStart, true); accumState.put(period, newAccum); - out.collect(toBar(tick.getTicker(), period, newAccum, false)); + if (caughtUp) { + out.collect(toBar(tick.getTicker(), period, newAccum, false)); + } } else { - // Same window — update accumulator and emit current open bar + // Same window — update accumulator accum[1] = Math.max(accum[1], tick.getPrice()); // high accum[2] = Math.min(accum[2], tick.getPrice()); // low accum[3] = tick.getPrice(); // close accum[4] += tick.getAmount(); // volume accum[6]++; // tick count accumState.put(period, accum); - out.collect(toBar(tick.getTicker(), period, accum, false)); + if (accum[7] == 1 && caughtUp) { + out.collect(toBar(tick.getTicker(), period, accum, false)); + } else if (accum[7] == 0 && caughtUp) { + LOG.debug("Open bar suppressed (valid=0, no seed): ticker={}, period={}s", tick.getTicker(), period); + } } } } - private static long[] openWindow(TickWrapper tick, long windowStart) { + private static long[] openWindow(TickWrapper tick, long windowStart, boolean valid) { return new long[]{ - tick.getPrice(), // open - tick.getPrice(), // high - tick.getPrice(), // low - tick.getPrice(), // close + tick.getPrice(), // open + tick.getPrice(), // high + tick.getPrice(), // low + tick.getPrice(), // close tick.getAmount(), // volume windowStart, - 1L // tickCount + 1L, // tickCount + valid ? 1L : 0L // valid flag }; } diff --git a/flink/src/main/java/com/dexorder/flink/publisher/TickDeserializer.java b/flink/src/main/java/com/dexorder/flink/publisher/TickDeserializer.java index 6c45ce08..bd08b62c 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/TickDeserializer.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/TickDeserializer.java @@ -40,7 +40,7 @@ public class TickDeserializer implements DeserializationSchema { Tick tick = Tick.parseFrom(payload); - return new TickWrapper( + TickWrapper tw = new TickWrapper( tick.getTicker(), tick.getTradeId(), tick.getTimestamp(), @@ -49,6 +49,15 @@ public class TickDeserializer implements DeserializationSchema { tick.getQuoteAmount(), tick.getTakerBuy() ); + if (tick.hasIsSeed() && tick.getIsSeed()) { + tw.setIsSeed(true); + tw.setSeedHigh(tick.getSeedHigh()); + tw.setSeedLow(tick.getSeedLow()); + tw.setSeedClose(tick.getSeedClose()); + tw.setSeedWindowStartMs(tick.getSeedWindowStartMs()); + tw.setSeedPeriodSeconds(tick.getSeedPeriodSeconds()); + } + return tw; } catch (Exception e) { LOG.warn("Failed to deserialize Tick, skipping: {}", e.getMessage()); diff --git a/flink/src/main/java/com/dexorder/flink/publisher/TickWrapper.java b/flink/src/main/java/com/dexorder/flink/publisher/TickWrapper.java index 7372124f..1920b98b 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/TickWrapper.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/TickWrapper.java @@ -20,6 +20,12 @@ public class TickWrapper implements Serializable { /** Quote amount as scaled integer */ private long quoteAmount; private boolean takerBuy; + private boolean isSeed; + private long seedHigh; + private long seedLow; + private long seedClose; + private long seedWindowStartMs; + private int seedPeriodSeconds; public TickWrapper() {} @@ -41,6 +47,12 @@ public class TickWrapper implements Serializable { public long getAmount() { return amount; } public long getQuoteAmount() { return quoteAmount; } public boolean isTakerBuy() { return takerBuy; } + public boolean isSeed() { return isSeed; } + public long getSeedHigh() { return seedHigh; } + public long getSeedLow() { return seedLow; } + public long getSeedClose() { return seedClose; } + public long getSeedWindowStartMs() { return seedWindowStartMs; } + public int getSeedPeriodSeconds() { return seedPeriodSeconds; } public void setTicker(String ticker) { this.ticker = ticker; } public void setTradeId(String tradeId) { this.tradeId = tradeId; } @@ -49,6 +61,12 @@ public class TickWrapper implements Serializable { public void setAmount(long amount) { this.amount = amount; } public void setQuoteAmount(long quoteAmount) { this.quoteAmount = quoteAmount; } public void setTakerBuy(boolean takerBuy) { this.takerBuy = takerBuy; } + public void setIsSeed(boolean isSeed) { this.isSeed = isSeed; } + public void setSeedHigh(long seedHigh) { this.seedHigh = seedHigh; } + public void setSeedLow(long seedLow) { this.seedLow = seedLow; } + public void setSeedClose(long seedClose) { this.seedClose = seedClose; } + public void setSeedWindowStartMs(long seedWindowStartMs) { this.seedWindowStartMs = seedWindowStartMs; } + public void setSeedPeriodSeconds(int seedPeriodSeconds) { this.seedPeriodSeconds = seedPeriodSeconds; } @Override public String toString() { diff --git a/gateway/.env.example b/gateway/.env.example index 3cc66dae..de97144c 100644 --- a/gateway/.env.example +++ b/gateway/.env.example @@ -38,10 +38,6 @@ SANDBOX_STORAGE_CLASS=standard # Redis (for hot storage and session management) REDIS_URL=redis://localhost:6379 -# Qdrant (for RAG vector search) -QDRANT_URL=http://localhost:6333 -QDRANT_API_KEY= # optional, leave empty for local dev - # Iceberg (for durable storage via REST catalog) ICEBERG_CATALOG_URI=http://iceberg-catalog:8181 ICEBERG_NAMESPACE=gateway diff --git a/gateway/README.md b/gateway/README.md index 9a1d497d..005bed00 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -58,7 +58,6 @@ Multi-channel gateway with agent harness for the Dexorder AI platform. - **Streaming responses**: Real-time chat with WebSocket and Telegram - **Complex workflows**: LangGraph for stateful trading analysis (backtest → risk → approval) - **Agent harness**: Stateless orchestrator (all context lives in user's MCP container) -- **MCP resource integration**: User's RAG, conversation history, and preferences ## Container Management @@ -91,9 +90,7 @@ Containers self-manage their lifecycle using the lifecycle sidecar (see `../life - OpenAI GPT - Google Gemini - OpenRouter (one key for 300+ models) -- Ollama (for embeddings): https://ollama.com/download - Redis (for session/hot storage) -- Qdrant (for RAG vector search) - Kafka + Flink + Iceberg (for durable storage) ### Development @@ -123,20 +120,7 @@ DEFAULT_MODEL_PROVIDER=anthropic DEFAULT_MODEL=claude-sonnet-4-6 ``` -4. Start Ollama and pull embedding model: -```bash -# Install Ollama (one-time): https://ollama.com/download -# Or with Docker: docker run -d -p 11434:11434 ollama/ollama - -# Pull the all-minilm embedding model (90MB, CPU-friendly) -ollama pull all-minilm - -# Alternative models: -# ollama pull nomic-embed-text # 8K context length -# ollama pull mxbai-embed-large # Higher accuracy, slower -``` - -5. Run development server: +4. Run development server: ```bash npm run dev ``` @@ -217,138 +201,6 @@ ws.send(JSON.stringify({ **`GET /health`** - Returns server health status -## Ollama Deployment Options - -The gateway requires Ollama for embedding generation in RAG queries. You have two deployment options: - -### Option 1: Ollama in Gateway Container (Recommended for simplicity) - -Install Ollama directly in the gateway container. This keeps all dependencies local and simplifies networking. - -**Dockerfile additions:** -```dockerfile -FROM node:22-slim - -# Install Ollama -RUN curl -fsSL https://ollama.com/install.sh | sh - -# Pull embedding model at build time -RUN ollama serve & \ - sleep 5 && \ - ollama pull all-minilm && \ - pkill ollama - -# ... rest of your gateway Dockerfile -``` - -**Start script (entrypoint.sh):** -```bash -#!/bin/bash -# Start Ollama in background -ollama serve & - -# Start gateway -node dist/main.js -``` - -**Pros:** -- Simple networking (localhost:11434) -- No extra K8s resources -- Self-contained deployment - -**Cons:** -- Larger container image (~200MB extra) -- CPU/memory shared with gateway process - -**Resource requirements:** -- Add +200MB memory -- Add +0.2 CPU cores for embedding inference - -### Option 2: Ollama as Separate Pod/Sidecar - -Deploy Ollama as a separate container in the same pod (sidecar) or as its own deployment. - -**K8s Deployment (sidecar pattern):** -```yaml -apiVersion: apps/v1 -kind: Deployment -metadata: - name: gateway -spec: - template: - spec: - containers: - - name: gateway - image: ghcr.io/dexorder/gateway:latest - env: - - name: OLLAMA_URL - value: http://localhost:11434 - - - name: ollama - image: ollama/ollama:latest - command: ["/bin/sh", "-c"] - args: - - | - ollama serve & - sleep 5 - ollama pull all-minilm - wait - resources: - requests: - memory: "512Mi" - cpu: "500m" - limits: - memory: "1Gi" - cpu: "1000m" -``` - -**K8s Deployment (separate service):** -```yaml -apiVersion: apps/v1 -kind: Deployment -metadata: - name: ollama -spec: - replicas: 1 - template: - spec: - containers: - - name: ollama - image: ollama/ollama:latest - # ... same as above ---- -apiVersion: v1 -kind: Service -metadata: - name: ollama -spec: - selector: - app: ollama - ports: - - port: 11434 -``` - -Gateway `.env`: -```bash -OLLAMA_URL=http://ollama:11434 -``` - -**Pros:** -- Isolated resource limits -- Can scale separately -- Easier to monitor/debug - -**Cons:** -- More K8s resources -- Network hop (minimal latency) -- More complex deployment - -### Recommendation - -For most deployments: **Use Option 1 (in-container)** for simplicity, unless you need to: -- Share Ollama across multiple services -- Scale embedding inference independently -- Run Ollama on GPU nodes (gateway on CPU nodes) ## TODO diff --git a/gateway/config.example.yaml b/gateway/config.example.yaml index 761e3cc5..da0016f2 100644 --- a/gateway/config.example.yaml +++ b/gateway/config.example.yaml @@ -58,11 +58,6 @@ kubernetes: redis: url: redis://localhost:6379 -# Qdrant (for RAG vector search) -qdrant: - url: http://localhost:6333 - collection: gateway_memory - # Iceberg (for durable storage via REST catalog) iceberg: catalog_uri: http://iceberg-catalog:8181 diff --git a/gateway/knowledge/README.md b/gateway/knowledge/README.md index b95579bc..003ceb2b 100644 --- a/gateway/knowledge/README.md +++ b/gateway/knowledge/README.md @@ -1,6 +1,6 @@ # Dexorder Knowledge Base -This directory contains global knowledge documents that are automatically loaded into the RAG system as platform-wide knowledge (user_id="0"). +This directory contains global knowledge documents that are automatically loaded into the agent's context at startup. ## Structure @@ -40,9 +40,7 @@ Content here... 1. At gateway startup, the DocumentLoader scans this directory 2. Each markdown file is chunked by headers (max ~1000 tokens per chunk) -3. Chunks are embedded using the configured embedding service -4. Embeddings are stored in Qdrant with user_id="0" (global namespace) -5. Content hash tracking enables incremental updates +3. Content hash tracking enables incremental updates ## Updating Documents @@ -55,14 +53,6 @@ Content here... - Deploy new version - Gateway will detect changes and update vectors automatically -## RAG Integration - -When users query the agent: -1. Their query is embedded -2. Qdrant searches both global (user_id="0") and user-specific vectors -3. Relevant chunks from these docs are included in context -4. LLM generates response with platform knowledge - ## Adding New Documents 1. Create markdown file in appropriate subdirectory @@ -90,12 +80,3 @@ Check logs for load statistics: ``` Knowledge documents loaded: { loaded: 5, updated: 2, skipped: 3 } ``` - -Monitor Qdrant collection stats: -``` -GET /health -{ - "qdrantVectors": 1234, - "qdrantIndexed": 1234 -} -``` diff --git a/gateway/package.json b/gateway/package.json index c7c64864..13bda3eb 100644 --- a/gateway/package.json +++ b/gateway/package.json @@ -19,6 +19,7 @@ "@langchain/community": "^1.1.27", "@langchain/core": "latest", "@langchain/langgraph": "latest", + "@langchain/anthropic": "latest", "@langchain/openai": "^1.4.2", "@modelcontextprotocol/sdk": "^1.0.4", "@types/pdf-parse": "^1.1.5", diff --git a/gateway/prompt/agent-main.md b/gateway/prompt/agent-main.md index 2933d83b..15320f66 100644 --- a/gateway/prompt/agent-main.md +++ b/gateway/prompt/agent-main.md @@ -21,6 +21,8 @@ Delegate specialized tasks to subagents using the `Spawn` tool. Each subagent ha - Multi-symbol comparisons - Custom calculations using Python (pandas, numpy, scipy, matplotlib, etc.) +**Always begin the instruction with:** `Research script name: ""` — e.g. `Research script name: "Monday Tuesday Session Overlap"`. The research agent uses this name when calling `PythonWrite` or `PythonEdit`. + Do **NOT** include time range, history length, bar count, period size, or resolution guidance in the instruction unless the user explicitly specifies such. The research agent selects its own optimal window and period otherwise. **`Spawn({agent: "indicator", instruction: "..."})`** — for ANYTHING indicator-related on the chart: diff --git a/gateway/prompt/agent-research.md b/gateway/prompt/agent-research.md index a68fa9a8..f5a061e7 100644 --- a/gateway/prompt/agent-research.md +++ b/gateway/prompt/agent-research.md @@ -15,6 +15,10 @@ dynamic_imports: You are a specialized assistant that creates Python research scripts for market data analysis and visualization. +## CRITICAL RULE + +**You MUST call `PythonWrite` (new script) or `PythonEdit` (existing script) as your FIRST tool call. NEVER write analysis text without first creating or updating a script.** If you find yourself about to generate analysis text without a tool call, stop and call `PythonWrite` or `PythonEdit` first. A text-only response is always wrong. + ## Your Purpose Create Python scripts that: diff --git a/gateway/secrets.example.yaml b/gateway/secrets.example.yaml index 31c074b2..c4a83816 100644 --- a/gateway/secrets.example.yaml +++ b/gateway/secrets.example.yaml @@ -26,10 +26,6 @@ email: push: service_key: "" -# Qdrant API key (optional, for hosted Qdrant) -qdrant: - api_key: "" - # Iceberg S3 credentials iceberg: s3_access_key: minioadmin diff --git a/gateway/src/auth/auth-service.ts b/gateway/src/auth/auth-service.ts index d57e3155..3bd95eab 100644 --- a/gateway/src/auth/auth-service.ts +++ b/gateway/src/auth/auth-service.ts @@ -105,36 +105,36 @@ export class AuthService { asResponse: true, }); - // Extract bearer token from response headers (set by bearer plugin) - const token = response.headers.get('set-auth-token'); - - if (!token) { - this.config.logger.error('Bearer token not found in response headers'); - return { - token: '', - userId: '', - error: 'Authentication token not generated', - }; - } - // Parse the response body to get user info const result = await response.json() as { user?: { id: string; email: string; name: string }; error?: string; }; + if (!response.ok) { + this.config.logger.warn({ status: response.status }, 'Sign in rejected by auth provider'); + return { + token: '', + userId: '', + error: 'Invalid email or password.', + }; + } + + // Extract bearer token from response headers (set by bearer plugin) + const token = response.headers.get('set-auth-token'); + this.config.logger.debug({ hasUser: !!result.user, userId: result.user?.id, hasToken: !!token, }, 'Sign in result'); - if (!result.user) { - this.config.logger.warn('Sign in failed: no user in result'); + if (!token || !result.user) { + this.config.logger.error({ hasToken: !!token, hasUser: !!result.user }, 'Sign in succeeded but session data missing'); return { token: '', userId: '', - error: 'Invalid credentials', + error: 'Login failed. Please try again.', }; } @@ -147,7 +147,7 @@ export class AuthService { return { token: '', userId: '', - error: error.message || 'Sign in failed', + error: 'Login failed. Please try again.', }; } } diff --git a/gateway/src/channels/websocket-handler.ts b/gateway/src/channels/websocket-handler.ts index 67532bde..ca51b1af 100644 --- a/gateway/src/channels/websocket-handler.ts +++ b/gateway/src/channels/websocket-handler.ts @@ -5,6 +5,8 @@ import type { AgentHarness, HarnessFactory } from '../harness/agent-harness.js'; import type { HarnessEvent } from '../harness/harness-events.js'; import type { InboundMessage } from '../types/messages.js'; import { randomUUID } from 'crypto'; +import { parseModelTag, MODEL_TAGS } from '../llm/model-tags.js'; +import type { LLMProvider } from '../llm/provider.js'; import type { SessionRegistry, EventSubscriber, Session } from '../events/index.js'; import type { OHLCService, BarUpdateCallback } from '../services/ohlc-service.js'; import type { SymbolIndexService } from '../services/symbol-index-service.js'; @@ -30,6 +32,24 @@ function jsonStringifySafe(obj: any): string { ); } +function makeChunkDebouncer(send: (content: string) => void, delayMs = 200) { + let buffer = ''; + let timer: ReturnType | null = null; + + function flush() { + if (timer !== null) { clearTimeout(timer); timer = null; } + if (buffer.length > 0) { send(buffer); buffer = ''; } + } + + function add(content: string) { + buffer += content; + if (timer !== null) clearTimeout(timer); + timer = setTimeout(flush, delayMs); + } + + return { add, flush }; +} + export type SessionStatus = 'authenticating' | 'spinning_up' | 'initializing' | 'ready' | 'error' function sendStatus(socket: WebSocket, status: SessionStatus, message: string): void { @@ -257,6 +277,7 @@ export class WebSocketHandler { userId: authContext.userId, licenseType: authContext.license.licenseType, message: 'Connected to Dexorder AI', + modelTags: MODEL_TAGS.map(m => m.tag), }) ); @@ -272,25 +293,32 @@ export class WebSocketHandler { } else { // First conversation — auto-send greeting prompt and stream the response socket.send(JSON.stringify({ type: 'agent_chunk', content: '', done: false })); + const greetingDebouncer = makeChunkDebouncer(content => + socket.send(JSON.stringify({ type: 'agent_chunk', content, done: false })) + ); for await (const event of harness!.streamGreeting()) { const e = event as HarnessEvent; switch (e.type) { case 'chunk': - socket.send(JSON.stringify({ type: 'agent_chunk', content: e.content, done: false })); + greetingDebouncer.add(e.content); break; case 'tool_call': + greetingDebouncer.flush(); socket.send(JSON.stringify({ type: 'agent_tool_call', toolName: e.toolName, label: e.label })); break; case 'image': + greetingDebouncer.flush(); socket.send(JSON.stringify({ type: 'image', data: e.data, mimeType: e.mimeType, caption: e.caption })); break; case 'error': + greetingDebouncer.flush(); socket.send(JSON.stringify({ type: 'text', text: `An error occurred during greeting.` })); break; case 'done': break; } } + greetingDebouncer.flush(); socket.send(JSON.stringify({ type: 'agent_chunk', content: '', done: true })); } } @@ -304,47 +332,75 @@ export class WebSocketHandler { // Route based on message type if (payload.type === 'message' || payload.type === 'agent_user_message') { - // Chat message - send to agent harness with streaming - const inboundMessage: InboundMessage = { - messageId: randomUUID(), - userId: authContext.userId, - sessionId: authContext.sessionId, - content: payload.content, - attachments: payload.attachments, - timestamp: new Date(), - }; - if (!harness) { logger.error('Harness not initialized'); socket.send(JSON.stringify({ type: 'error', message: 'Session not ready' })); return; } + // Check for @ModelTag at the start of the message + const parsedTag = parseModelTag(payload.content ?? ''); + let messageContent: string = payload.content ?? ''; + let modelOverride: { modelId: string; provider?: LLMProvider } | undefined; + + if (parsedTag) { + await harness.clearHistory(); + socket.send(JSON.stringify({ type: 'model_switched', tag: parsedTag.tag, modelId: parsedTag.modelId, rest: parsedTag.rest })); + messageContent = parsedTag.rest; + modelOverride = { modelId: parsedTag.modelId, provider: parsedTag.provider }; + logger.info({ tag: parsedTag.tag, modelId: parsedTag.modelId }, 'Model tag switch'); + } + + // Chat message - send to agent harness with streaming + const inboundMessage: InboundMessage = { + messageId: randomUUID(), + userId: authContext.userId, + sessionId: authContext.sessionId, + content: messageContent, + attachments: payload.attachments, + timestamp: new Date(), + }; + try { // Acknowledge receipt immediately so the client can show the seen indicator socket.send(JSON.stringify({ type: 'agent_chunk', content: '', done: false })); logger.info('Streaming harness response'); let fatalError = false; - for await (const event of harness.streamMessage(inboundMessage)) { + const msgDebouncer = makeChunkDebouncer(content => + socket.send(JSON.stringify({ type: 'agent_chunk', content, done: false })) + ); + const stream = (parsedTag && !messageContent) + ? harness.streamGreeting(modelOverride) + : harness.streamMessage(inboundMessage, { modelOverride }); + for await (const event of stream) { const e = event as HarnessEvent; switch (e.type) { case 'chunk': - socket.send(JSON.stringify({ type: 'agent_chunk', content: e.content, done: false })); + msgDebouncer.add(e.content); break; case 'tool_call': + msgDebouncer.flush(); socket.send(JSON.stringify({ type: 'agent_tool_call', toolName: e.toolName, label: e.label })); break; case 'subagent_tool_call': + msgDebouncer.flush(); socket.send(JSON.stringify({ type: 'subagent_tool_call', agentName: e.agentName, toolName: e.toolName, label: e.label })); break; case 'subagent_chunk': + msgDebouncer.flush(); socket.send(JSON.stringify({ type: 'subagent_chunk', agentName: e.agentName, content: e.content })); break; + case 'subagent_thinking': + msgDebouncer.flush(); + socket.send(JSON.stringify({ type: 'subagent_thinking', agentName: e.agentName, content: e.content })); + break; case 'image': + msgDebouncer.flush(); socket.send(JSON.stringify({ type: 'image', data: e.data, mimeType: e.mimeType, caption: e.caption })); break; case 'error': + msgDebouncer.flush(); socket.send(JSON.stringify({ type: 'text', text: `An unrecoverable error occurred in the ${e.source}.` })); if (e.fatal) fatalError = true; break; @@ -352,6 +408,7 @@ export class WebSocketHandler { break; } } + msgDebouncer.flush(); if (fatalError) { socket.close(1011, 'Fatal error'); @@ -451,6 +508,9 @@ export class WebSocketHandler { case 'subagent_tool_call': socket.send(JSON.stringify({ type: 'subagent_tool_call', agentName: e.agentName, toolName: e.toolName, label: e.label })); break; + case 'subagent_thinking': + socket.send(JSON.stringify({ type: 'subagent_thinking', agentName: e.agentName, content: e.content })); + break; case 'tool_call': socket.send(JSON.stringify({ type: 'agent_tool_call', toolName: e.toolName, label: e.label })); break; @@ -730,6 +790,13 @@ export class WebSocketHandler { // Create a per-subscription callback that forwards bars to this socket const barCallback: BarUpdateCallback = (bar) => { if (socket.readyState !== 1 /* OPEN */) return; + const symbolMeta = symbolIndexService?.getSymbolByTicker(bar.ticker); + const priceDivisor = (symbolMeta?.price_precision ?? 0) > 0 + ? Math.pow(10, symbolMeta!.price_precision!) + : 1; + const sizeDivisor = (symbolMeta?.size_precision ?? 0) > 0 + ? Math.pow(10, symbolMeta!.size_precision!) + : 1; socket.send(JSON.stringify({ type: 'bar_update', subscription_id: payload.subscription_id, @@ -739,11 +806,11 @@ export class WebSocketHandler { bar: { // Convert nanoseconds → seconds for client compatibility time: Number(bar.timestamp / 1_000_000_000n), - open: bar.open, - high: bar.high, - low: bar.low, - close: bar.close, - volume: bar.volume, + open: bar.open / priceDivisor, + high: bar.high / priceDivisor, + low: bar.low / priceDivisor, + close: bar.close / priceDivisor, + volume: bar.volume / sizeDivisor, }, })); }; diff --git a/gateway/src/events/types.ts b/gateway/src/events/types.ts index 5687f0e2..4d84c65e 100644 --- a/gateway/src/events/types.ts +++ b/gateway/src/events/types.ts @@ -151,7 +151,7 @@ export function deserializeUserEvent(data: Buffer): UserEvent { eventId: json.event_id, timestamp: json.timestamp, eventType: json.event_type as EventType, - payload: Buffer.from(json.payload, 'base64'), + payload: json.payload ? Buffer.from(json.payload, 'base64') : Buffer.alloc(0), delivery: { priority: json.delivery.priority as Priority, channels: json.delivery.channels.map( diff --git a/gateway/src/harness/README.md b/gateway/src/harness/README.md index 6acade71..b296deb9 100644 --- a/gateway/src/harness/README.md +++ b/gateway/src/harness/README.md @@ -7,7 +7,7 @@ Comprehensive agent orchestration system for Dexorder AI platform, built on Lang ``` gateway/src/ ├── harness/ -│ ├── memory/ # Storage layer (Redis + Iceberg + Qdrant) +│ ├── memory/ # Storage layer (Redis + Iceberg) │ ├── subagents/ # Specialized agents with multi-file memory │ ├── workflows/ # LangGraph state machines │ ├── prompts/ # System prompts @@ -27,13 +27,10 @@ Tiered storage architecture: - **Redis**: Hot state (active sessions, checkpoints) - **Iceberg**: Cold storage (durable conversations, analytics) -- **Qdrant**: Vector search (RAG, semantic memory) **Key Files:** - `checkpoint-saver.ts`: LangGraph checkpoint persistence - `conversation-store.ts`: Message history management -- `rag-retriever.ts`: Vector similarity search -- `embedding-service.ts`: Text→vector conversion - `session-context.ts`: User context with channel metadata ### 2. Tools (`../tools/`) @@ -176,19 +173,11 @@ Based on [harness-rag.txt discussion](../../chat/harness-rag.txt): - Time-travel queries - GDPR-compliant deletion with compaction -### Vector Search (Qdrant) -- Conversation embeddings -- Long-term memory -- RAG retrieval -- Payload-indexed by user_id for fast GDPR deletion -- **Global knowledge base** (user_id="0") loaded from markdown files - ### GDPR Compliance ```typescript // Delete user data across all stores await conversationStore.deleteUserData(userId); -await ragRetriever.deleteUserData(userId); await checkpointSaver.delete(userId); await containerManager.deleteContainer(userId); @@ -247,19 +236,13 @@ Already in `gateway/package.json`: import Redis from 'ioredis'; import { TieredCheckpointSaver, - ConversationStore, - EmbeddingService, - RAGRetriever + ConversationStore } from './harness/memory'; const redis = new Redis(process.env.REDIS_URL); const checkpointSaver = new TieredCheckpointSaver(redis, logger); const conversationStore = new ConversationStore(redis, logger); -const embeddings = new EmbeddingService({ provider: 'openai', apiKey }, logger); -const ragRetriever = new RAGRetriever({ url: QDRANT_URL }, logger); - -await ragRetriever.initialize(); ``` ### 3. Create Subagents @@ -309,56 +292,6 @@ const analysis = await skill.execute({ }); ``` -## Global Knowledge System - -The harness includes a document loader that automatically loads markdown files from `gateway/knowledge/` into Qdrant as global knowledge (user_id="0"). - -### Directory Structure -``` -gateway/knowledge/ - ├── platform/ # Platform capabilities and architecture - ├── trading/ # Trading concepts and fundamentals - ├── indicators/ # Indicator development guides - └── strategies/ # Strategy patterns and examples -``` - -### How It Works - -1. **Startup**: Documents are loaded automatically when gateway starts -2. **Chunking**: Intelligent splitting by markdown headers (~1000 tokens/chunk) -3. **Embedding**: Chunks are embedded using configured embedding service -4. **Storage**: Stored in Qdrant with user_id="0" (global namespace) -5. **Updates**: Content hashing detects changes for incremental updates - -### RAG Query Flow - -When a user sends a message: -1. Query is embedded using same embedding service -2. Qdrant searches vectors with filter: `user_id = current_user OR user_id = "0"` -3. Results include both user-specific and global knowledge -4. Relevant chunks are added to LLM context -5. LLM generates response with platform knowledge - -### Managing Knowledge - -**Add new documents**: -```bash -# Create markdown file in appropriate directory -echo "# New Topic" > gateway/knowledge/platform/new-topic.md - -# Reload knowledge (development) -curl -X POST http://localhost:3000/admin/reload-knowledge -``` - -**Check stats**: -```bash -curl http://localhost:3000/admin/knowledge-stats -``` - -**In production**: Just deploy updated markdown files - they'll be loaded on startup. - -See [gateway/knowledge/README.md](../../knowledge/README.md) for detailed documentation. - ## Next Steps 1. **Implement Iceberg Integration**: Complete TODOs in checkpoint-saver.ts and conversation-store.ts @@ -371,5 +304,4 @@ See [gateway/knowledge/README.md](../../knowledge/README.md) for detailed docume - Architecture discussion: [chat/harness-rag.txt](../../chat/harness-rag.txt) - LangGraph docs: https://langchain-ai.github.io/langgraphjs/ -- Qdrant docs: https://qdrant.tech/documentation/ - Apache Iceberg: https://iceberg.apache.org/docs/latest/ diff --git a/gateway/src/harness/agent-harness.ts b/gateway/src/harness/agent-harness.ts index 5e486687..86f5bcf1 100644 --- a/gateway/src/harness/agent-harness.ts +++ b/gateway/src/harness/agent-harness.ts @@ -7,7 +7,7 @@ import type { ConversationStore } from './memory/conversation-store.js'; import type { BlobStore } from './memory/blob-store.js'; import type { InboundMessage, OutboundMessage } from '../types/messages.js'; import { MCPClientConnector } from './mcp-client.js'; -import { LLMProviderFactory, type ProviderConfig } from '../llm/provider.js'; +import { LLMProvider, LLMProviderFactory, type ProviderConfig } from '../llm/provider.js'; import { ModelRouter, RoutingStrategy } from '../llm/router.js'; import type { ModelMiddleware } from '../llm/middleware.js'; import type { WorkspaceManager } from '../workspace/workspace-manager.js'; @@ -107,13 +107,10 @@ export class AgentHarness { this.wikiLoader, getToolRegistry(), async (maxTokens?: number) => { - const { model } = await this.modelRouter.route( - 'analyze and backtest research data', - this.config.license, - RoutingStrategy.COMPLEXITY, - this.config.userId, - maxTokens, - ); + const { model } = this.modelRouter.createModel({ + ...this.modelFactory.getDefaultModel(), + ...(maxTokens !== undefined && { maxTokens }), + }); return model; }, config.logger, @@ -363,34 +360,50 @@ export class AgentHarness { this.config.logger.debug('Streaming model response...'); let response: any = null; - try { - const stream = await model.stream(messagesCopy, { signal }); - for await (const chunk of stream) { - if (typeof chunk.content === 'string' && chunk.content.length > 0) { - this.config.logger.trace({ content: chunk.content }, 'raw chunk'); - yield { type: 'chunk', content: chunk.content }; - } else if (Array.isArray(chunk.content)) { - for (const block of chunk.content) { - if (block.type === 'text' && block.text) { - this.config.logger.trace({ content: block.text }, 'raw chunk'); - yield { type: 'chunk', content: block.text }; + const MAX_STREAM_ATTEMPTS = 4; + for (let attempt = 1; attempt <= MAX_STREAM_ATTEMPTS; attempt++) { + response = null; + try { + const stream = await model.stream(messagesCopy, { signal }); + for await (const chunk of stream) { + if (typeof chunk.content === 'string' && chunk.content.length > 0) { + this.config.logger.trace({ content: chunk.content }, 'raw chunk'); + yield { type: 'chunk', content: chunk.content }; + } else if (Array.isArray(chunk.content)) { + for (const block of chunk.content) { + if (block.type === 'text' && block.text) { + this.config.logger.trace({ content: block.text }, 'raw chunk'); + yield { type: 'chunk', content: block.text }; + } } } + response = response ? response.concat(chunk) : chunk; } - response = response ? response.concat(chunk) : chunk; + break; // success — exit retry loop + } catch (invokeError: any) { + const is429 = invokeError?.status === 429 || invokeError?.lc_error_code === 'MODEL_RATE_LIMIT'; + if (is429 && attempt < MAX_STREAM_ATTEMPTS) { + const delaySec = parseRetryAfter(invokeError?.headers); + const delayMs = delaySec != null ? delaySec * 1000 : Math.min(5000 * attempt, 30000); + this.config.logger.warn( + { attempt, delayMs, iteration: iterations, messageCount: messagesCopy.length }, + 'Model rate limited (429), retrying after delay' + ); + await new Promise(resolve => setTimeout(resolve, delayMs)); + continue; + } + this.config.logger.error( + { + error: invokeError, + errorMessage: invokeError?.message, + errorStack: invokeError?.stack, + iteration: iterations, + messageCount: messagesCopy.length, + }, + 'Model streaming failed in tool calling loop' + ); + throw invokeError; } - } catch (invokeError: any) { - this.config.logger.error( - { - error: invokeError, - errorMessage: invokeError?.message, - errorStack: invokeError?.stack, - iteration: iterations, - messageCount: messagesCopy.length, - }, - 'Model streaming failed in tool calling loop' - ); - throw invokeError; } this.config.logger.info( @@ -684,7 +697,7 @@ export class AgentHarness { * Yields typed HarnessEvents (chunk, tool_call, image, done) and saves the * conversation to the store once the done event has been emitted. */ - async *streamMessage(message: InboundMessage, options?: { saveUserMessage?: boolean }): AsyncGenerator { + async *streamMessage(message: InboundMessage, options?: { saveUserMessage?: boolean; modelOverride?: { modelId: string; provider?: LLMProvider } }): AsyncGenerator { this.config.logger.info( { messageId: message.messageId, userId: message.userId, content: message.content.substring(0, 100) }, 'Processing user message' @@ -725,12 +738,24 @@ export class AgentHarness { // 4. Get the configured model this.config.logger.debug('Routing to model'); - const { model, middleware } = await this.modelRouter.route( - message.content, - this.config.license, - RoutingStrategy.COMPLEXITY, - this.config.userId - ); + let model, middleware; + if (options?.modelOverride) { + const defaultConfig = this.modelRouter.getDefaultModelConfig(); + ({ model, middleware } = this.modelRouter.createModel({ + ...defaultConfig, + model: options.modelOverride.modelId, + provider: options.modelOverride.provider ?? defaultConfig.provider, + thinking: undefined, + })); + this.config.logger.info({ modelId: options.modelOverride.modelId, provider: options.modelOverride.provider }, 'Using @tag model override'); + } else { + ({ model, middleware } = await this.modelRouter.route( + message.content, + this.config.license, + RoutingStrategy.COMPLEXITY, + this.config.userId + )); + } this.middleware = middleware; this.config.logger.info({ modelName: model.constructor.name }, 'Model selected'); @@ -837,11 +862,18 @@ export class AgentHarness { } } + async clearHistory(): Promise { + if (this.conversationStore) { + const channelKey = this.config.channelType ?? ChannelType.WEBSOCKET; + await this.conversationStore.deleteSession(this.config.userId, this.config.sessionId, channelKey); + } + } + /** - * Stream a greeting response for first-time users. + * Stream a greeting response for first-time users (or after a model switch). * Sends "Who are you and what can you do?" through the normal message pipeline. */ - async *streamGreeting(): AsyncGenerator { + async *streamGreeting(modelOverride?: { modelId: string; provider?: LLMProvider }): AsyncGenerator { const content = await AgentHarness.loadWelcomePrompt(); const greetingMessage: InboundMessage = { messageId: `greeting_${Date.now()}`, @@ -850,7 +882,7 @@ export class AgentHarness { content, timestamp: new Date(), }; - yield* this.streamMessage(greetingMessage, { saveUserMessage: false }); + yield* this.streamMessage(greetingMessage, { saveUserMessage: false, modelOverride }); } /** @@ -1040,9 +1072,25 @@ export class AgentHarness { } // ============================================================================= -// Details update helpers (module-level, no class dependency) +// Helpers (module-level, no class dependency) // ============================================================================= +/** + * Parse the Retry-After header value into seconds. + * Accepts both delta-seconds ("30") and HTTP-date ("Mon, 01 Jan 2026 00:00:00 GMT"). + * Returns null if the header is absent or unparseable. + */ +function parseRetryAfter(headers: Record | undefined): number | null { + if (!headers) return null; + const value = headers['retry-after'] ?? headers['Retry-After']; + if (!value) return null; + const num = parseFloat(value); + if (!isNaN(num)) return Math.max(0, num); + const date = new Date(value); + if (!isNaN(date.getTime())) return Math.max(0, (date.getTime() - Date.now()) / 1000); + return null; +} + /** * Produce a minimal unified diff between two strings, suitable for passing to * an LLM as a change description. Returns an empty string when there is no diff. diff --git a/gateway/src/harness/harness-events.ts b/gateway/src/harness/harness-events.ts index 05731f61..0e6037f5 100644 --- a/gateway/src/harness/harness-events.ts +++ b/gateway/src/harness/harness-events.ts @@ -48,4 +48,10 @@ export interface ErrorEvent { fatal: boolean; } -export type HarnessEvent = ChunkEvent | ToolCallEvent | ImageEvent | DoneEvent | SubagentChunkEvent | SubagentThinkingEvent | SubagentToolCallEvent | ErrorEvent; +export interface ModelSwitchedEvent { + type: 'model_switched'; + tag: string; + modelId: string; +} + +export type HarnessEvent = ChunkEvent | ToolCallEvent | ImageEvent | DoneEvent | SubagentChunkEvent | SubagentThinkingEvent | SubagentToolCallEvent | ErrorEvent | ModelSwitchedEvent; diff --git a/gateway/src/harness/spawn/spawn-service.ts b/gateway/src/harness/spawn/spawn-service.ts index 54b86052..c909fb82 100644 --- a/gateway/src/harness/spawn/spawn-service.ts +++ b/gateway/src/harness/spawn/spawn-service.ts @@ -1,8 +1,5 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import { SystemMessage, HumanMessage } from '@langchain/core/messages'; - -/** All platform tool names available to every subagent. */ -const ALL_PLATFORM_TOOLS = ['SymbolLookup', 'GetChartData', 'GetTicker24h', 'WebSearch', 'FetchPage', 'ArxivSearch']; import type { FastifyBaseLogger } from 'fastify'; import { createReactAgent } from '@langchain/langgraph/prebuilt'; import type { HarnessEvent, SubagentChunkEvent, SubagentThinkingEvent } from '../harness-events.js'; @@ -13,6 +10,62 @@ import type { ToolRegistry } from '../../tools/tool-registry.js'; import type { MCPToolInfo } from '../../tools/mcp/mcp-tool-wrapper.js'; import { WikiLoader, type SpawnContext } from './wiki-loader.js'; +/** All platform tool names available to every subagent. */ +const ALL_PLATFORM_TOOLS = ['SymbolLookup', 'GetChartData', 'GetTicker24h', 'WebSearch', 'FetchPage', 'ArxivSearch']; + +/** + * Streaming filter that strips triple-backtick fenced code blocks from text as it + * arrives in chunks. Holds back at most 2 characters of look-ahead so normal text + * streams through with no perceptible delay. + */ +class FenceFilter { + private buf = ''; + private inFence = false; + + write(chunk: string): string { + this.buf += chunk; + return this.drain(false); + } + + end(): string { + return this.drain(true); + } + + private drain(final: boolean): string { + let out = ''; + while (true) { + if (!this.inFence) { + const start = this.buf.indexOf('```'); + if (start === -1) { + const keep = final ? this.buf.length : Math.max(0, this.buf.length - 2); + out += this.buf.slice(0, keep); + this.buf = this.buf.slice(keep); + break; + } + out += this.buf.slice(0, start); + const headerEnd = this.buf.indexOf('\n', start + 3); + if (headerEnd === -1 && !final) { + this.buf = this.buf.slice(start); + break; + } + this.inFence = true; + this.buf = headerEnd !== -1 ? this.buf.slice(headerEnd + 1) : ''; + } else { + const end = this.buf.indexOf('```'); + if (end === -1) { + this.buf = final ? '' : this.buf.slice(Math.max(0, this.buf.length - 2)); + break; + } + this.inFence = false; + const closingEnd = this.buf.indexOf('\n', end + 3); + this.buf = closingEnd !== -1 ? this.buf.slice(closingEnd + 1) : this.buf.slice(end + 3); + } + } + // Collapse blank lines left where code blocks were removed + return out.replace(/\n{3,}/g, '\n\n'); + } +} + export interface SpawnInput { agentName: string; instruction: string; @@ -138,13 +191,15 @@ export class SpawnService { ); let finalText = ''; + const fenceFilter = new FenceFilter(); for await (const [mode, data] of await stream) { if (signal?.aborted) break; if (mode === 'messages') { for (const chunk of SpawnService.extractStreamChunks(data, agentName)) { - yield chunk; + const filtered = fenceFilter.write(chunk.content); + if (filtered) yield { ...chunk, content: filtered }; } } else if (mode === 'updates') { if ((data as any).agent?.messages) { @@ -167,6 +222,9 @@ export class SpawnService { } } + const tail = fenceFilter.end(); + if (tail) yield { type: 'subagent_chunk', agentName, content: tail }; + this.logger.info( { agentName, textLength: finalText.length, imageCount: imageCapture.length }, 'SpawnService: finished' @@ -182,12 +240,16 @@ export class SpawnService { /** * Extract subagent_chunk / subagent_thinking events from a LangGraph `messages` stream datum. + * Only processes AIMessageChunks — ToolMessages (identified by tool_call_id) are skipped + * because their content is raw tool result data, not agent narrative text. */ static extractStreamChunks( data: unknown, agentName: string, ): Array { const msg = Array.isArray(data) ? (data as unknown[])[0] : data; + // ToolMessages have tool_call_id; AIMessageChunks don't — skip tool results + if ((msg as any)?.tool_call_id != null) return []; const content = (msg as any)?.content; if (typeof content === 'string') { return content ? [{ type: 'subagent_chunk', agentName, content }] : []; diff --git a/gateway/src/harness/workflows/trading-request/config.yaml b/gateway/src/harness/workflows/trading-request/config.yaml index 0db9500f..0726ed45 100644 --- a/gateway/src/harness/workflows/trading-request/config.yaml +++ b/gateway/src/harness/workflows/trading-request/config.yaml @@ -14,6 +14,4 @@ approvalNodes: maxPositionPercent: 0.05 # 5% of portfolio max minRiskRewardRatio: 2.0 # Minimum 2:1 risk/reward -# Model override (optional) -model: claude-sonnet-4-6 temperature: 0.2 diff --git a/gateway/src/llm/model-tags.ts b/gateway/src/llm/model-tags.ts new file mode 100644 index 00000000..3b2fa6f8 --- /dev/null +++ b/gateway/src/llm/model-tags.ts @@ -0,0 +1,30 @@ +import { LLMProvider } from './provider.js'; + +export interface ModelTag { + tag: string; + modelId: string; + provider?: LLMProvider; +} + +export const MODEL_TAGS: ModelTag[] = [ + { tag: 'DeepSeek-Flash', modelId: 'deepseek-ai/DeepSeek-V4-Flash' }, + { tag: 'DeepSeek-Pro', modelId: 'deepseek-ai/DeepSeek-V4-Pro' }, + { tag: 'Kimi', modelId: 'moonshotai/Kimi-K2.6' }, + { tag: 'GLM', modelId: 'zai-org/GLM-5' }, + { tag: 'Qwen', modelId: 'Qwen/Qwen3.5-27B' }, + { tag: 'MiniMax', modelId: 'MiniMaxAI/MiniMax-M2.5' }, + { tag: 'Sonnet', modelId: 'claude-sonnet-4-6', provider: LLMProvider.ANTHROPIC }, + { tag: 'Haiku', modelId: 'claude-haiku-4-5-20251001', provider: LLMProvider.ANTHROPIC }, + { tag: 'Opus', modelId: 'claude-opus-4-7', provider: LLMProvider.ANTHROPIC }, +]; + +/** Parse a leading @Tag from message content. Case-insensitive. Returns null if not a known tag. */ +export function parseModelTag(content: string): (ModelTag & { rest: string }) | null { + const trimmed = content.trimStart(); + if (!trimmed.startsWith('@')) return null; + const spaceIdx = trimmed.indexOf(' '); + const tagName = spaceIdx === -1 ? trimmed.slice(1) : trimmed.slice(1, spaceIdx); + const rest = spaceIdx === -1 ? '' : trimmed.slice(spaceIdx + 1).trim(); + const found = MODEL_TAGS.find(m => m.tag.toLowerCase() === tagName.toLowerCase()); + return found ? { ...found, rest } : null; +} diff --git a/gateway/src/llm/provider.ts b/gateway/src/llm/provider.ts index ec687911..975f490c 100644 --- a/gateway/src/llm/provider.ts +++ b/gateway/src/llm/provider.ts @@ -1,5 +1,6 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import { ChatOpenAI } from '@langchain/openai'; +import { ChatAnthropic } from '@langchain/anthropic'; import type { FastifyBaseLogger } from 'fastify'; import { type ModelMiddleware, NoopMiddleware, AnthropicCachingMiddleware } from './middleware.js'; @@ -11,6 +12,8 @@ export { NoopMiddleware, AnthropicCachingMiddleware }; */ export enum LLMProvider { DEEP_INFRA = 'deepinfra', + DEEP_INFRA_ANTHROPIC = 'deepinfra_anthropic', + ANTHROPIC = 'anthropic', } /** @@ -21,15 +24,16 @@ export interface ModelConfig { model: string; temperature?: number; maxTokens?: number; + thinking?: { budgetTokens: number }; } /** * License tier model configuration */ export interface LicenseTierModels { - default: string; - cost_optimized: string; - complex: string; + default: string | null; + cost_optimized: string | null; + complex: string | null; allowed_models?: string[]; blocked_models?: string[]; } @@ -48,11 +52,13 @@ export interface LicenseModelsConfig { */ export interface ProviderConfig { deepinfraApiKey?: string; + anthropicApiKey?: string; defaultModel?: ModelConfig; licenseModels?: LicenseModelsConfig; } const DEEP_INFRA_BASE_URL = 'https://api.deepinfra.com/v1/openai'; +const DEEP_INFRA_ANTHROPIC_BASE_URL = 'https://api.deepinfra.com/anthropic'; /** * LLM Provider factory @@ -80,6 +86,12 @@ export class LLMProviderFactory { case LLMProvider.DEEP_INFRA: return this.createDeepInfraModel(modelConfig); + case LLMProvider.DEEP_INFRA_ANTHROPIC: + return this.createDeepInfraAnthropicModel(modelConfig); + + case LLMProvider.ANTHROPIC: + return this.createAnthropicModel(modelConfig); + default: throw new Error(`Unsupported provider: ${modelConfig.provider}`); } @@ -106,6 +118,49 @@ export class LLMProviderFactory { return { model, middleware: new NoopMiddleware() }; } + /** + * Create Deep Infra model via Anthropic-compatible API (supports thinking) + */ + private createDeepInfraAnthropicModel(config: ModelConfig): { model: ChatAnthropic; middleware: AnthropicCachingMiddleware } { + if (!this.config.deepinfraApiKey) { + throw new Error('Deep Infra API key not configured'); + } + + const model = new ChatAnthropic({ + model: config.model, + ...(!config.thinking && { temperature: config.temperature ?? 0.7 }), + maxTokens: config.maxTokens ?? 8192, + anthropicApiKey: this.config.deepinfraApiKey, + clientOptions: { baseURL: DEEP_INFRA_ANTHROPIC_BASE_URL }, + ...(config.thinking && { + thinking: { type: 'enabled' as const, budget_tokens: config.thinking.budgetTokens }, + }), + }); + + return { model, middleware: new AnthropicCachingMiddleware() }; + } + + /** + * Create model via native Anthropic API (not Deep Infra) + */ + private createAnthropicModel(config: ModelConfig): { model: ChatAnthropic; middleware: AnthropicCachingMiddleware } { + if (!this.config.anthropicApiKey) { + throw new Error('Anthropic API key not configured'); + } + + const model = new ChatAnthropic({ + model: config.model, + ...(!config.thinking && { temperature: config.temperature ?? 0.7 }), + maxTokens: config.maxTokens ?? 8192, + anthropicApiKey: this.config.anthropicApiKey, + ...(config.thinking && { + thinking: { type: 'enabled' as const, budget_tokens: config.thinking.budgetTokens }, + }), + }); + + return { model, middleware: new AnthropicCachingMiddleware() }; + } + /** * Get default model based on environment */ @@ -118,10 +173,7 @@ export class LLMProviderFactory { throw new Error('Deep Infra API key not configured'); } - return { - provider: LLMProvider.DEEP_INFRA, - model: 'zai-org/GLM-5', - }; + throw new Error('Default model not configured — set defaults.model in gateway config'); } /** @@ -132,16 +184,3 @@ export class LLMProviderFactory { } } -/** - * Predefined model configurations - */ -export const MODELS = { - GLM_5: { - provider: LLMProvider.DEEP_INFRA, - model: 'zai-org/GLM-5', - }, - QWEN_235B: { - provider: LLMProvider.DEEP_INFRA, - model: 'Qwen/Qwen3-235B-A22B-Instruct-2507', - }, -} as const satisfies Record; diff --git a/gateway/src/llm/router.ts b/gateway/src/llm/router.ts index 6f1cc3f1..9e04a141 100644 --- a/gateway/src/llm/router.ts +++ b/gateway/src/llm/router.ts @@ -1,6 +1,6 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { FastifyBaseLogger } from 'fastify'; -import { LLMProviderFactory, type ModelConfig, LLMProvider, type LicenseModelsConfig } from './provider.js'; +import { LLMProviderFactory, type ModelConfig, type LicenseModelsConfig } from './provider.js'; import type { ModelMiddleware } from './middleware.js'; import type { License } from '../types/user.js'; @@ -35,6 +35,17 @@ export class ModelRouter { this.licenseModels = factory.getLicenseModelsConfig(); } + /** + * Create a model directly from a config, bypassing routing logic. + */ + createModel(config: ModelConfig): { model: BaseChatModel; middleware: ModelMiddleware } { + return this.factory.createModel(config); + } + + getDefaultModelConfig(): ModelConfig { + return this.defaultModel; + } + /** * Route to appropriate model based on context */ @@ -107,73 +118,45 @@ export class ModelRouter { private routeByComplexity(message: string, license: License): ModelConfig { const isComplex = this.isComplexQuery(message); - // Use configuration if available if (this.licenseModels) { const tierConfig = this.licenseModels[license.licenseType]; if (tierConfig) { const model = isComplex ? tierConfig.complex : tierConfig.default; - return { provider: this.defaultModel.provider as LLMProvider, model }; + return model ? { ...this.defaultModel, model } : this.defaultModel; } } - // Fallback to hardcoded defaults - if (license.licenseType === 'enterprise') { - return isComplex - ? { provider: LLMProvider.DEEP_INFRA, model: 'Qwen/Qwen3-235B-A22B-Instruct-2507' } - : { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; - } - - if (license.licenseType === 'pro') { - return isComplex - ? { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' } - : { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; - } - - return { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; + return this.defaultModel; } /** * Route based on license tier */ private routeByLicenseTier(license: License): ModelConfig { - // Use configuration if available if (this.licenseModels) { const tierConfig = this.licenseModels[license.licenseType]; if (tierConfig) { - return { provider: this.defaultModel.provider as LLMProvider, model: tierConfig.default }; + const model = tierConfig.default; + return model ? { ...this.defaultModel, model } : this.defaultModel; } } - // Fallback to hardcoded defaults - switch (license.licenseType) { - case 'enterprise': - return { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; - - case 'pro': - return { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; - - case 'free': - return { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; - - default: - return this.defaultModel; - } + return this.defaultModel; } /** * Route to cheapest available model */ private routeByCost(license: License): ModelConfig { - // Use configuration if available if (this.licenseModels) { const tierConfig = this.licenseModels[license.licenseType]; if (tierConfig) { - return { provider: this.defaultModel.provider as LLMProvider, model: tierConfig.cost_optimized }; + const model = tierConfig.cost_optimized; + return model ? { ...this.defaultModel, model } : this.defaultModel; } } - // Fallback: use GLM-5 - return { provider: LLMProvider.DEEP_INFRA, model: 'zai-org/GLM-5' }; + return this.defaultModel; } /** @@ -199,18 +182,7 @@ export class ModelRouter { } } - // Fallback to hardcoded defaults - if (license.licenseType === 'free') { - const allowedModels = ['zai-org/GLM-5']; - return allowedModels.includes(model.model); - } - - if (license.licenseType === 'pro') { - const blockedModels = ['Qwen/Qwen3-235B-A22B-Instruct-2507']; - return !blockedModels.includes(model.model); - } - - // Enterprise: all models allowed + // Without tier config, all models allowed return true; } diff --git a/gateway/src/main.ts b/gateway/src/main.ts index 59c853bf..b0945bb3 100644 --- a/gateway/src/main.ts +++ b/gateway/src/main.ts @@ -86,27 +86,31 @@ function loadConfig() { // LLM provider API keys and model configuration providerConfig: { deepinfraApiKey: secretsData.llm_providers?.deepinfra_api_key || process.env.DEEPINFRA_API_KEY, - defaultModel: { - provider: configData.defaults?.model_provider || 'deepinfra', - model: configData.defaults?.model || 'zai-org/GLM-5', - }, + anthropicApiKey: secretsData.llm_providers?.anthropic_api_key || process.env.ANTHROPIC_API_KEY, + defaultModel: configData.defaults?.model ? { + provider: configData.defaults.model_provider, + model: configData.defaults.model, + ...(configData.defaults.thinking_budget_tokens && { + thinking: { budgetTokens: configData.defaults.thinking_budget_tokens }, + }), + } : undefined, licenseModels: { free: { - default: configData.license_models?.free?.default || 'zai-org/GLM-5', - cost_optimized: configData.license_models?.free?.cost_optimized || 'zai-org/GLM-5', - complex: configData.license_models?.free?.complex || 'zai-org/GLM-5', - allowed_models: configData.license_models?.free?.allowed_models || ['zai-org/GLM-5'], + default: configData.license_models?.free?.default || null, + cost_optimized: configData.license_models?.free?.cost_optimized || null, + complex: configData.license_models?.free?.complex || null, + allowed_models: configData.license_models?.free?.allowed_models, }, pro: { - default: configData.license_models?.pro?.default || 'zai-org/GLM-5', - cost_optimized: configData.license_models?.pro?.cost_optimized || 'zai-org/GLM-5', - complex: configData.license_models?.pro?.complex || 'zai-org/GLM-5', - blocked_models: configData.license_models?.pro?.blocked_models || ['Qwen/Qwen3-235B-A22B-Instruct-2507'], + default: configData.license_models?.pro?.default || null, + cost_optimized: configData.license_models?.pro?.cost_optimized || null, + complex: configData.license_models?.pro?.complex || null, + blocked_models: configData.license_models?.pro?.blocked_models, }, enterprise: { - default: configData.license_models?.enterprise?.default || 'zai-org/GLM-5', - cost_optimized: configData.license_models?.enterprise?.cost_optimized || 'zai-org/GLM-5', - complex: configData.license_models?.enterprise?.complex || 'Qwen/Qwen3-235B-A22B-Instruct-2507', + default: configData.license_models?.enterprise?.default || null, + cost_optimized: configData.license_models?.enterprise?.cost_optimized || null, + complex: configData.license_models?.enterprise?.complex || null, }, }, }, @@ -354,6 +358,7 @@ try { icebergClient, relayClient: zmqRelayClient, logger: app.log, + getSymbolIndex: () => symbolIndexService, }); app.log.info('OHLC service initialized'); } catch (error) { diff --git a/gateway/src/services/ohlc-service.ts b/gateway/src/services/ohlc-service.ts index b7e5d1db..20c97514 100644 --- a/gateway/src/services/ohlc-service.ts +++ b/gateway/src/services/ohlc-service.ts @@ -28,12 +28,14 @@ import { backendToTradingView, DEFAULT_SUPPORTED_RESOLUTIONS, } from '../types/ohlc.js'; +import type { SymbolIndexService } from './symbol-index-service.js'; export interface OHLCServiceConfig { icebergClient: IcebergClient; relayClient: ZMQRelayClient; logger: FastifyBaseLogger; requestTimeout?: number; // Request timeout in ms (default: 30000) + getSymbolIndex?: () => SymbolIndexService | undefined; } /** @@ -45,11 +47,13 @@ export class OHLCService { private icebergClient: IcebergClient; private relayClient: ZMQRelayClient; private logger: FastifyBaseLogger; + private getSymbolIndex?: () => SymbolIndexService | undefined; constructor(config: OHLCServiceConfig) { this.icebergClient = config.icebergClient; this.relayClient = config.relayClient; this.logger = config.logger; + this.getSymbolIndex = config.getSymbolIndex; } /** @@ -129,7 +133,7 @@ export class OHLCService { if (missingRanges.length === 0 && data.length > 0) { // All data exists in Iceberg this.logger.info({ ticker, period_seconds, cached: true }, 'OHLC data found in cache, returning immediately'); - return this.formatHistoryResult(data, start_time, end_time, period_seconds, countback); + return this.formatHistoryResult(ticker, data, start_time, end_time, period_seconds, countback); } // Step 3: Request each missing range from the relay individually so we @@ -160,7 +164,7 @@ export class OHLCService { data = await this.icebergClient.queryOHLC(ticker, period_seconds, start_time, end_time); this.logger.info({ ticker, period_seconds, dataCount: data.length }, 'Final Iceberg query complete, returning result'); - return this.formatHistoryResult(data, start_time, end_time, period_seconds, countback); + return this.formatHistoryResult(ticker, data, start_time, end_time, period_seconds, countback); } catch (error: any) { this.logger.error({ @@ -179,8 +183,12 @@ export class OHLCService { * Interior gaps (confirmed trading periods with no trades) arrive as null-OHLC * rows from Iceberg. Edge gaps (data not yet ingested, in-progress candles) are * simply absent rows. Both are returned as-is; clients fill as appropriate. + * + * Applies decimal correction: Nautilus stores prices/volumes as integers; + * divide by 10^price_precision and 10^size_precision to recover float values. */ private formatHistoryResult( + ticker: string, data: any[], // @ts-ignore start_time: bigint, @@ -197,9 +205,33 @@ export class OHLCService { }; } - // Convert to TradingView format without null-filling missing slots. + // Convert to TradingView format (also converts BigInt fields to Number). let bars: TradingViewBar[] = data.map(backendToTradingView); + // Apply decimal correction using symbol metadata. + const symbolMeta = this.getSymbolIndex?.()?.getSymbolByTicker(ticker); + if (symbolMeta) { + const pricePrecision = symbolMeta.price_precision; + const sizePrecision = symbolMeta.size_precision; + if (pricePrecision != null && pricePrecision > 0) { + const priceDivisor = Math.pow(10, pricePrecision); + bars = bars.map(bar => ({ + ...bar, + open: bar.open / priceDivisor, + high: bar.high / priceDivisor, + low: bar.low / priceDivisor, + close: bar.close / priceDivisor, + })); + } + if (sizePrecision != null && sizePrecision > 0) { + const sizeDivisor = Math.pow(10, sizePrecision); + bars = bars.map(bar => ({ + ...bar, + volume: bar.volume != null ? bar.volume / sizeDivisor : bar.volume, + })); + } + } + bars.sort((a, b) => a.time - b.time); if (countback && bars.length > countback) { diff --git a/gateway/src/services/symbol-index-service.ts b/gateway/src/services/symbol-index-service.ts index e125e2b2..2dc713d0 100644 --- a/gateway/src/services/symbol-index-service.ts +++ b/gateway/src/services/symbol-index-service.ts @@ -91,6 +91,13 @@ export class SymbolIndexService { await this.initPromise; } + /** + * Look up symbol metadata by Nautilus ticker (e.g. "BTC/USDT.BINANCE") + */ + getSymbolByTicker(ticker: string): SymbolMetadata | undefined { + return this.symbols.get(ticker); + } + /** * Update or add a symbol to the index */ diff --git a/gateway/src/types/ohlc.ts b/gateway/src/types/ohlc.ts index f7695f19..803e0644 100644 --- a/gateway/src/types/ohlc.ts +++ b/gateway/src/types/ohlc.ts @@ -170,11 +170,11 @@ export function nanosToSeconds(nanos: bigint | number): number { export function backendToTradingView(backend: BackendOHLC): TradingViewBar { return { time: nanosToSeconds(backend.timestamp), - open: backend.open, - high: backend.high, - low: backend.low, - close: backend.close, - volume: backend.volume ?? undefined, + open: Number(backend.open), + high: Number(backend.high), + low: Number(backend.low), + close: Number(backend.close), + volume: backend.volume != null ? Number(backend.volume) : undefined, }; } diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js index 57babf7f..e79e4f12 100644 --- a/ingestor/src/ccxt-fetcher.js +++ b/ingestor/src/ccxt-fetcher.js @@ -421,15 +421,79 @@ export class CCXTFetcher { const amount = Math.round(trade.amount * sizeMult); const quoteAmount = Math.round((trade.price * trade.amount) * priceMult); + // protobufjs v7 uses camelCase field names internally — must use camelCase here return { - trade_id: trade.id || `${trade.timestamp}`, + tradeId: trade.id || `${trade.timestamp}`, ticker, - timestamp: (trade.timestamp * 1_000_000).toString(), // Convert ms to nanoseconds - price: price.toString(), - amount: amount.toString(), - quote_amount: quoteAmount.toString(), - taker_buy: trade.side === 'buy', - sequence: trade.order ? trade.order.toString() : undefined + timestamp: (trade.timestamp * 1_000_000).toString(), // Convert ms to nanoseconds + price: price.toString(), + amount: amount.toString(), + quoteAmount: quoteAmount.toString(), + takerBuy: trade.side === 'buy', + sequence: trade.order ? trade.order.toString() : undefined + }; + } + + /** + * Fetch 1-minute bars covering the current open window for each configured period, + * rolling them up into a single aggregate per period for Flink accumulator seeding. + * + * Returns one seed object per period (or null for periods that just started with no + * completed 1m bars yet). Throws on exchange errors — caller handles retries. + * + * @param {string} ticker + * @param {number[]} periodSeconds - configured periods (e.g. [60, 300, 900, 3600, 14400, 86400]) + * @returns {Promise>} + */ + async fetchSeedCandles(ticker, periodSeconds) { + const nowMs = Date.now(); + const maxPeriod = Math.max(...periodSeconds); + const longestWindowStart = Math.floor(nowMs / (maxPeriod * 1000)) * (maxPeriod * 1000); + + // fetchHistoricalOHLC expects nanoseconds as strings + const startNs = (longestWindowStart * 1_000_000).toString(); + const endNs = (nowMs * 1_000_000).toString(); + + const bars1m = await this.fetchHistoricalOHLC(ticker, startNs, endNs, 60, null); + + return periodSeconds.map(period => { + const windowStart = Math.floor(nowMs / (period * 1000)) * (period * 1000); + const relevant = bars1m.filter(b => { + const tsMs = parseInt(b.timestamp) / 1_000_000; + return tsMs >= windowStart && tsMs < nowMs; + }); + if (relevant.length === 0) return null; + + const open = parseInt(relevant[0].open); + const high = Math.max(...relevant.map(b => parseInt(b.high))); + const low = Math.min(...relevant.map(b => parseInt(b.low))); + const close = parseInt(relevant[relevant.length - 1].close); + const volume = relevant.reduce((sum, b) => sum + parseInt(b.volume), 0); + + return { periodSeconds: period, open, high, low, close, volume, windowStartMs: windowStart }; + }); + } + + /** + * Convert a seed candle aggregate into a Tick-shaped object for Kafka. + * price = open (scaled int), amount = volume (scaled int); seed_* fields carry H/L/C/period. + */ + convertSeedToTick(seed, ticker) { + // protobufjs v7 uses camelCase field names internally — must use camelCase here + return { + tradeId: `seed-${ticker}-${seed.periodSeconds}-${seed.windowStartMs}`, + ticker, + timestamp: (seed.windowStartMs * 1_000_000).toString(), + price: seed.open, + amount: seed.volume, + quoteAmount: 0, + takerBuy: false, + isSeed: true, + seedHigh: seed.high, + seedLow: seed.low, + seedClose: seed.close, + seedWindowStartMs: seed.windowStartMs, + seedPeriodSeconds: seed.periodSeconds }; } diff --git a/ingestor/src/index.js b/ingestor/src/index.js index 3375982e..609e7e1e 100644 --- a/ingestor/src/index.js +++ b/ingestor/src/index.js @@ -332,7 +332,9 @@ class IngestorWorker { this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {}); return; } - this.handleRealtimeRequest(request); + this.handleRealtimeRequest(request).catch(err => { + this.logger.error({ jobId, requestId, error: err.message }, 'Unexpected error in realtime handler'); + }); } else if (isTickerSnapshot) { if (!this.pool.consumeSlot(jobId, exchange, 'HISTORICAL')) { this.zmqClient.sendReject(jobId, 'Slot capacity exceeded').catch(() => {}); @@ -431,11 +433,40 @@ class IngestorWorker { /** * Start realtime tick polling for a job dispatched by Flink. + * Fetches seed candles first so Flink initializes the open-candle accumulator correctly. */ - handleRealtimeRequest(request) { + async handleRealtimeRequest(request) { const { jobId, requestId, ticker } = request; this.logger.info({ jobId, requestId, ticker }, 'Processing realtime subscription request'); + const periods = [60, 300, 900, 3600, 14400, 86400]; + const MAX_RETRIES = 3; + const RETRY_DELAY_MS = 5000; + let seeds = null; + + for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + seeds = await this.ccxtFetcher.fetchSeedCandles(ticker, periods); + break; + } catch (err) { + this.logger.warn({ jobId, ticker, attempt, error: err.message }, 'Seed candle fetch failed'); + if (attempt < MAX_RETRIES) await new Promise(r => setTimeout(r, RETRY_DELAY_MS * attempt)); + } + } + + if (seeds !== null) { + const seedTicks = seeds + .filter(s => s !== null) + .map(s => this.ccxtFetcher.convertSeedToTick(s, ticker)); + if (seedTicks.length > 0) { + await this.kafkaProducer.writeTicks(this.config.kafka_tick_topic, seedTicks); + this.logger.info({ jobId, ticker, count: seedTicks.length }, 'Wrote seed ticks'); + } + } else { + // All retries exhausted — open bars suppressed for current partial window until next candle boundary + this.logger.error({ jobId, ticker }, 'All seed retries failed — open bars suppressed until next candle'); + } + this.activeRealtime.add(jobId); this.realtimePoller.startSubscription(jobId, requestId, ticker, this.config.kafka_tick_topic); } diff --git a/protobuf/tick.proto b/protobuf/tick.proto index 84caeab4..50730769 100644 --- a/protobuf/tick.proto +++ b/protobuf/tick.proto @@ -34,6 +34,15 @@ message Tick { // Additional flags for special trade types optional TradeFlags flags = 10; + + // When true: synthetic seed record carrying pre-aggregated OHLC for accumulator init. + // price = open (scaled), amount = volume (scaled); seed_* fields carry H/L/C/period. + optional bool is_seed = 11; + optional int64 seed_high = 12; + optional int64 seed_low = 13; + optional int64 seed_close = 14; + optional uint64 seed_window_start_ms = 15; + optional uint32 seed_period_seconds = 16; } message TradeFlags { diff --git a/sandbox/Dockerfile b/sandbox/Dockerfile index 6077a97c..6cf4d2a4 100644 --- a/sandbox/Dockerfile +++ b/sandbox/Dockerfile @@ -48,6 +48,7 @@ COPY --from=builder /build/env /opt/conda/envs/dexorder # Copy application code COPY dexorder/ /app/dexorder/ COPY main.py /app/ +COPY environment.yml /app/ # Copy generated protobuf code from builder COPY --from=builder /build/dexorder/generated/ /app/dexorder/generated/ diff --git a/sandbox/main.py b/sandbox/main.py index 452a076a..d4db22da 100644 --- a/sandbox/main.py +++ b/sandbox/main.py @@ -1156,15 +1156,23 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server meta_parts.append(f"error: {result['error']}") if result.get("revision"): meta_parts.append(f"revision: {result['revision']}") - if result.get("validation") and not result["validation"].get("success"): + if result.get("validation"): val = result["validation"] - error_detail = val.get('error') or '' - if val.get('output'): - error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] - meta_parts.append(f"validation error: {error_detail.strip()}") + if not val.get("success"): + error_detail = val.get('error') or '' + if val.get('output'): + error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] + meta_parts.append(f"validation error: {error_detail.strip()}") + elif val.get("output"): + # Always show output — may contain ⚠ WARNING for all-NaN / all-zero results + meta_parts.append(f"validation output: {val['output']}") content.append(TextContent(type="text", text="\n".join(meta_parts))) if result.get("execution"): - exec_content = result["execution"].get("content", []) + exec_result = result["execution"] + exec_content = exec_result.get("content", []) + if not exec_content and exec_result.get("output"): + # _execute_indicator returns plain {"output": str}, not MCP {"content": [...]} + exec_content = [TextContent(type="text", text=exec_result["output"])] content.extend(exec_content) image_count = sum(1 for item in exec_content if item.type == "image") logging.info(f"PythonWrite '{arguments.get('name')}': returning {len(content)} items, {image_count} images") @@ -1208,15 +1216,23 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server meta_parts.append(f"error: {result['error']}") if result.get("revision"): meta_parts.append(f"revision: {result['revision']}") - if result.get("validation") and not result["validation"].get("success"): + if result.get("validation"): val = result["validation"] - error_detail = val.get('error') or '' - if val.get('output'): - error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] - meta_parts.append(f"validation error: {error_detail.strip()}") + if not val.get("success"): + error_detail = val.get('error') or '' + if val.get('output'): + error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] + meta_parts.append(f"validation error: {error_detail.strip()}") + elif val.get("output"): + # Always show output — may contain ⚠ WARNING for all-NaN / all-zero results + meta_parts.append(f"validation output: {val['output']}") content.append(TextContent(type="text", text="\n".join(meta_parts))) if result.get("execution"): - exec_content = result["execution"].get("content", []) + exec_result = result["execution"] + exec_content = exec_result.get("content", []) + if not exec_content and exec_result.get("output"): + # _execute_indicator returns plain {"output": str}, not MCP {"content": [...]} + exec_content = [TextContent(type="text", text=exec_result["output"])] content.extend(exec_content) image_count = sum(1 for item in exec_content if item.type == "image") logging.info(f"PythonEdit '{arguments.get('name')}': returning {len(content)} items, {image_count} images") diff --git a/web/src/components/CategoryItemList.vue b/web/src/components/CategoryItemList.vue index d1530322..c633fa17 100644 --- a/web/src/components/CategoryItemList.vue +++ b/web/src/components/CategoryItemList.vue @@ -1,9 +1,13 @@ @@ -63,6 +91,7 @@ function onUpdated(_payload: { category: string; name: string; success: boolean; Spec Result Use + @@ -79,6 +108,21 @@ function onUpdated(_payload: { category: string; name: string; success: boolean; :name="editingName" @updated="onUpdated" /> + + + Delete {{ deletingRow?.display_name }}? This cannot be undone. + + + + + diff --git a/web/src/components/ChatPanel.vue b/web/src/components/ChatPanel.vue index cb0004d5..bd833f2e 100644 --- a/web/src/components/ChatPanel.vue +++ b/web/src/components/ChatPanel.vue @@ -41,6 +41,12 @@ const messages = ref([]) const messagesLoaded = ref(false) const isConnected = wsManager.isConnected +// Model tag state +const availableModelTags = ref([]) +const currentModelTag = ref(null) +// Content typed after @Tag, held until model_switched arrives to re-insert as first user message +let pendingModelSwitchContent = '' + // Reactive rooms that update based on WebSocket connection and agent processing state const rooms = computed(() => [{ roomId: SESSION_ID, @@ -48,7 +54,8 @@ const rooms = computed(() => [{ avatar: null, users: [ { _id: CURRENT_USER_ID, username: 'You' }, - { _id: AGENT_ID, username: 'AI Agent' } + { _id: AGENT_ID, username: 'AI Agent' }, + ...availableModelTags.value.map(tag => ({ _id: `model-${tag.toLowerCase().replace(/[^a-z0-9]/g, '-')}`, username: tag })), ], unreadCount: 0, typingUsers: isAgentProcessing.value ? [AGENT_ID] : [] @@ -153,6 +160,63 @@ const streamingImages = ref([]) const handleMessage = (data: WebSocketMessage) => { console.log('[ChatPanel] Received message:', data) + if (data.type === 'connected') { + if (Array.isArray(data.modelTags)) { + availableModelTags.value = data.modelTags + } + return + } + + if (data.type === 'model_switched') { + // Reset all streaming state from previous conversation + currentStreamingMessageId = null + toolCallMessageId = null + lastSentMessageId = null + streamingBuffer = '' + streamingImages.value = [] + subagentContentMap = new Map() + currentModelTag.value = data.tag ?? null + + const timestamp = new Date().toTimeString().split(' ')[0].slice(0, 5) + const date = new Date().toLocaleDateString() + + // System notice at the top of the new conversation + const systemMsg = { + _id: generateMessageId(), + content: `Switched to ${data.tag} — conversation history cleared`, + system: true, + timestamp, + date, + } + + const newMessages: any[] = [systemMsg] + + // Re-add the user's first message (content after the @Tag) + const userContent: string = pendingModelSwitchContent || data.rest || '' + if (userContent) { + const userMsgId = generateMessageId() + lastSentMessageId = userMsgId + newMessages.push({ + _id: userMsgId, + content: userContent, + senderId: CURRENT_USER_ID, + timestamp, + date, + saved: true, + distributed: true, + seen: false, + files: [], + }) + } + + messages.value = newMessages + pendingModelSwitchContent = '' + + // Show thinking bubble now that the message list is set up + addToolCallBubble('Thinking...') + return + } + if (data.type === 'conversation_history') { messages.value = (data.messages as any[]).map((m: any) => { const ts = new Date(m.timestamp / 1000) // microseconds → ms @@ -189,7 +253,7 @@ const handleMessage = (data: WebSocketMessage) => { return } - if (data.type === 'subagent_chunk') { + if (data.type === 'subagent_chunk' || data.type === 'subagent_thinking') { appendSubagentChunk(data.agentName, data.content) return } @@ -363,7 +427,17 @@ const sendMessage = async (event: any) => { // Extract data from CustomEvent.detail[0] const data = event.detail?.[0] || event - const content = data.content || '' + // Convert any model @-tags from vue-advanced-chat's encoding back to @TagName + const rawContent: string = data.content || '' + const usersTag: any[] = data.usersTag || [] + let content = rawContent + let modelSwitchUser: any = null + for (const user of usersTag) { + if (typeof user._id === 'string' && user._id.startsWith('model-')) { + content = content.replace(`${user._id}`, `@${user.username}`) + if (!modelSwitchUser) modelSwitchUser = user + } + } const files = data.files const roomId = data.roomId @@ -411,7 +485,24 @@ const sendMessage = async (event: any) => { } } - // Add user message to UI + // Send to backend via WebSocket + const wsMessage = { + type: 'agent_user_message', + session_id: roomId || SESSION_ID, + content: content, + attachments: attachments + } + + if (modelSwitchUser) { + // Model switch: store content-after-tag for model_switched handler to re-insert. + // Don't add a user message bubble now — model_switched will set up the full initial state. + pendingModelSwitchContent = content.replace(`@${modelSwitchUser.username}`, '').trim() + wsManager.send(wsMessage) + isAgentProcessing.value = true + return + } + + // Normal message: add to UI immediately const userMessage = { _id: messageId, content: content, @@ -425,14 +516,6 @@ const sendMessage = async (event: any) => { } messages.value = [...messages.value, userMessage] - - // Send to backend via WebSocket - const wsMessage = { - type: 'agent_user_message', - session_id: roomId || SESSION_ID, - content: content, - attachments: attachments - } wsManager.send(wsMessage) // Track this message so the agent_chunk handler can mark it seen @@ -692,8 +775,12 @@ onUnmounted(() => { {{ channelStore.statusMessage || 'Connecting...' }} + + + Model: {{ currentModelTag }} + + { + @@ -765,6 +853,20 @@ onUnmounted(() => { position: relative; } +.model-tag-badge { + position: absolute; + top: 6px; + right: 12px; + z-index: 10; + font-size: 0.7rem; + color: #089981; + background: rgba(8, 153, 129, 0.12); + border: 1px solid rgba(8, 153, 129, 0.3); + border-radius: 4px; + padding: 2px 7px; + pointer-events: none; +} + .workspace-loading { position: fixed; inset: 0; diff --git a/web/src/stores/indicatorTypes.ts b/web/src/stores/indicatorTypes.ts index 97fe32fa..48bd6b54 100644 --- a/web/src/stores/indicatorTypes.ts +++ b/web/src/stores/indicatorTypes.ts @@ -13,5 +13,6 @@ export interface CustomIndicatorType { export const useIndicatorTypesStore = defineStore('indicator_types', () => { const types = ref>({}) - return { types } + const removeType = (id: string) => { delete types.value[id] } + return { types, removeType } }) diff --git a/web/src/stores/researchTypes.ts b/web/src/stores/researchTypes.ts index d9a95f07..29ee6180 100644 --- a/web/src/stores/researchTypes.ts +++ b/web/src/stores/researchTypes.ts @@ -10,5 +10,6 @@ export interface ResearchType { export const useResearchTypesStore = defineStore('research_types', () => { const types = ref>({}) - return { types } + const removeType = (id: string) => { delete types.value[id] } + return { types, removeType } }) diff --git a/web/src/stores/strategyTypes.ts b/web/src/stores/strategyTypes.ts index fc2f65e8..71b64907 100644 --- a/web/src/stores/strategyTypes.ts +++ b/web/src/stores/strategyTypes.ts @@ -10,5 +10,6 @@ export interface StrategyType { export const useStrategyTypesStore = defineStore('strategy_types', () => { const types = ref>({}) - return { types } + const removeType = (id: string) => { delete types.value[id] } + return { types, removeType } })
Delete {{ deletingRow?.display_name }}? This cannot be undone.