From 319d81c41f34a45244b26e733d2b798dac1c39bf Mon Sep 17 00:00:00 2001 From: Tim Olson Date: Fri, 24 Apr 2026 20:43:42 -0400 Subject: [PATCH] data timeout fixes; research agent improvements --- bin/deploy | 66 ------ bin/dev | 6 +- deploy/k8s/base/gateway.yaml | 7 + deploy/k8s/base/network-policies.yaml | 4 + doc/test_prompt.md | 12 +- .../flink/iceberg/SchemaInitializer.java | 44 +++- .../publisher/OHLCBatchDeserializer.java | 8 +- .../flink/publisher/OHLCBatchWrapper.java | 86 ++++---- .../dexorder/flink/sink/IcebergOHLCSink.java | 21 +- gateway/knowledge/README.md | 11 +- gateway/knowledge/api-reference.md | 21 +- .../indicators/indicator-development.md | 4 + gateway/knowledge/pandas-ta-reference.md | 4 + gateway/knowledge/platform/mcp-integration.md | 4 + gateway/knowledge/platform/workspace.md | 4 + .../strategies/strategy-development.md | 4 + .../knowledge/trading/technical-analysis.md | 4 + gateway/knowledge/usage-examples.md | 19 +- gateway/prompt/agent-main.md | 6 +- gateway/prompt/agent-research.md | 66 +++--- gateway/prompt/agent-web-explore.md | 8 - gateway/src/clients/duckdb-client.ts | 17 +- gateway/src/clients/zmq-relay-client.ts | 4 +- gateway/src/harness/agent-harness.ts | 52 ++++- gateway/src/harness/spawn/wiki-loader.ts | 59 ++++- gateway/src/services/ohlc-service.ts | 2 +- gateway/src/services/symbol-index-service.ts | 19 +- gateway/src/tools/mcp/mcp-tool-wrapper.ts | 22 +- .../src/tools/platform/get-chart-data.tool.ts | 5 +- .../src/tools/platform/memory-lookup.tool.ts | 19 +- ingestor/src/ccxt-fetcher.js | 82 +++++-- ingestor/src/kafka-producer.js | 14 +- protobuf/ohlc.proto | 2 + sandbox/dexorder/impl/data_api_impl.py | 6 +- sandbox/dexorder/ohlc_client.py | 10 +- sandbox/main.py | 208 +++++++++++++++--- web/src/components/ChatPanel.vue | 22 ++ 37 files changed, 672 insertions(+), 280 deletions(-) diff --git a/bin/deploy b/bin/deploy index c7239ca2..1d366d0b 100755 --- a/bin/deploy +++ b/bin/deploy @@ -109,72 +109,6 @@ if [ "$PROJECT" != "lifecycle-sidecar" ]; then rsync -a --checksum --delete protobuf/ $PROJECT/protobuf/ fi -# For gateway: copy Python API files for research subagent -if [ "$PROJECT" == "gateway" ]; then - echo "Copying Python API files for research subagent..." - - # Create api-source directory - mkdir -p gateway/src/harness/subagents/research/api-source - - # Copy all Python API files (for easy future expansion) - cp sandbox/dexorder/api/*.py gateway/src/harness/subagents/research/api-source/ - - # Generate api-reference.md with verbatim Python source code - API_REF="gateway/src/harness/subagents/research/memory/api-reference.md" - - cat > "$API_REF" << 'HEADER' -# Dexorder Research API Reference - -This file contains the complete Python API source code with full docstrings. -These files are copied verbatim from `sandbox/dexorder/api/`. - -The API provides access to market data and charting capabilities for research scripts. - ---- - -## Overview - -Research scripts access the API via: -```python -from dexorder.api import get_api -api = get_api() -``` - -The API instance provides: -- `api.data` - DataAPI for fetching OHLC market data -- `api.charting` - ChartingAPI for creating financial charts - ---- - -## Complete API Source Code - -The following sections contain the verbatim Python source files with complete -type hints, docstrings, and examples. 
- -HEADER - - # Append each Python file - for py_file in api.py data_api.py charting_api.py __init__.py; do - if [ -f "sandbox/dexorder/api/$py_file" ]; then - echo "" >> "$API_REF" - echo "### $py_file" >> "$API_REF" - echo '```python' >> "$API_REF" - cat "sandbox/dexorder/api/$py_file" >> "$API_REF" - echo '```' >> "$API_REF" - echo "" >> "$API_REF" - fi - done - - cat >> "$API_REF" << 'FOOTER' - ---- - -For practical usage patterns and complete working examples, see `usage-examples.md`. -FOOTER - - echo "Generated api-reference.md with Python API source code" -fi - docker build $NO_CACHE -f $PROJECT/Dockerfile --build-arg="CONFIG=$CONFIG" --build-arg="DEPLOYMENT=$DEPLOYMENT" -t dexorder/ai-$PROJECT:latest $PROJECT || exit 1 # Cleanup is handled by trap diff --git a/bin/dev b/bin/dev index f2b841dd..454f316e 100755 --- a/bin/dev +++ b/bin/dev @@ -511,9 +511,9 @@ deep_restart() { # Force restart iceberg-catalog since it depends on postgres and minio echo -e "${GREEN}→${NC} Force restarting iceberg-catalog (depends on postgres/minio)..." kubectl delete pod -l app=iceberg-catalog 2>/dev/null || true - # Remove all sandbox deployments and services to free quota - echo -e "${GREEN}→${NC} Removing all sandbox deployments and services..." - kubectl delete deployments,services --all -n sandbox 2>/dev/null || true + # Remove all sandbox deployments, services, and PVCs to fully reset user state + echo -e "${GREEN}→${NC} Removing all sandbox deployments, services, and PVCs..." + kubectl delete deployments,services,pvc --all -n sandbox 2>/dev/null || true ;; *) echo -e "${RED}Error: Unknown service '$service'${NC}" diff --git a/deploy/k8s/base/gateway.yaml b/deploy/k8s/base/gateway.yaml index 068a0c21..ace3adc7 100644 --- a/deploy/k8s/base/gateway.yaml +++ b/deploy/k8s/base/gateway.yaml @@ -13,6 +13,10 @@ spec: protocol: TCP port: 3000 targetPort: http + - name: zmq-events + protocol: TCP + port: 5571 + targetPort: 5571 type: ClusterIP --- apiVersion: apps/v1 @@ -64,6 +68,9 @@ spec: - name: http containerPort: 3000 protocol: TCP + - name: zmq-events + containerPort: 5571 + protocol: TCP volumeMounts: - name: config diff --git a/deploy/k8s/base/network-policies.yaml b/deploy/k8s/base/network-policies.yaml index cd4b5609..1aa14025 100644 --- a/deploy/k8s/base/network-policies.yaml +++ b/deploy/k8s/base/network-policies.yaml @@ -69,6 +69,8 @@ spec: ports: - protocol: TCP port: 3000 + - protocol: TCP + port: 5571 # External HTTPS (for exchange APIs, LLM APIs) - to: - ipBlock: @@ -102,3 +104,5 @@ spec: ports: - protocol: TCP port: 3000 + - protocol: TCP + port: 5571 diff --git a/doc/test_prompt.md b/doc/test_prompt.md index 2d38df10..64353e14 100644 --- a/doc/test_prompt.md +++ b/doc/test_prompt.md @@ -1 +1,11 @@ -what conclusions can you make by analyzing historical data on ETH price direction changes near market session overlaps and market sessions changes on monday and tuesday? \ No newline at end of file +--- + +For the following series of analysis questions, use 5 years of 15 minute data from `ETH/USDT.BINANCE` + +what conclusions can you make by analyzing historical data on ETH price direction changes near market session overlaps and market sessions changes on monday and tuesday? 
+
+---
+
+do the same price direction change analysis but specifically compare a two hour range (1 hour before and after 9am EST, NY open), the 11am to 1pm range, and the 4:30 to 7pm EST range to look for potential price direction changes, and compare the relative probability for monday and tuesday versus all other times between wednesday and sunday.
+
+---
diff --git a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java
index bf02884c..f71c0e3c 100644
--- a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java
+++ b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java
@@ -123,10 +123,10 @@ public class SchemaInitializer {
     /**
      * Initialize the OHLC table if it doesn't exist.
      */
-    // Bump this when the schema changes. Tables with a different (or missing) version
-    // will be dropped and recreated. Increment by 1 for each incompatible change.
+    // Bump this when the schema changes. Increment by 1 for each change.
     // v1: open/high/low/close required; ingestor forward-fills interior gaps with previous close
-    private static final String OHLC_SCHEMA_VERSION = "1";
+    // v2: added num_trades and quote_volume (appended; backward-compatible via Iceberg schema evolution)
+    private static final String OHLC_SCHEMA_VERSION = "2";
     private static final String SCHEMA_VERSION_PROP = "app.schema.version";
 
     private void initializeOhlcTable() {
@@ -154,11 +154,13 @@ public class SchemaInitializer {
         if (tableExists) {
             Table existing = catalog.loadTable(tableId);
             String existingVersion = existing.properties().get(SCHEMA_VERSION_PROP);
+            LOG.info("Table {} already exists at schema version {}", tableId, existingVersion);
             if (!OHLC_SCHEMA_VERSION.equals(existingVersion)) {
-                LOG.warn("Table {} has schema version '{}', expected '{}' — skipping (manual migration required if needed)",
-                        tableId, existingVersion, OHLC_SCHEMA_VERSION);
+                LOG.info("Evolving table {} from version '{}' to '{}'", tableId, existingVersion, OHLC_SCHEMA_VERSION);
+                evolveOhlcSchema(existing);
+                existing.updateProperties().set(SCHEMA_VERSION_PROP, OHLC_SCHEMA_VERSION).commit();
+                LOG.info("Schema evolution complete for {}", tableId);
             }
-            LOG.info("Table {} already exists at schema version {} — skipping creation", tableId, existingVersion);
             return;
         }
 
@@ -195,7 +197,11 @@ public class SchemaInitializer {
 
             // Metadata fields
             optional(16, "request_id", Types.StringType.get(), "Request ID that generated this data"),
-            required(17, "ingested_at", Types.LongType.get(), "Timestamp when data was ingested by Flink (nanoseconds since epoch)")
+            required(17, "ingested_at", Types.LongType.get(), "Timestamp when data was ingested by Flink (nanoseconds since epoch)"),
+
+            // Extended exchange fields — appended for backward-compatible schema evolution (v2)
+            optional(18, "num_trades", Types.LongType.get(), "Number of trades in the candle"),
+            optional(19, "quote_volume", Types.LongType.get(), "Total quote asset volume (scaled by price precision)")
         );
 
         // Create the table with partitioning and properties
@@ -218,6 +224,30 @@ public class SchemaInitializer {
         }
     }
 
+    /**
+     * Add any columns missing from a v1 OHLC table to bring it to v2.
+     * Iceberg schema evolution is safe and non-destructive — existing rows get null for new columns.
+ */ + private void evolveOhlcSchema(Table table) { + org.apache.iceberg.UpdateSchema update = table.updateSchema(); + boolean changed = false; + java.util.Set existing = new java.util.HashSet<>(); + for (org.apache.iceberg.types.Types.NestedField f : table.schema().columns()) { + existing.add(f.name()); + } + if (!existing.contains("num_trades")) { + update.addColumn("num_trades", Types.LongType.get(), "Number of trades in the candle"); + changed = true; + } + if (!existing.contains("quote_volume")) { + update.addColumn("quote_volume", Types.LongType.get(), "Total quote asset volume (scaled by price precision)"); + changed = true; + } + if (changed) { + update.commit(); + } + } + /** * Initialize the symbol_metadata table if it doesn't exist. */ diff --git a/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java b/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java index 45947c2c..c605eb66 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java @@ -69,7 +69,13 @@ public class OHLCBatchDeserializer implements DeserializationSchema **Rule**: Every research script must fetch the maximum useful history — target 100,000–200,000 bars, hard cap at 5 years. **Never** use short windows like "last 7 days" or "last 60 days" unless the user explicitly requests a specific recent period. @@ -49,36 +66,11 @@ Quick reference — approximate bars per resolution at various windows: **When to shorten the window**: only if 5 years at the chosen resolution would far exceed 200,000 bars (e.g., 5m over 5 years ≈ 525k → shorten to ~2 years). Otherwise always use the full 5 years. -## Available Tools +## Tool Behavior Notes -You have direct access to these MCP tools: - -- **PythonWrite**: Create a new script (research, strategy, or indicator category) - - Required: category, name, description, details, code - - Optional: metadata (category-specific fields — see below) - - **For research**: fully executes the script and returns all output (stdout, stderr) and captured chart images. The response IS the execution result — **do not call `ExecuteResearch` afterward**. - - **For indicator/strategy**: runs against synthetic test data to catch compile/runtime errors; no chart images are generated. - - Returns validation results and execution output (text + images for research) - -- **PythonEdit**: Update an existing script - - Required: category, name - - Optional: code, patches, description, details (full replacement), detail_patches (targeted text replacements in details), metadata - - **For research**: re-executes the script when code is changed and returns all output and images. **Do not call `ExecuteResearch` afterward**. - - **For indicator/strategy**: re-runs the validation test only. 
- - Returns validation results and execution output - -- **PythonRead**: Read an existing research script - - Returns: code, metadata - -- **PythonList**: List all research scripts - - Returns: array of {name, description, metadata} - -- **ExecuteResearch**: Run a research script that already exists on disk - - Use this **only** when the user explicitly asks to re-run a script, or to run a script that was written in a previous session and already exists - - **Do not call this after `PythonWrite` or `PythonEdit`** — those tools already executed the script and returned its output - - Returns: text output and images - -- **WebSearch**, **FetchPage**, **ArxivSearch**: Search the web or fetch pages for reference information when researching methodologies or indicators +- **`PythonWrite` / `PythonEdit` for research**: auto-executes the script and returns all output (stdout, stderr) and captured images. **Do not call `ExecuteResearch` afterward** — the script has already run. +- **`PythonWrite` / `PythonEdit` for indicator/strategy**: runs against synthetic test data only; no chart images are generated. +- **`ExecuteResearch`**: use **only** when the user explicitly asks to re-run a script, or to run one written in a previous session. Never call it after `PythonWrite` or `PythonEdit`. ## Research Script API @@ -109,6 +101,10 @@ When a user requests analysis: 2. **Use the provided name**: The instruction will begin with `Research script name: ""`. Always use that exact name when calling `PythonWrite` or `PythonEdit`. Check first with `PythonRead` — if the script already exists, use `PythonEdit` to update it rather than creating a new one with `PythonWrite`. + **One script per analysis idea**: If the name matches an existing script, the user is iterating on that idea — update it in place rather than creating a variant with a different name. Old versions are preserved in git history; there is no need to keep multiple scripts for variations of the same analysis. + + **Duplicate detection**: Also review the **Existing Research Scripts** list above. If a script already exists there that appears to cover the same analysis as your current instruction — even under a different name — note this in your response after completing the task, so the user can decide whether to consolidate. + 3. **Write the script**: Use `PythonWrite` (new) or `PythonEdit` (existing) - Write clean, well-commented Python code - Include proper error handling @@ -127,7 +123,17 @@ When a user requests analysis: - Use `PythonEdit` to fix the script - The script will auto-execute again -6. **Return results**: Once successful, summarize what was done +6. **Summarize findings**: After successful execution, update the research summary entry + using `ResearchSummaryPatch`: + - Replace the `**Findings:**` line(s) with 3–5 concise bullet points of key results + - Include only **statistically significant or practically notable** findings — + p-values, effect sizes, actionable patterns + - If nothing notable emerged: a single bullet `No significant patterns found` + - Keep the entire findings block under ~100 words; full output is always readable via + `PythonReadOutput(category="research", name="")` + - This applies after `PythonWrite`, `PythonEdit`, and `ExecuteResearch` runs + +7. 
**Return results**: Once successful, summarize what was done - The user will receive both your text response AND the chart images - Don't try to describe the images in detail - the user can see them diff --git a/gateway/prompt/agent-web-explore.md b/gateway/prompt/agent-web-explore.md index 74f5acda..0cf7ab6a 100644 --- a/gateway/prompt/agent-web-explore.md +++ b/gateway/prompt/agent-web-explore.md @@ -6,14 +6,6 @@ recursionLimit: 15 You are a research assistant that searches the web and academic databases to answer questions or gather information according to the given instructions. -## Tools - -You have three tools: - -- **`WebSearch`** — Search the web broadly (Tavily). Returns titles, URLs, and content summaries. Best for general information, news, documentation, proprietary/niche topics, trading indicators, software papers, and anything not likely to be on arXiv. -- **`ArxivSearch`** — Search arXiv for academic preprints. Returns titles, authors, abstracts, and PDF links. Use this **only** for peer-reviewed or academic research (e.g. machine learning, statistics, finance theory). Most trading indicators, technical analysis tools, and proprietary methods are NOT on arXiv. -- **`FetchPage`** — Fetch the full content of a URL (web page or PDF). PDFs are automatically converted to text. Use this after searching to read the complete content of a promising result. - ## Strategy 1. **Choose the right search tool first:** diff --git a/gateway/src/clients/duckdb-client.ts b/gateway/src/clients/duckdb-client.ts index 804b490f..034be948 100644 --- a/gateway/src/clients/duckdb-client.ts +++ b/gateway/src/clients/duckdb-client.ts @@ -44,6 +44,7 @@ export class DuckDBClient { private conversationsBucket?: string; private logger: FastifyBaseLogger; private initialized = false; + private initPromise: Promise | null = null; constructor(config: DuckDBConfig, logger: FastifyBaseLogger) { this.logger = logger; @@ -63,10 +64,16 @@ export class DuckDBClient { * Initialize DuckDB connection and configure S3/Iceberg extensions */ async initialize(): Promise { - if (this.initialized) { - return; + if (this.initialized) return; + if (!this.initPromise) { + this.initPromise = this._initialize().finally(() => { + this.initPromise = null; + }); } + await this.initPromise; + } + private async _initialize(): Promise { try { this.db = new Database(':memory:'); this.conn = this.db.connect(); @@ -409,10 +416,12 @@ export class DuckDBClient { // duplicate parquet files (e.g. from repeated Flink job runs on the same key // range) never produce more than one row per (ticker, period_seconds, timestamp). 
const sql = ` - SELECT timestamp, ticker, period_seconds, open, high, low, close, volume + SELECT timestamp, ticker, period_seconds, open, high, low, close, + volume, buy_vol, sell_vol, open_time, close_time, num_trades, quote_volume FROM ( SELECT - timestamp, ticker, period_seconds, open, high, low, close, volume, ingested_at, + timestamp, ticker, period_seconds, open, high, low, close, + volume, buy_vol, sell_vol, open_time, close_time, num_trades, quote_volume, ingested_at, ROW_NUMBER() OVER ( PARTITION BY timestamp ORDER BY ingested_at DESC diff --git a/gateway/src/clients/zmq-relay-client.ts b/gateway/src/clients/zmq-relay-client.ts index 84c8ca82..0b211de3 100644 --- a/gateway/src/clients/zmq-relay-client.ts +++ b/gateway/src/clients/zmq-relay-client.ts @@ -37,7 +37,7 @@ export interface ZMQRelayConfig { relayRequestEndpoint: string; // e.g., "tcp://relay:5559" relayNotificationEndpoint: string; // e.g., "tcp://relay:5558" clientId?: string; // Optional client ID, will generate if not provided - requestTimeout?: number; // Request timeout in ms (default: 60000) + requestTimeout?: number; // Request timeout in ms (default: 120000) onMetadataUpdate?: () => Promise; // Callback when symbol metadata updates } @@ -77,7 +77,7 @@ export class ZMQRelayClient { relayRequestEndpoint: config.relayRequestEndpoint, relayNotificationEndpoint: config.relayNotificationEndpoint, clientId: config.clientId || `gateway-${randomUUID().slice(0, 8)}`, - requestTimeout: config.requestTimeout || 60000, + requestTimeout: config.requestTimeout || 120000, onMetadataUpdate: config.onMetadataUpdate || (async () => {}), }; this.logger = logger; diff --git a/gateway/src/harness/agent-harness.ts b/gateway/src/harness/agent-harness.ts index 07844d72..5e486687 100644 --- a/gateway/src/harness/agent-harness.ts +++ b/gateway/src/harness/agent-harness.ts @@ -138,14 +138,29 @@ export class AgentHarness { } }); - // Register the user-preferences virtual wiki page (loaded fresh each turn) - this.wikiLoader.registerVirtual('user-preferences', async (ctx) => { + // Register existing research scripts as a virtual wiki page + this.wikiLoader.registerVirtual('research-scripts', async (ctx) => { + if (!ctx.mcpClient) return ''; + return this.fetchResearchScriptsSection(ctx.mcpClient); + }); + + this.registerFileDocument('user-preferences', 'PreferencesRead', 'User Preferences'); + this.registerFileDocument('research-summary', 'ResearchSummaryRead', 'Research Summary'); + } + + /** + * Register a virtual wiki page backed by a sandbox file-document tool. + * The tool must return { content: string, exists: boolean }. + * Pages are loaded fresh every turn (never cached). + */ + private registerFileDocument(virtualName: string, toolName: string, heading: string): void { + this.wikiLoader.registerVirtual(virtualName, async (ctx) => { if (!ctx.mcpClient) return ''; try { - const result = await ctx.mcpClient.callTool('PreferencesRead', {}); + const result = await ctx.mcpClient.callTool(toolName, {}); const parsed = JSON.parse(String(result)); if (!parsed.exists || !parsed.content?.trim()) return ''; - return `## User Preferences\n\n${parsed.content}`; + return `## ${heading}\n\n${parsed.content}`; } catch { return ''; } @@ -235,6 +250,35 @@ export class AgentHarness { } } + /** + * Fetch existing research scripts and return a formatted markdown section. + * Used as the virtual wiki page 'research-scripts'. 
+ */ + private async fetchResearchScriptsSection(mcpClient: MCPClientConnector): Promise { + try { + const raw = await mcpClient.callTool('PythonList', { category: 'research' }); + const r = raw as any; + const text = r?.content?.[0]?.text ?? r?.[0]?.text; + const parsed = typeof text === 'string' ? JSON.parse(text) : raw; + const items: any[] = parsed?.items ?? []; + if (items.length === 0) return ''; + + const lines: string[] = ['## Existing Research Scripts', '']; + lines.push('The following research scripts already exist. Before creating a new one, check whether any of these already covers the requested analysis.\n'); + lines.push('| Name | Description |'); + lines.push('|------|-------------|'); + for (const item of items) { + const name: string = item.name ?? 'unknown'; + const desc: string = item.description ?? ''; + lines.push(`| ${name} | ${desc} |`); + } + return lines.join('\n'); + } catch (err) { + this.config.logger.warn({ err }, 'Failed to fetch research scripts for wiki page'); + return ''; + } + } + /** * Fetch custom indicators from the sandbox and return a formatted markdown section. * Used as the virtual wiki page 'custom-indicators'. diff --git a/gateway/src/harness/spawn/wiki-loader.ts b/gateway/src/harness/spawn/wiki-loader.ts index 4edcc1b2..5fb0af54 100644 --- a/gateway/src/harness/spawn/wiki-loader.ts +++ b/gateway/src/harness/spawn/wiki-loader.ts @@ -1,5 +1,5 @@ import { readFile, readdir } from 'fs/promises'; -import { join, dirname } from 'path'; +import { join, dirname, relative } from 'path'; import { fileURLToPath } from 'url'; import yaml from 'js-yaml'; import type { MCPClientConnector } from '../mcp-client.js'; @@ -16,6 +16,7 @@ const KNOWLEDGE_DIR = join(__dirname, '..', '..', '..', 'knowledge'); const PROMPT_DIR = join(__dirname, '..', '..', '..', 'prompt'); export interface WikiFrontmatter { + description?: string; maxTokens?: number; recursionLimit?: number; spawnsImages?: boolean; @@ -157,36 +158,74 @@ export class WikiLoader { } /** - * Return the base prompt text: prompt/index.md body + prompt/tools.md body, concatenated. + * Return the base prompt text: prompt/index.md body + prompt/tools.md body + KB catalog. * Result is cached for the lifetime of this WikiLoader instance. */ async getBasePrompt(): Promise { if (this.basePromptCache !== null) return this.basePromptCache; - const [index, tools] = await Promise.all([ + const [index, tools, catalog] = await Promise.all([ this.loadPromptPage('index'), this.loadPromptPage('tools'), + this.getKBCatalog(), ]); const parts: string[] = []; if (index) parts.push(index.body); if (tools) parts.push(tools.body); + parts.push(catalog); this.basePromptCache = parts.join('\n\n'); return this.basePromptCache; } /** * List all .md file names available in the knowledge directory (without extension). + * Returns paths relative to KNOWLEDGE_DIR, e.g. "api-reference", "platform/workspace". 
*/ async listPages(): Promise { - try { - const files = await readdir(KNOWLEDGE_DIR); - return files - .filter(f => f.endsWith('.md')) - .map(f => f.slice(0, -3)); - } catch { - return []; + const results: string[] = []; + const scan = async (dir: string): Promise => { + let entries; + try { + entries = await readdir(dir, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const full = join(dir, entry.name); + if (entry.isDirectory()) { + await scan(full); + } else if (entry.isFile() && entry.name.endsWith('.md')) { + const rel = relative(KNOWLEDGE_DIR, full); + results.push(rel.slice(0, -3)); // strip .md + } + } + }; + await scan(KNOWLEDGE_DIR); + return results.sort(); + } + + /** + * Build a markdown catalog table of all KB pages with their descriptions. + * Used by getBasePrompt() to inject into the cached system prompt. + */ + async getKBCatalog(): Promise { + const pages = await this.listPages(); + const rows: string[] = []; + for (const name of pages) { + const page = await this.loadPage(name); + const desc = page?.frontmatter.description ?? ''; + rows.push(`| ${name} | ${desc} |`); } + return [ + '## Knowledge Base', + '', + 'The following reference articles are available via `MemoryLookup({page: "..."})`. Read an article when you need detailed reference information.', + '', + '| Page | Description |', + '|------|-------------|', + ...rows, + ].join('\n'); } /** diff --git a/gateway/src/services/ohlc-service.ts b/gateway/src/services/ohlc-service.ts index 0590affb..5cbe0f5d 100644 --- a/gateway/src/services/ohlc-service.ts +++ b/gateway/src/services/ohlc-service.ts @@ -277,7 +277,7 @@ export class OHLCService { pricescale: 100, has_intraday: true, has_daily: true, - has_weekly_and_monthly: false, + has_weekly_and_monthly: true, supported_resolutions: DEFAULT_SUPPORTED_RESOLUTIONS, data_status: 'streaming', }; diff --git a/gateway/src/services/symbol-index-service.ts b/gateway/src/services/symbol-index-service.ts index 4ef4abf9..2ccd3e48 100644 --- a/gateway/src/services/symbol-index-service.ts +++ b/gateway/src/services/symbol-index-service.ts @@ -25,6 +25,7 @@ export class SymbolIndexService { private logger: FastifyBaseLogger; private symbols: Map = new Map(); // key: "MARKET_ID.EXCHANGE" (Nautilus format) private initialized: boolean = false; + private initPromise: Promise | null = null; constructor(config: SymbolIndexServiceConfig) { this.icebergClient = config.icebergClient; @@ -57,7 +58,9 @@ export class SymbolIndexService { this.symbols.set(key, symbol); } - this.initialized = true; + if (symbols.length > 0) { + this.initialized = true; + } this.logger.info({ count: this.symbols.size, totalRows: symbols.length, @@ -74,12 +77,14 @@ export class SymbolIndexService { * Ensure index is initialized (with retry on failure) */ private async ensureInitialized(): Promise { - if (this.initialized) { - return; + if (this.initialized) return; + if (!this.initPromise) { + this.logger.info('Lazy-loading symbol index'); + this.initPromise = this.initialize().finally(() => { + this.initPromise = null; + }); } - - this.logger.info('Lazy-loading symbol index'); - await this.initialize(); + await this.initPromise; } /** @@ -213,7 +218,7 @@ export class SymbolIndexService { supported_resolutions: supportedResolutions.length > 0 ? 
supportedResolutions : DEFAULT_SUPPORTED_RESOLUTIONS, has_intraday: true, has_daily: true, - has_weekly_and_monthly: false, + has_weekly_and_monthly: true, pricescale, minmov: 1, base_currency: metadata.base_asset, diff --git a/gateway/src/tools/mcp/mcp-tool-wrapper.ts b/gateway/src/tools/mcp/mcp-tool-wrapper.ts index 473ddebd..002d7b24 100644 --- a/gateway/src/tools/mcp/mcp-tool-wrapper.ts +++ b/gateway/src/tools/mcp/mcp-tool-wrapper.ts @@ -125,6 +125,14 @@ export function createMCPToolWrapper( }); } +/** Silently parse a JSON string; pass non-strings through unchanged. */ +function tryParseJson(val: unknown): unknown { + if (typeof val === 'string') { + try { return JSON.parse(val); } catch { /* fall through */ } + } + return val; +} + /** * Convert MCP input schema to Zod schema */ @@ -156,17 +164,13 @@ function mcpInputSchemaToZod(inputSchema?: MCPToolInfo['inputSchema']): z.ZodObj case 'boolean': zodType = z.boolean().describe(prop.description || ''); break; - case 'array': - // Handle array items - if (prop.items) { - const itemType = getZodTypeForProperty(prop.items); - zodType = z.array(itemType).describe(prop.description || ''); - } else { - zodType = z.array(z.any()).describe(prop.description || ''); - } + case 'array': { + const itemType = prop.items ? getZodTypeForProperty(prop.items) : z.any(); + zodType = z.preprocess(tryParseJson, z.array(itemType)).describe(prop.description || ''); break; + } case 'object': - zodType = z.object({}).passthrough().describe(prop.description || ''); + zodType = z.preprocess(tryParseJson, z.object({}).passthrough()).describe(prop.description || ''); break; default: zodType = z.any().describe(prop.description || ''); diff --git a/gateway/src/tools/platform/get-chart-data.tool.ts b/gateway/src/tools/platform/get-chart-data.tool.ts index feca6b95..5b85c017 100644 --- a/gateway/src/tools/platform/get-chart-data.tool.ts +++ b/gateway/src/tools/platform/get-chart-data.tool.ts @@ -43,7 +43,10 @@ Parameters: from_time: z.union([z.number(), z.string()]).optional().describe('Start time: Unix seconds OR date string (defaults to workspace chartState.start_time)'), to_time: z.union([z.number(), z.string()]).optional().describe('End time: Unix seconds OR date string (defaults to workspace chartState.end_time)'), countback: z.number().optional().describe('Limit number of bars returned (max 500)'), - columns: z.array(z.enum(['volume', 'buy_vol', 'sell_vol', 'open_time', 'high_time', 'low_time', 'close_time', 'open_interest'])).optional().describe('Extra columns beyond OHLC'), + columns: z.preprocess( + (val) => { if (typeof val === 'string') { try { return JSON.parse(val); } catch { /* fall through */ } } return val; }, + z.array(z.enum(['volume', 'buy_vol', 'sell_vol', 'open_time', 'high_time', 'low_time', 'close_time', 'open_interest'])).optional() + ).describe('Extra columns beyond OHLC'), }), func: async ({ ticker, period, from_time, to_time, countback, columns }) => { const MAX_BARS = 500; diff --git a/gateway/src/tools/platform/memory-lookup.tool.ts b/gateway/src/tools/platform/memory-lookup.tool.ts index c8b6f951..5788f1af 100644 --- a/gateway/src/tools/platform/memory-lookup.tool.ts +++ b/gateway/src/tools/platform/memory-lookup.tool.ts @@ -19,31 +19,18 @@ export function createMemoryLookupTool(config: MemoryLookupToolConfig): DynamicS return new DynamicStructuredTool({ name: 'MemoryLookup', - description: `Read a knowledge wiki page by name to get detailed reference information. - -Pass "index" to list all available pages. 
- -Example pages: -- "api-reference" — DataAPI and ChartingAPI reference for research scripts -- "usage-examples" — Example research scripts -- "pandas-ta-reference" — Full pandas-ta indicator catalog`, + description: `Read a knowledge wiki page by name to get detailed reference information. Available pages are listed in the Knowledge Base section of your system prompt.`, schema: z.object({ page: z.string().describe( - 'Wiki page name to read (without .md extension). Pass "index" to list all pages.' + 'Wiki page name to read (without .md extension), e.g. "api-reference" or "platform/workspace".' ), }), func: async ({ page }: { page: string }): Promise => { logger.info({ page }, 'memory_lookup: reading page'); - if (page === 'index') { - const pages = await wikiLoader.listPages(); - return `Available wiki pages:\n${pages.map(p => `- ${p}`).join('\n')}`; - } - const wikiPage = await wikiLoader.loadPage(page); if (!wikiPage) { - const pages = await wikiLoader.listPages(); - return `Page "${page}" not found.\n\nAvailable pages:\n${pages.map(p => `- ${p}`).join('\n')}`; + return `Page "${page}" not found. Refer to the Knowledge Base table in your system prompt for available page names.`; } return wikiPage.body; diff --git a/ingestor/src/ccxt-fetcher.js b/ingestor/src/ccxt-fetcher.js index 10234e90..912e70ea 100644 --- a/ingestor/src/ccxt-fetcher.js +++ b/ingestor/src/ccxt-fetcher.js @@ -156,12 +156,32 @@ export class CCXTFetcher { const FETCH_RETRIES = 3; const FETCH_RETRY_DELAY_MS = 5000; + // Binance provides extended kline data (buy/sell volume split, quote volume, trade count). + // We use the raw klines endpoint directly to capture all available fields. + const isBinance = exchangeName === 'binance'; + let binanceMarketId = null; + if (isBinance) { + if (!exchange.markets || Object.keys(exchange.markets).length === 0) { + await exchange.loadMarkets(); + } + binanceMarketId = exchange.market(symbol).id; + } + while (since < endMs) { let candles; let lastError; for (let attempt = 1; attempt <= FETCH_RETRIES; attempt++) { try { - candles = await exchange.fetchOHLCV(symbol, timeframe, since, PAGE_SIZE); + if (isBinance) { + candles = await exchange.publicGetKlines({ + symbol: binanceMarketId, + interval: timeframe, + startTime: since, + limit: PAGE_SIZE + }); + } else { + candles = await exchange.fetchOHLCV(symbol, timeframe, since, PAGE_SIZE); + } lastError = null; break; } catch (error) { @@ -267,7 +287,7 @@ export class CCXTFetcher { } else if (prevClose !== null) { // Interior gap — forward-fill with previous close, zero volume gapCount++; - allCandles.push({ + const gapBar = { ticker, timestamp: (ts * 1_000_000).toString(), open: prevClose, @@ -277,7 +297,14 @@ export class CCXTFetcher { volume: '0', open_time: (ts * 1_000_000).toString(), close_time: ((ts + periodSeconds * 1000) * 1_000_000).toString() - }); + }; + if (isBinance) { + gapBar.buy_vol = '0'; + gapBar.sell_vol = '0'; + gapBar.num_trades = '0'; + gapBar.quote_volume = '0'; + } + allCandles.push(gapBar); } } @@ -332,27 +359,54 @@ export class CCXTFetcher { } /** - * Convert CCXT OHLCV array to our OHLC format - * CCXT format: [timestamp, open, high, low, close, volume] - * Uses precision fields from market metadata for proper integer representation + * Convert OHLCV array to our OHLC format. 
+ * + * Accepts two formats: + * - Standard CCXT (6 elements): [timestamp, open, high, low, close, volume] + * - Binance raw klines (12 elements): [openTime, open, high, low, close, baseVolume, + * closeTime, quoteVolume, numTrades, takerBuyBaseVol, takerBuyQuoteVol, ignore] + * + * Prices/volumes use integer representation scaled by market metadata precision. */ convertToOHLC(candle, ticker, periodSeconds, metadata) { - const [timestamp, open, high, low, close, volume] = candle; + const timestamp = Number(candle[0]); + const open = parseFloat(candle[1]); + const high = parseFloat(candle[2]); + const low = parseFloat(candle[3]); + const close = parseFloat(candle[4]); + const volume = parseFloat(candle[5]); const priceMult = Math.pow(10, metadata.pricePrecision ?? 2); - const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8); + const sizeMult = Math.pow(10, metadata.sizePrecision ?? 8); - return { + const result = { ticker, - timestamp: (timestamp * 1_000_000).toString(), // Convert ms to nanoseconds - open: Math.round(open * priceMult).toString(), - high: Math.round(high * priceMult).toString(), - low: Math.round(low * priceMult).toString(), + timestamp: (timestamp * 1_000_000).toString(), + open: Math.round(open * priceMult).toString(), + high: Math.round(high * priceMult).toString(), + low: Math.round(low * priceMult).toString(), close: Math.round(close * priceMult).toString(), volume: Math.round(volume * sizeMult).toString(), open_time: (timestamp * 1_000_000).toString(), - close_time: ((timestamp + periodSeconds * 1000) * 1_000_000).toString() }; + + if (candle.length >= 10) { + // Binance extended klines format + const closeTimeMs = Number(candle[6]); + const quoteVolRaw = parseFloat(candle[7]); + const numTrades = Number(candle[8]); + const takerBuyBase = parseFloat(candle[9]); + + result.close_time = (closeTimeMs * 1_000_000).toString(); + result.quote_volume = Math.round(quoteVolRaw * priceMult).toString(); + result.num_trades = numTrades.toString(); + result.buy_vol = Math.round(takerBuyBase * sizeMult).toString(); + result.sell_vol = Math.round((volume - takerBuyBase) * sizeMult).toString(); + } else { + result.close_time = ((timestamp + periodSeconds * 1000) * 1_000_000).toString(); + } + + return result; } /** diff --git a/ingestor/src/kafka-producer.js b/ingestor/src/kafka-producer.js index 6bf28d99..23f79fa2 100644 --- a/ingestor/src/kafka-producer.js +++ b/ingestor/src/kafka-producer.js @@ -147,6 +147,12 @@ export class KafkaProducer { close: candle.close, volume: candle.volume }; + if (candle.buy_vol != null) protoCandle.buy_vol = candle.buy_vol; + if (candle.sell_vol != null) protoCandle.sell_vol = candle.sell_vol; + if (candle.open_time != null) protoCandle.open_time = candle.open_time; + if (candle.close_time != null) protoCandle.close_time = candle.close_time; + if (candle.num_trades != null) protoCandle.num_trades = candle.num_trades; + if (candle.quote_volume != null) protoCandle.quote_volume = candle.quote_volume; const [frame1, frame2] = encodeMessage(MessageTypeId.OHLC, protoCandle, OHLC); const value = Buffer.concat([frame1, frame2]); @@ -188,7 +194,13 @@ export class KafkaProducer { low: candle.low, close: candle.close, }; - if (candle.volume != null) row.volume = candle.volume; + if (candle.volume != null) row.volume = candle.volume; + if (candle.buy_vol != null) row.buy_vol = candle.buy_vol; + if (candle.sell_vol != null) row.sell_vol = candle.sell_vol; + if (candle.open_time != null) row.open_time = candle.open_time; + if (candle.close_time != null) 
row.close_time = candle.close_time; + if (candle.num_trades != null) row.num_trades = candle.num_trades; + if (candle.quote_volume != null) row.quote_volume = candle.quote_volume; return row; }) }; diff --git a/protobuf/ohlc.proto b/protobuf/ohlc.proto index ef8dc8b5..9341f61c 100644 --- a/protobuf/ohlc.proto +++ b/protobuf/ohlc.proto @@ -23,6 +23,8 @@ message OHLC { optional int64 close_time = 12; optional int64 open_interest = 13; string ticker = 14; // Nautilus format: "BTC/USDT.BINANCE" + optional int64 num_trades = 15; // Number of trades in the candle + optional int64 quote_volume = 16; // Total quote asset volume (scaled by price precision) } // Batch of OHLC rows with metadata for historical request tracking diff --git a/sandbox/dexorder/impl/data_api_impl.py b/sandbox/dexorder/impl/data_api_impl.py index f60cb1f9..73a83ebe 100644 --- a/sandbox/dexorder/impl/data_api_impl.py +++ b/sandbox/dexorder/impl/data_api_impl.py @@ -20,6 +20,7 @@ OHLC_OPTIONAL_COLUMNS = [ "volume", "buy_vol", "sell_vol", "open_time", "high_time", "low_time", "close_time", "open_interest", + "num_trades", "quote_volume", ] # All valid extra columns available in the Iceberg schema @@ -27,6 +28,7 @@ VALID_EXTRA_COLUMNS = { "volume", "buy_vol", "sell_vol", "open_time", "high_time", "low_time", "close_time", "open_interest", + "num_trades", "quote_volume", "ticker", "period_seconds" } @@ -51,7 +53,7 @@ class DataAPIImpl(DataAPI): s3_access_key: Optional[str] = None, s3_secret_key: Optional[str] = None, s3_region: Optional[str] = None, - request_timeout: float = 30.0, + request_timeout: float = 120.0, ): """ Initialize DataAPI implementation. @@ -65,7 +67,7 @@ class DataAPIImpl(DataAPI): s3_access_key: S3/MinIO access key s3_secret_key: S3/MinIO secret key s3_region: S3/MinIO region (e.g., "us-east-1") - request_timeout: Default timeout for historical data requests in seconds (default: 30) + request_timeout: Default timeout for historical data requests in seconds (default: 120) """ self.ohlc_client = OHLCClient( iceberg_catalog_uri=iceberg_catalog_uri, diff --git a/sandbox/dexorder/ohlc_client.py b/sandbox/dexorder/ohlc_client.py index 3541eb69..84f98811 100644 --- a/sandbox/dexorder/ohlc_client.py +++ b/sandbox/dexorder/ohlc_client.py @@ -90,7 +90,7 @@ class OHLCClient: period_seconds: int, start_time: int, end_time: int, - request_timeout: float = 30.0 + request_timeout: float = 120.0 ) -> pd.DataFrame: """ Fetch OHLC data with smart caching. @@ -108,7 +108,7 @@ class OHLCClient: period_seconds: OHLC period in seconds (60, 300, 3600, etc.) start_time: Start timestamp in nanoseconds end_time: End timestamp in nanoseconds - request_timeout: Timeout for historical data requests (default: 30s) + request_timeout: Timeout for historical data requests (default: 120s) Returns: DataFrame with OHLC data sorted by timestamp @@ -180,6 +180,12 @@ class OHLCClient: if col in df.columns: df[col] = df[col] / size_divisor + if price_precision is not None and price_precision > 0: + price_divisor = 10 ** price_precision + for col in ("quote_volume",): + if col in df.columns: + df[col] = df[col] / price_divisor + return df async def __aenter__(self): diff --git a/sandbox/main.py b/sandbox/main.py index 28fd7d2b..e2f0b7a0 100644 --- a/sandbox/main.py +++ b/sandbox/main.py @@ -197,6 +197,73 @@ def _get_env_yml() -> Optional[Path]: return p if p.exists() else None +def _coerce_json_arg(val, expected_type: str): + """Coerce a possibly-stringified JSON argument to its expected Python type. 
+ Handles LLMs that serialize structured arguments as JSON strings. + expected_type: 'object' → dict | 'array' → list + Returns None if val is None or coercion is not possible. + """ + if val is None: + return None + target = dict if expected_type == "object" else list + if isinstance(val, target): + return val + if isinstance(val, str): + try: + parsed = json.loads(val) + return parsed if isinstance(parsed, target) else None + except (ValueError, TypeError): + return None + return None + + +def _update_research_summary(data_dir: Path, script_name: str, description: str, text_output: str) -> None: + """ + Upsert the research-summary.md entry for the given script name. + Uses HTML comment anchors ( / ) to locate entries. + New entries get a stub with a findings placeholder; existing entries only have their + Last Run (and optionally Description) updated — agent-written findings are preserved. + """ + import re + from datetime import datetime, timezone + summary_path = data_dir / "research-summary.md" + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + begin_marker = f"" + end_marker = f"" + + stub_parts = [begin_marker, f"## {script_name}"] + if description: + stub_parts.append(f"**Description:** {description}") + stub_parts.append(f"**Last Run:** {timestamp}") + stub_parts.append("") + stub_parts.append("**Findings:** *(awaiting agent summary)*") + stub_parts.append(end_marker) + stub_entry = "\n".join(stub_parts) + + if not summary_path.exists(): + summary_path.write_text(f"# Research Summary\n\n{stub_entry}\n", encoding="utf-8") + return + + content = summary_path.read_text(encoding="utf-8") + begin_idx = content.find(begin_marker) + end_idx = content.find(end_marker) + + if begin_idx != -1 and end_idx != -1: + # Entry exists — update only Last Run (and Description if provided), preserve findings + existing = content[begin_idx : end_idx + len(end_marker)] + updated = re.sub(r'\*\*Last Run:\*\* [^\n]*', f'**Last Run:** {timestamp}', existing) + if description: + if '**Description:**' in updated: + updated = re.sub(r'\*\*Description:\*\* [^\n]*', f'**Description:** {description}', updated) + else: + updated = re.sub(r'(## [^\n]*\n)', f'\\1**Description:** {description}\n', updated, count=1) + new_content = content[:begin_idx] + updated + content[end_idx + len(end_marker):] + summary_path.write_text(new_content, encoding="utf-8") + else: + summary_path.write_text(content.rstrip() + "\n\n---\n\n" + stub_entry + "\n", encoding="utf-8") + + # ============================================================================= # Configuration @@ -398,7 +465,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Name of the store" }, "patch": { - "type": "array", "description": "JSON Patch operations (RFC 6902)", "items": { "type": "object", @@ -454,6 +520,46 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "required": ["old_str", "new_str"] } ), + Tool( + name="ResearchSummaryRead", + description="Read the research summary markdown file. 
Returns the full content of research-summary.md from the user's sandbox data directory.", + inputSchema={ + "type": "object", + "properties": {} + } + ), + Tool( + name="ResearchSummaryWrite", + description="Write (fully replace) the research summary markdown file.", + inputSchema={ + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "Full markdown content for the research summary file" + } + }, + "required": ["content"] + } + ), + Tool( + name="ResearchSummaryPatch", + description="Surgically update a section of the research summary markdown file by finding and replacing text. Fails if old_str is not found.", + inputSchema={ + "type": "object", + "properties": { + "old_str": { + "type": "string", + "description": "Exact text to find in the research summary file" + }, + "new_str": { + "type": "string", + "description": "Replacement text" + } + }, + "required": ["old_str", "new_str"] + } + ), Tool( name="PythonWrite", description="Write a new strategy, indicator, or research script with validation", @@ -488,7 +594,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Python implementation code" }, "metadata": { - "type": "object", "description": ( "Optional category-specific metadata. " "For strategy: include 'data_feeds' (list of {symbol, period_seconds, description}) " @@ -527,7 +632,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Full replacement Python code. Use only when rewriting the entire implementation; prefer 'patches' for targeted edits." }, "patches": { - "type": "array", "description": ( "Targeted code edits as old/new string pairs. Preferred over 'code' for small changes. " "Each patch: {\"old_string\": \"exact text to find\", \"new_string\": \"replacement text\"}. " @@ -552,7 +656,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Full replacement for the details field. Use only when rewriting the entire description; prefer 'detail_patches' for targeted edits." }, "detail_patches": { - "type": "array", "description": ( "Targeted edits to the details field as old/new string pairs. Preferred over 'details' for small changes. " "Each patch: {\"old_string\": \"exact text to find\", \"new_string\": \"replacement text\"}. " @@ -569,7 +672,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server } }, "metadata": { - "type": "object", "description": ( "Updated metadata fields (optional). " "For strategy: 'data_feeds' (list of {symbol, period_seconds, description}) " @@ -621,8 +723,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Display name of the item" }, "files": { - "type": "array", - "items": {"type": "string"}, "description": ( "Specific filenames under output/ to return (e.g. [\"analysis.md\", \"img1.png\"]). " "If omitted, returns all output files listed in metadata." @@ -729,8 +829,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "type": "object", "properties": { "packages": { - "type": "array", - "items": {"type": "string"}, "description": "List of conda package names to install" } }, @@ -781,7 +879,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Lowercase pandas-ta function name, e.g. 'rsi', 'macd', 'bbands'" }, "parameters": { - "type": "object", "description": "pandas-ta keyword arguments, e.g. 
{\"length\": 14} or {\"fast\": 12, \"slow\": 26, \"signal\": 9}", "default": {} } @@ -806,7 +903,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Display name of the strategy as saved via PythonWrite" }, "feeds": { - "type": "array", "description": "Data feeds to backtest against", "items": { "type": "object", @@ -861,7 +957,6 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server "description": "Display name of the strategy as saved via PythonWrite" }, "feeds": { - "type": "array", "description": "Data feeds for the strategy", "items": { "type": "object", @@ -1004,7 +1099,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server elif name == "WorkspacePatch": return workspace_store.patch( arguments.get("store_name", ""), - arguments.get("patch", []) + _coerce_json_arg(arguments.get("patch"), "array") or [] ) elif name == "PreferencesRead": prefs_path = DATA_DIR / "preferences.md" @@ -1025,6 +1120,25 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server return {"success": False, "error": "old_str not found in preferences file"} prefs_path.write_text(content.replace(old_str, new_str, 1), encoding="utf-8") return {"success": True} + elif name == "ResearchSummaryRead": + summary_path = DATA_DIR / "research-summary.md" + if not summary_path.exists(): + return {"content": "", "exists": False} + content = summary_path.read_text(encoding="utf-8") + return {"content": content, "exists": True} + elif name == "ResearchSummaryWrite": + summary_path = DATA_DIR / "research-summary.md" + summary_path.write_text(arguments.get("content", ""), encoding="utf-8") + return {"success": True} + elif name == "ResearchSummaryPatch": + summary_path = DATA_DIR / "research-summary.md" + old_str = arguments.get("old_str", "") + new_str = arguments.get("new_str", "") + content = summary_path.read_text(encoding="utf-8") if summary_path.exists() else "" + if old_str not in content: + return {"success": False, "error": "old_str not found in research summary file"} + summary_path.write_text(content.replace(old_str, new_str, 1), encoding="utf-8") + return {"success": True} elif name == "PythonWrite": result = await category_manager.write( category=arguments.get("category", ""), @@ -1032,7 +1146,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server description=arguments.get("description", ""), details=arguments.get("details", ""), code=arguments.get("code", ""), - metadata=arguments.get("metadata") + metadata=_coerce_json_arg(arguments.get("metadata"), "object") ) content = [] meta_parts = [f"success: {result['success']}"] @@ -1043,7 +1157,11 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server if result.get("revision"): meta_parts.append(f"revision: {result['revision']}") if result.get("validation") and not result["validation"].get("success"): - meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}") + val = result["validation"] + error_detail = val.get('error') or '' + if val.get('output'): + error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] + meta_parts.append(f"validation error: {error_detail.strip()}") content.append(TextContent(type="text", text="\n".join(meta_parts))) if result.get("execution"): exec_content = result["execution"].get("content", []) @@ -1058,17 +1176,29 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server sync = 
_workspace_sync_content(workspace_store, arguments.get("category", "")) if sync: content.append(sync) + if arguments.get("category") == "research" and result.get("execution"): + exec_text = "\n".join( + item.text for item in result["execution"].get("content", []) + if getattr(item, "type", "") == "text" + ) + if exec_text.strip(): + _update_research_summary( + DATA_DIR, + arguments.get("name", ""), + arguments.get("description", "") or "", + exec_text, + ) return content elif name == "PythonEdit": result = await category_manager.edit( category=arguments.get("category", ""), name=arguments.get("name", ""), code=arguments.get("code"), - patches=arguments.get("patches"), + patches=_coerce_json_arg(arguments.get("patches"), "array"), description=arguments.get("description"), details=arguments.get("details"), - detail_patches=arguments.get("detail_patches"), - metadata=arguments.get("metadata") + detail_patches=_coerce_json_arg(arguments.get("detail_patches"), "array"), + metadata=_coerce_json_arg(arguments.get("metadata"), "object") ) content = [] meta_parts = [f"success: {result['success']}"] @@ -1079,7 +1209,11 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server if result.get("revision"): meta_parts.append(f"revision: {result['revision']}") if result.get("validation") and not result["validation"].get("success"): - meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}") + val = result["validation"] + error_detail = val.get('error') or '' + if val.get('output'): + error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] + meta_parts.append(f"validation error: {error_detail.strip()}") content.append(TextContent(type="text", text="\n".join(meta_parts))) if result.get("execution"): exec_content = result["execution"].get("content", []) @@ -1094,6 +1228,18 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server sync = _workspace_sync_content(workspace_store, arguments.get("category", "")) if sync: content.append(sync) + if arguments.get("category") == "research" and result.get("execution"): + exec_text = "\n".join( + item.text for item in result["execution"].get("content", []) + if getattr(item, "type", "") == "text" + ) + if exec_text.strip(): + _update_research_summary( + DATA_DIR, + arguments.get("name", ""), + arguments.get("description", "") or "", + exec_text, + ) return content elif name == "PythonRead": return category_manager.read( @@ -1104,7 +1250,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server result = category_manager.read_output( category=arguments.get("category", ""), name=arguments.get("name", ""), - files=arguments.get("files"), + files=_coerce_json_arg(arguments.get("files"), "array"), ) if "error" in result: return [TextContent(type="text", text=f"Error: {result['error']}")] @@ -1140,7 +1286,11 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server if result.get("error"): meta_parts.append(f"error: {result['error']}") if result.get("validation") and not result["validation"].get("success"): - meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}") + val = result["validation"] + error_detail = val.get('error') or '' + if val.get('output'): + error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output'] + meta_parts.append(f"validation error: {error_detail.strip()}") if result.get("success"): _upsert_type(workspace_store, category_manager, 
arguments.get("category", ""), arguments.get("name", "")) sync = _workspace_sync_content(workspace_store, arguments.get("category", "")) @@ -1175,7 +1325,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server environment_yml=_get_env_yml() ) elif name == "CondaInstall": - return await install_packages_async(arguments.get("packages", [])) + return await install_packages_async(_coerce_json_arg(arguments.get("packages"), "array") or []) elif name == "ExecuteResearch": result = await category_manager.execute_research(name=arguments.get("name", "")) if "error" in result: @@ -1184,6 +1334,12 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server content = result.get("content", [TextContent(type="text", text="No output")]) image_count = sum(1 for item in content if item.type == "image") logging.info(f"ExecuteResearch '{arguments.get('name')}': returning {len(content)} items, {image_count} images") + exec_text = "\n".join( + item.text for item in content + if getattr(item, "type", "") == "text" + ) + if exec_text.strip(): + _update_research_summary(DATA_DIR, arguments.get("name", ""), "", exec_text) return content elif name == "EvaluateIndicator": return await evaluate_indicator( @@ -1192,12 +1348,12 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server to_time=arguments.get("to_time"), period_seconds=int(arguments.get("period_seconds", 3600)), pandas_ta_name=arguments.get("pandas_ta_name", ""), - parameters=arguments.get("parameters") or {}, + parameters=_coerce_json_arg(arguments.get("parameters"), "object") or {}, ) elif name == "BacktestStrategy": result = await backtest_strategy( strategy_name=arguments.get("strategy_name", ""), - feeds=arguments.get("feeds", []), + feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [], from_time=arguments.get("from_time"), to_time=arguments.get("to_time"), initial_capital=float(arguments.get("initial_capital", 10_000.0)), @@ -1214,7 +1370,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server from_time=arguments.get("from_time"), to_time=arguments.get("to_time"), initial_capital=float(arguments.get("initial_capital", 10_000.0)), - feeds=arguments.get("feeds", []), + feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [], summary=payload.get("summary", {}), statistics=payload.get("statistics", {}), trades=payload.get("trades", []), @@ -1226,7 +1382,7 @@ def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server elif name == "ActivateStrategy": return await activate_strategy( strategy_name=arguments.get("strategy_name", ""), - feeds=arguments.get("feeds", []), + feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [], allocation=float(arguments.get("allocation", 0.0)), paper=bool(arguments.get("paper", True)), ) diff --git a/web/src/components/ChatPanel.vue b/web/src/components/ChatPanel.vue index 6ee6cb0a..cb0004d5 100644 --- a/web/src/components/ChatPanel.vue +++ b/web/src/components/ChatPanel.vue @@ -634,6 +634,23 @@ const injectShadowStyles = () => { } } +const updateShadowStopOverride = (enabled: boolean) => { + const chatEl = document.querySelector('vue-advanced-chat') + if (!chatEl?.shadowRoot) return + chatEl.shadowRoot.querySelector('#vac-stop-override')?.remove() + if (enabled) { + const style = document.createElement('style') + style.id = 'vac-stop-override' + style.textContent = ` + .vac-send-disabled { + pointer-events: auto !important; + cursor: pointer !important; + } + ` + 
chatEl.shadowRoot.appendChild(style) + } +} + watch(() => channelStore.isReady, async (ready) => { if (!ready) return await nextTick() @@ -644,6 +661,10 @@ watch(() => channelStore.isReady, async (ready) => { }, 100) }) +watch(isAgentProcessing, (val) => { + updateShadowStopOverride(val) +}) + onUnmounted(() => { wsManager.removeHandler(handleMessage) }) @@ -711,6 +732,7 @@ onUnmounted(() => { @mousedown="isStopPressed = true" @mouseup="isStopPressed = false" :style="{ + pointerEvents: 'auto', display: 'flex', alignItems: 'center', justifyContent: 'center',
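
---

The data-window rule added to `gateway/prompt/agent-research.md` (target 100,000-200,000 bars, hard cap 5 years) rests on simple bar-count arithmetic. A minimal sketch of that arithmetic, ignoring exchange downtime and data gaps, so the numbers are approximations:

```python
# Approximate bars-per-window arithmetic behind the "100,000-200,000 bars, 5 year cap"
# rule in agent-research.md. Ignores exchange downtime and data gaps.
SECONDS_PER_YEAR = 365 * 24 * 3600

def bars_in_window(period_seconds: int, years: float) -> int:
    """Approximate number of bars one resolution produces over a window of `years`."""
    return int(years * SECONDS_PER_YEAR / period_seconds)

for label, period in [("5m", 300), ("15m", 900), ("1h", 3600), ("4h", 14400), ("1d", 86400)]:
    print(f"{label}: ~{bars_in_window(period, 5):,} bars over 5 years")

# 5m over 5 years is ~525,600 bars, far beyond the 200k target, so the prompt says to
# shorten that case to ~2 years; 15m over 5 years is ~175,200 bars and fits.
```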
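The research-script API is only summarized in this patch (the verbatim `api-reference.md` generation is being removed from `bin/deploy`), so the sketch below shows the general shape of a research script under stated assumptions: `get_api`, `api.data`, and `api.charting` appear in the removed overview text, but the fetch method name and its parameters are placeholders, not the real `DataAPI` signature.

```python
# Minimal research-script sketch. get_api, api.data, and api.charting come from the
# API overview removed from bin/deploy; the fetch call below is a placeholder only.
# Consult api-reference.md (via MemoryLookup) for the real DataAPI method and arguments.
from dexorder.api import get_api

api = get_api()

# Hypothetical fetch: 5 years of 15-minute ETH/USDT.BINANCE bars, including the new
# v2 columns (num_trades, quote_volume) added by this patch. Method name is illustrative.
df = api.data.fetch_ohlc(                      # placeholder name, not a confirmed API
    ticker="ETH/USDT.BINANCE",
    period_seconds=900,
    columns=["volume", "num_trades", "quote_volume"],
)
print(df.tail())

# Charts created through api.charting are captured and returned to the agent as images
# when the script is executed by PythonWrite / PythonEdit / ExecuteResearch.
```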
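Step 6 of the updated research workflow tells the agent to replace the findings block with `ResearchSummaryPatch`. A sketch of the call arguments: the `old_str` matches the placeholder stub written by `_update_research_summary` in `sandbox/main.py`, while the findings bullets are illustrative text, not real results.

```python
# Example ResearchSummaryPatch arguments for the "Summarize findings" step.
# old_str is the stub written by _update_research_summary; the bullets are illustrative.
patch_args = {
    "old_str": "**Findings:** *(awaiting agent summary)*",
    "new_str": (
        "**Findings:**\n"
        "- Direction changes cluster in the hour around the NY open (illustrative)\n"
        "- No significant monday/tuesday effect vs. wednesday-sunday (illustrative)"
    ),
}
```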
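The begin/end anchor strings in `_update_research_summary` did not survive intact in this copy of the patch: the two f-string literals render empty and the docstring's marker examples are blank. The sketch below shows the intended upsert pattern; the HTML-comment marker format is an assumed reconstruction, not the verbatim strings from the patch.

```python
# Assumed reconstruction of the anchor-based upsert in _update_research_summary.
# The exact marker strings were lost from this copy of the patch; the HTML-comment
# form used here is a guess at the original format.
def entry_markers(script_name: str) -> tuple[str, str]:
    begin = f"<!-- research:{script_name} -->"    # assumed format
    end = f"<!-- /research:{script_name} -->"     # assumed format
    return begin, end

def upsert_entry(content: str, script_name: str, new_entry: str) -> str:
    """Replace the marker-delimited block if present; otherwise append a new entry.
    new_entry is expected to already be wrapped in the begin/end markers."""
    begin, end = entry_markers(script_name)
    b, e = content.find(begin), content.find(end)
    if b != -1 and e != -1:
        return content[:b] + new_entry + content[e + len(end):]
    return content.rstrip() + "\n\n---\n\n" + new_entry + "\n"
```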
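The ingestor stores prices and the new `quote_volume` column as integers scaled by `10**pricePrecision`, and base volumes by `10**sizePrecision`; `sandbox/dexorder/ohlc_client.py` divides them back out when building the DataFrame. A small round-trip sketch with made-up values (the precisions mirror the fetcher defaults of 2 and 8):

```python
# Round trip of the integer scaling convention used by ccxt-fetcher.js and unwound in
# sandbox/dexorder/ohlc_client.py. Precisions mirror the fetcher defaults
# (pricePrecision ?? 2, sizePrecision ?? 8); the sample values are made up.
price_precision, size_precision = 2, 8

close = 1823.57            # float close price from the exchange
base_volume = 12.3456789   # base-asset volume
quote_volume = 22514.12    # quote-asset volume (new v2 column)

close_scaled = round(close * 10**price_precision)          # stored as an int64 string
volume_scaled = round(base_volume * 10**size_precision)
quote_scaled = round(quote_volume * 10**price_precision)   # scaled by PRICE precision

# Reading back, as the sandbox client does:
print(close_scaled / 10**price_precision)    # 1823.57
print(volume_scaled / 10**size_precision)    # 12.3456789
print(quote_scaled / 10**price_precision)    # 22514.12
```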