#!/usr/bin/env python3
"""
Dexorder User Container Main Entry Point

Brings together:
- Config and secrets loading from k8s mounted YAML files
- ZeroMQ event publisher for user events
- MCP server with minimal "hello world" resource
- Lifecycle management integration
"""

import asyncio
import contextlib
import json
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional

import uvicorn
import yaml
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Tool, TextContent, ImageContent
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route, Mount

from dexorder import EventPublisher, start_lifecycle_manager, get_lifecycle_manager
from dexorder.api import set_api, API
from dexorder.conda_manager import sync_packages_async, install_packages_async, cleanup_extra_packages_async
from dexorder.events import EventType, UserEvent, DeliverySpec
from dexorder.impl.charting_api_impl import ChartingAPIImpl
from dexorder.impl.data_api_impl import DataAPIImpl
from dexorder.tools.python_tools import get_category_manager, sanitize_name
from dexorder.tools.workspace_tools import get_workspace_store
from dexorder.tools.evaluate_indicator import evaluate_indicator
from dexorder.tools.backtest_strategy import backtest_strategy
from dexorder.tools.activate_strategy import activate_strategy, deactivate_strategy, list_active_strategies
from dexorder.strategy.event_bridge import StrategyEventBridge
from dexorder.strategy.lifecycle import get_strategy_lifecycle


# =============================================================================
# Global Data Directory
# =============================================================================

# Default data directory (relative to working directory for local dev)
DEFAULT_DATA_DIR = Path("data")

# Global data directory - set after config is loaded
DATA_DIR: Path = DEFAULT_DATA_DIR


def get_data_dir() -> Path:
    """Get the global data directory."""
    return DATA_DIR


# =============================================================================
# Category Types Helpers
# =============================================================================

def _type_store_name(category: str) -> str:
    return f"{category}_types"


def _type_store_key(category: str, name: str) -> str:
    sanitized = sanitize_name(name).lower()
    return f"custom_{sanitized}" if category == "indicator" else sanitized


def _build_type_entry(category: str, meta: dict) -> dict:
    """Build a {category}_types workspace entry from a metadata dict."""
    name = meta.get('name', '')
    key = _type_store_key(category, name)
    now = int(time.time())
    entry = {
        'key': key,
        'display_name': name,
        'description': meta.get('description', ''),
        'metadata': {},
        'created_at': now,
        'modified_at': now,
    }
    if category == "indicator":
        entry['pandas_ta_name'] = key
        entry['metadata'] = {
            'display_name': name,
            'parameters': meta.get('parameters') or {},
            'input_series': meta.get('input_series') or ['close'],
            'output_columns': meta.get('output_columns') or [{'name': 'value'}],
            'pane': meta.get('pane', 'separate'),
            'filled_areas': meta.get('filled_areas') or [],
            'bands': meta.get('bands') or [],
        }
    elif category == "strategy":
        entry['metadata'] = {
            'data_feeds': meta.get('data_feeds') or [],
            'parameters': meta.get('parameters') or {},
        }
    # research: metadata stays empty (no fields beyond base)
    return entry
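
# Illustrative sketch (not exercised at runtime): for an indicator named "My RSI"
# with otherwise default metadata, _build_type_entry produces an entry shaped
# roughly like the following. The exact key depends on sanitize_name(); the
# value 'custom_my_rsi' assumes it maps "My RSI" to "my_rsi".
#
#   {
#       'key': 'custom_my_rsi',
#       'display_name': 'My RSI',
#       'description': '...',
#       'metadata': {
#           'display_name': 'My RSI',
#           'parameters': {},
#           'input_series': ['close'],
#           'output_columns': [{'name': 'value'}],
#           'pane': 'separate',
#           'filled_areas': [],
#           'bands': [],
#       },
#       'pandas_ta_name': 'custom_my_rsi',
#       'created_at': 1700000000,
#       'modified_at': 1700000000,
#   }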


def _upsert_type(workspace_store, category_manager, category: str, name: str) -> None:
    """Read category metadata from disk and upsert into the {category}_types workspace store."""
    read_result = category_manager.read(category, name)
    if not read_result.get('exists') or not read_result.get('metadata'):
        return
    entry = _build_type_entry(category, read_result['metadata'])
    key = entry['key']
    store = _type_store_name(category)

    # Preserve original created_at if already present
    existing = workspace_store.read(store)
    existing_types = (existing.get('data') or {}).get('types') or {}
    if key in existing_types:
        entry['created_at'] = existing_types[key].get('created_at', entry['created_at'])

    workspace_store.patch(store, [{'op': 'add', 'path': f'/types/{key}', 'value': entry}])
    logging.info(f"Upserted {store}/{key} for '{name}'")


def _remove_type(workspace_store, category: str, name: str) -> None:
    """Remove a category item from the {category}_types workspace store."""
    key = _type_store_key(category, name)
    store = _type_store_name(category)
    try:
        workspace_store.patch(store, [{'op': 'remove', 'path': f'/types/{key}'}])
        logging.info(f"Removed {store}/{key} for '{name}'")
    except Exception:
        pass  # entry may not exist; that's fine
    if category == "indicator":
        _remove_indicator_instances(workspace_store, key)


def _remove_indicator_instances(workspace_store, pandas_ta_name: str) -> None:
    """Remove all instances of a custom indicator from the indicators workspace store."""
    existing = workspace_store.read('indicators')
    instances = (existing.get('data') or {}).get('indicators') or {}
    to_remove = [inst_id for inst_id, inst in instances.items()
                 if inst.get('pandas_ta_name') == pandas_ta_name]
    if not to_remove:
        return
    patches = [{'op': 'remove', 'path': f'/indicators/{inst_id}'} for inst_id in to_remove]
    try:
        workspace_store.patch('indicators', patches)
        logging.info(f"Removed {len(to_remove)} instance(s) of {pandas_ta_name} from indicators store")
    except Exception:
        logging.warning(f"Failed to remove indicator instances for {pandas_ta_name}", exc_info=True)


def _workspace_sync_content(workspace_store, category: str) -> "TextContent | None":
    """
    Return a TextContent item carrying the current {category}_types workspace state so the
    gateway can sync it to connected web clients without a separate WorkspacePatch call.
    The gateway detects items of the form {"_workspace_sync": {"store": ..., "data": ...}}.
    """
    store = _type_store_name(category)
    result = workspace_store.read(store)
    if not result.get('exists'):
        return None
    import json as _json
    payload = _json.dumps({"_workspace_sync": {"store": store, "data": result.get("data")}})
    return TextContent(type="text", text=payload)
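
# Illustrative sketch of the payload the gateway looks for (assuming an
# "indicator_types" store with a single entry). The returned item is an
# ordinary MCP TextContent whose text is this JSON string:
#
#   {"_workspace_sync": {"store": "indicator_types",
#                        "data": {"types": {"custom_my_rsi": {...}}}}}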


def _populate_types_from_disk(workspace_store, category_manager, category: str) -> None:
    """Scan existing category items and add any missing entries to the {category}_types store."""
    store = _type_store_name(category)
    existing = workspace_store.read(store)
    existing_types = (existing.get('data') or {}).get('types') or {}

    items = category_manager.list_items(category).get('items', [])
    added = 0
    for item in items:
        item_name = item.get('name', '')
        if not item_name:
            continue
        key = _type_store_key(category, item_name)
        if key not in existing_types:
            _upsert_type(workspace_store, category_manager, category, item_name)
            added += 1

    if added > 0:
        logging.info(f"Populated {added} {category} type(s) from disk into {store}")


def _get_env_yml() -> Optional[Path]:
    """Return the path to environment.yml if it exists alongside main.py."""
    p = Path(__file__).parent / "environment.yml"
    return p if p.exists() else None


def _coerce_json_arg(val, expected_type: str):
    """Coerce a possibly-stringified JSON argument to its expected Python type.

    Handles LLMs that serialize structured arguments as JSON strings.
    expected_type: 'object' → dict | 'array' → list
    Returns None if val is None or coercion is not possible.
    """
    if val is None:
        return None
    target = dict if expected_type == "object" else list
    if isinstance(val, target):
        return val
    if isinstance(val, str):
        try:
            parsed = json.loads(val)
            return parsed if isinstance(parsed, target) else None
        except (ValueError, TypeError):
            return None
    return None
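
# Example usage (illustrative): both forms below yield the same list, which is
# why the tool handlers route structured LLM-supplied arguments through
# _coerce_json_arg before using them.
#
#   _coerce_json_arg([{"op": "add", "path": "/x", "value": 1}], "array")
#   _coerce_json_arg('[{"op": "add", "path": "/x", "value": 1}]', "array")
#   # -> [{'op': 'add', 'path': '/x', 'value': 1}]
#
#   _coerce_json_arg('{"length": 14}', "object")   # -> {'length': 14}
#   _coerce_json_arg('not json', "object")         # -> None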


def _update_research_summary(data_dir: Path, script_name: str, description: str, text_output: str) -> None:
    """
    Upsert the research-summary.md entry for the given script name.
    Uses HTML comment anchors (<!-- BEGIN:name --> / <!-- END:name -->) to locate entries.
    New entries get a stub with a findings placeholder; existing entries only have their
    Last Run (and optionally Description) updated — agent-written findings are preserved.
    """
    import re
    from datetime import datetime, timezone
    summary_path = data_dir / "research-summary.md"
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    begin_marker = f"<!-- BEGIN:{script_name} -->"
    end_marker = f"<!-- END:{script_name} -->"

    stub_parts = [begin_marker, f"## {script_name}"]
    if description:
        stub_parts.append(f"**Description:** {description}")
    stub_parts.append(f"**Last Run:** {timestamp}")
    stub_parts.append("")
    stub_parts.append("**Findings:** *(awaiting agent summary)*")
    stub_parts.append(end_marker)
    stub_entry = "\n".join(stub_parts)

    if not summary_path.exists():
        summary_path.write_text(f"# Research Summary\n\n{stub_entry}\n", encoding="utf-8")
        return

    content = summary_path.read_text(encoding="utf-8")
    begin_idx = content.find(begin_marker)
    end_idx = content.find(end_marker)

    if begin_idx != -1 and end_idx != -1:
        # Entry exists — update only Last Run (and Description if provided), preserve findings
        existing = content[begin_idx : end_idx + len(end_marker)]
        updated = re.sub(r'\*\*Last Run:\*\* [^\n]*', f'**Last Run:** {timestamp}', existing)
        if description:
            if '**Description:**' in updated:
                updated = re.sub(r'\*\*Description:\*\* [^\n]*', f'**Description:** {description}', updated)
            else:
                updated = re.sub(r'(## [^\n]*\n)', f'\\1**Description:** {description}\n', updated, count=1)
        new_content = content[:begin_idx] + updated + content[end_idx + len(end_marker):]
        summary_path.write_text(new_content, encoding="utf-8")
    else:
        summary_path.write_text(content.rstrip() + "\n\n---\n\n" + stub_entry + "\n", encoding="utf-8")
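
# Illustrative sketch of one research-summary.md entry as first written by this
# function (the script name and timestamp are examples):
#
#   <!-- BEGIN:btc_volatility_scan -->
#   ## btc_volatility_scan
#   **Description:** Scans BTC realized volatility regimes
#   **Last Run:** 2024-01-01T00:00:00Z
#
#   **Findings:** *(awaiting agent summary)*
#   <!-- END:btc_volatility_scan -->
#
# On later runs only the Last Run line (and the Description line, if a new
# description is supplied) is rewritten; whatever the agent has put in place of
# the Findings placeholder is left untouched.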


# =============================================================================
# Configuration
# =============================================================================

class Config:
    """Application configuration loaded from config.yaml and secrets.yaml"""

    def __init__(self):
        # User ID (required)
        self.user_id: str = os.getenv("USER_ID", "")
        if not self.user_id:
            raise ValueError("USER_ID environment variable required")

        # Config and secrets paths (k8s mounted)
        self.config_path = Path(os.getenv("CONFIG_PATH", "/app/config/config.yaml"))
        self.secrets_path = Path(os.getenv("SECRETS_PATH", "/app/config/secrets.yaml"))

        # ZMQ ports for event system
        self.zmq_xpub_port: int = int(os.getenv("ZMQ_XPUB_PORT", "5570"))
        self.zmq_gateway_endpoint: str = os.getenv(
            "ZMQ_GATEWAY_ENDPOINT",
            "tcp://gateway:5571"
        )

        # MCP server settings
        self.mcp_server_name: str = os.getenv("MCP_SERVER_NAME", "dexorder-user")
        self.mcp_transport: str = os.getenv("MCP_TRANSPORT", "sse")  # "stdio" or "sse"
        self.mcp_http_port: int = int(os.getenv("MCP_HTTP_PORT", "3000"))
        self.mcp_http_host: str = os.getenv("MCP_HTTP_HOST", "0.0.0.0")

        # Lifecycle settings
        self.idle_timeout_minutes: int = int(os.getenv("IDLE_TIMEOUT_MINUTES", "15"))
        self.enable_idle_shutdown: bool = os.getenv("ENABLE_IDLE_SHUTDOWN", "true").lower() == "true"

        # Loaded from files
        self.config_data: dict = {}
        self.secrets_data: dict = {}

        # Data directory (set after config load)
        self.data_dir: Path = DEFAULT_DATA_DIR

    def load(self) -> None:
        """Load configuration and secrets from YAML files"""
        global DATA_DIR

        # Load config.yaml if exists
        if self.config_path.exists():
            with open(self.config_path) as f:
                self.config_data = yaml.safe_load(f) or {}
            logging.info(f"Loaded config from {self.config_path}")
        else:
            logging.warning(f"Config file not found: {self.config_path}")

        # Load secrets.yaml if exists
        if self.secrets_path.exists():
            with open(self.secrets_path) as f:
                self.secrets_data = yaml.safe_load(f) or {}
            logging.info(f"Loaded secrets from {self.secrets_path}")
        else:
            logging.warning(f"Secrets file not found: {self.secrets_path}")

        # Set data directory from config or environment
        # Priority: env var > config file > default
        data_dir_str = os.getenv("DATA_DIR") or self.config_data.get("data_dir")
        if data_dir_str:
            self.data_dir = Path(data_dir_str)
        else:
            self.data_dir = DEFAULT_DATA_DIR

        # Update global DATA_DIR
        DATA_DIR = self.data_dir

        # Ensure data directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Data directory: {self.data_dir}")

    @property
    def workspace_dir(self) -> Path:
        """Workspace directory under DATA_DIR."""
        return self.data_dir / "workspace"
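
# Illustrative sketch of a config.yaml this class can consume (data_dir is read
# here; the data.iceberg / data.relay keys are consumed in UserContainer.start()):
#
#   data_dir: /data/user
#   data:
#     iceberg:
#       catalog_uri: http://iceberg-catalog:8181
#       namespace: trading
#     relay:
#       endpoint: tcp://relay:5559
#       notification_endpoint: tcp://relay:5558
#
# secrets.yaml may carry s3_endpoint / s3_access_key / s3_secret_key / s3_region
# as fallbacks for the iceberg S3 settings.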


# =============================================================================
# MCP Server Setup
# =============================================================================

def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server:
    """Create MCP server with resources and workspace tools"""

    server = Server(config.mcp_server_name)

    # Initialize workspace store
    workspace_store = get_workspace_store(config.workspace_dir)
    logging.info(f"Workspace store initialized at {config.workspace_dir}")

    # Initialize category file manager
    category_manager = get_category_manager(config.data_dir)
    logging.info(f"Category manager initialized at {config.data_dir}")

    # Populate {category}_types stores from existing items on disk (migration/startup sync)
    for _cat in ("indicator", "strategy", "research"):
        _populate_types_from_disk(workspace_store, category_manager, _cat)

    @server.list_resources()
    async def list_resources():
        """List available resources"""
        resources = [
            {
                "uri": f"dexorder://user/{config.user_id}/hello",
                "name": "Hello World",
                "description": "A simple hello world resource",
                "mimeType": "text/plain",
            }
        ]
        if _get_env_yml() is not None:
            resources.append({
                "uri": f"dexorder://user/{config.user_id}/environment.yml",
                "name": "Conda Environment",
                "description": "Base conda environment packages available in all scripts",
                "mimeType": "text/yaml",
            })
        return resources

    @server.read_resource()
    async def read_resource(uri: str):
        """Read a resource by URI"""
        if uri == f"dexorder://user/{config.user_id}/hello":
            # Publish an event when resource is accessed
            await event_publisher.publish(UserEvent(
                event_type=EventType.STRATEGY_LOG,
                payload={
                    "message": "Hello world resource accessed",
                    "uri": uri,
                },
                delivery=DeliverySpec.informational(),
            ))

            return {
                "uri": uri,
                "mimeType": "text/plain",
                "text": f"Hello from Dexorder user container!\nUser ID: {config.user_id}\n",
            }
        elif uri == f"dexorder://user/{config.user_id}/environment.yml":
            env_yml = _get_env_yml()
            if env_yml is None:
                raise ValueError("environment.yml not found")
            return {
                "uri": uri,
                "mimeType": "text/yaml",
                "text": env_yml.read_text(encoding="utf-8"),
            }
        else:
            raise ValueError(f"Unknown resource: {uri}")

    @server.list_tools()
    async def list_tools():
        """List available tools including workspace and category tools"""
        return [
            Tool(
                name="WorkspaceRead",
                description="Read a workspace store from persistent storage",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store (e.g., 'chartStore', 'userPreferences')"
                        }
                    },
                    "required": ["store_name"]
                }
            ),
            Tool(
                name="WorkspaceWrite",
                description="Write a workspace store to persistent storage",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store"
                        },
                        "data": {
                            "description": "Data to write"
                        }
                    },
                    "required": ["store_name", "data"]
                }
            ),
            Tool(
                name="WorkspacePatch",
                description="Apply JSON patch operations to a workspace store",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store"
                        },
                        "patch": {
                            "description": "JSON Patch operations (RFC 6902)",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "op": {"type": "string", "enum": ["add", "remove", "replace", "move", "copy", "test"]},
                                    "path": {"type": "string"},
                                    "value": {}
                                },
                                "required": ["op", "path"]
                            }
                        }
                    },
                    "required": ["store_name", "patch"]
                }
            ),
            Tool(
                name="PreferencesRead",
                description="Read the user preferences markdown file. Returns the full content of preferences.md from the user's sandbox data directory.",
                inputSchema={
                    "type": "object",
                    "properties": {}
                }
            ),
            Tool(
                name="PreferencesWrite",
                description="Write (fully replace) the user preferences markdown file. Use this to create or overwrite preferences.md with new content.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "content": {
                            "type": "string",
                            "description": "Full markdown content for the preferences file"
                        }
                    },
                    "required": ["content"]
                }
            ),
            Tool(
                name="PreferencesPatch",
                description="Surgically update a section of the user preferences markdown file by finding and replacing text. Fails if old_str is not found.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "old_str": {
                            "type": "string",
                            "description": "Exact text to find in the preferences file"
                        },
                        "new_str": {
                            "type": "string",
                            "description": "Replacement text"
                        }
                    },
                    "required": ["old_str", "new_str"]
                }
            ),
            Tool(
                name="ResearchSummaryRead",
                description="Read the research summary markdown file. Returns the full content of research-summary.md from the user's sandbox data directory.",
                inputSchema={
                    "type": "object",
                    "properties": {}
                }
            ),
            Tool(
                name="ResearchSummaryWrite",
                description="Write (fully replace) the research summary markdown file.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "content": {
                            "type": "string",
                            "description": "Full markdown content for the research summary file"
                        }
                    },
                    "required": ["content"]
                }
            ),
            Tool(
                name="ResearchSummaryPatch",
                description="Surgically update a section of the research summary markdown file by finding and replacing text. Fails if old_str is not found.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "old_str": {
                            "type": "string",
                            "description": "Exact text to find in the research summary file"
                        },
                        "new_str": {
                            "type": "string",
                            "description": "Replacement text"
                        }
                    },
                    "required": ["old_str", "new_str"]
                }
            ),
            Tool(
                name="PythonWrite",
                description="Write a new strategy, indicator, or research script with validation",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name (can contain special characters)"
                        },
                        "description": {
                            "type": "string",
                            "description": "LLM-generated description of what this does (required)"
                        },
                        "details": {
                            "type": "string",
                            "description": (
                                "Full markdown description of the code with sufficient detail that another coding agent "
                                "could functionally reproduce the implementation from this field alone. "
                                "Include: purpose, algorithm, all parameters and their semantics, data feed usage, "
                                "formulas, edge cases, and any non-obvious implementation choices (required). "
                                "Stored as a separate details.md file alongside the implementation."
                            )
                        },
                        "code": {
                            "type": "string",
                            "description": "Python implementation code"
                        },
                        "metadata": {
                            "description": (
                                "Optional category-specific metadata. "
                                "For strategy: include 'data_feeds' (list of {symbol, period_seconds, description}) "
                                "and 'parameters' (object mapping param_name → {default, description}). "
                                "Example: {\"data_feeds\": [{\"symbol\": \"BTC/USDT.BINANCE\", \"period_seconds\": 3600, \"description\": \"Primary BTC/USDT hourly feed\"}], "
                                "\"parameters\": {\"rsi_length\": {\"default\": 14, \"description\": \"RSI lookback period\"}, \"threshold\": {\"default\": 70, \"description\": \"Overbought level\"}}}. "
                                "For indicator: include 'default_length' (int). "
                                "For any category: 'conda_packages' (list of package names) if extra dependencies are needed."
                            )
                        }
                    },
                    "required": ["category", "name", "description", "details", "code"]
                }
            ),
            Tool(
                name="PythonEdit",
                description=(
                    "Edit an existing category script. "
                    "Use 'patches' for targeted string replacements (preferred for small changes), "
                    "or 'code' to replace the full implementation. Do not supply both."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the existing item"
                        },
                        "code": {
                            "type": "string",
                            "description": "Full replacement Python code. Use only when rewriting the entire implementation; prefer 'patches' for targeted edits."
                        },
                        "patches": {
                            "description": (
                                "Targeted code edits as old/new string pairs. Preferred over 'code' for small changes. "
                                "Each patch: {\"old_string\": \"exact text to find\", \"new_string\": \"replacement text\"}. "
                                "old_string must be unique in the file (add surrounding context if needed). "
                                "Patches are applied in order."
                            ),
                            "items": {
                                "type": "object",
                                "properties": {
                                    "old_string": {"type": "string"},
                                    "new_string": {"type": "string"}
                                },
                                "required": ["old_string", "new_string"]
                            }
                        },
                        "description": {
                            "type": "string",
                            "description": "Updated description (optional, omit to keep existing)"
                        },
                        "details": {
                            "type": "string",
                            "description": "Full replacement for the details field. Use only when rewriting the entire description; prefer 'detail_patches' for targeted edits."
                        },
                        "detail_patches": {
                            "description": (
                                "Targeted edits to the details field as old/new string pairs. Preferred over 'details' for small changes. "
                                "Each patch: {\"old_string\": \"exact text to find\", \"new_string\": \"replacement text\"}. "
                                "old_string must be unique in the details field (add surrounding context if needed). "
                                "Patches are applied in order. Mutually exclusive with 'details'."
                            ),
                            "items": {
                                "type": "object",
                                "properties": {
                                    "old_string": {"type": "string"},
                                    "new_string": {"type": "string"}
                                },
                                "required": ["old_string", "new_string"]
                            }
                        },
                        "metadata": {
                            "description": (
                                "Updated metadata fields (optional). "
                                "For strategy: 'data_feeds' (list of {symbol, period_seconds, description}) "
                                "and/or 'parameters' (object mapping param_name → {default, description}). "
                                "For indicator: 'default_length' (int). "
                                "For any category: 'conda_packages' (list of package names)."
                            )
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="PythonRead",
                description="Read a category script, its metadata, and details. Returns: code, details (markdown), and metadata.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item"
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="PythonReadOutput",
                description=(
                    "Read persisted output files from a previous research script execution. "
                    "Returns TextContent for .md/.txt files and ImageContent for images. "
                    "Output is saved automatically when ExecuteResearch or PythonWrite/PythonEdit runs a research script."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item"
                        },
                        "files": {
                            "description": (
                                "Specific filenames under output/ to return (e.g. [\"analysis.md\", \"img1.png\"]). "
                                "If omitted, returns all output files listed in metadata."
                            )
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="PythonList",
                description="List all items in a category with names and descriptions",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category to list"
                        }
                    },
                    "required": ["category"]
                }
            ),
            Tool(
                name="PythonLog",
                description="Show git commit history for category items. Filter by category and/or name to see history for a specific item.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Filter to this category (optional)"
                        },
                        "name": {
                            "type": "string",
                            "description": "Filter to this item (optional, requires category)"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Max commits to return (default 20)",
                            "default": 20
                        }
                    },
                    "required": []
                }
            ),
            Tool(
                name="PythonRevert",
                description="Restore a category item to a previous git revision. Creates a new commit — non-destructive.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "revision": {
                            "type": "string",
                            "description": "Git commit hash (full or short) to restore to"
                        },
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the item"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item to restore"
                        }
                    },
                    "required": ["revision", "category", "name"]
                }
            ),
            Tool(
                name="PythonDelete",
                description="Delete a category script permanently. Commits removal to git history and removes any conda packages that are no longer needed.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item to delete"
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="CondaSync",
                description="Sync conda packages: scan all metadata, remove unused packages (excluding base environment)",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            ),
            Tool(
                name="CondaInstall",
                description="Install conda packages on-demand",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "packages": {
                            "description": "List of conda package names to install"
                        }
                    },
                    "required": ["packages"]
                }
            ),
            Tool(
                name="ExecuteResearch",
                description="Execute a research script and return results with matplotlib images",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string",
                            "description": "Display name of the research script to execute"
                        }
                    },
                    "required": ["name"]
                }
            ),
            Tool(
                name="EvaluateIndicator",
                description=(
                    "Evaluate a pandas-ta indicator against real OHLC data and return a structured "
                    "array of timestamped values. Use this to validate that an indicator computes "
                    "correctly before adding it to the workspace, or to inspect its output values."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Market symbol in 'MARKET.EXCHANGE' format, e.g. 'BTC/USDT.BINANCE'"
                        },
                        "from_time": {
                            "description": "Start of time range. Unix timestamp (int) or date string e.g. '30 days ago', '2024-01-01'"
                        },
                        "to_time": {
                            "description": "End of time range. Unix timestamp (int) or date string e.g. 'now', '2024-03-01'"
                        },
                        "period_seconds": {
                            "type": "integer",
                            "description": "Candle period in seconds (e.g. 3600 for 1h, 900 for 15m, 86400 for 1d)",
                            "default": 3600
                        },
                        "pandas_ta_name": {
                            "type": "string",
                            "description": "Lowercase pandas-ta function name, e.g. 'rsi', 'macd', 'bbands'"
                        },
                        "parameters": {
                            "description": "pandas-ta keyword arguments, e.g. {\"length\": 14} or {\"fast\": 12, \"slow\": 26, \"signal\": 9}",
                            "default": {}
                        }
                    },
                    "required": ["symbol", "from_time", "to_time", "pandas_ta_name"]
                }
            ),
            Tool(
                name="BacktestStrategy",
                description=(
                    "Run a saved trading strategy against historical OHLC data using Nautilus Trader "
                    "BacktestEngine. Returns performance metrics (total return, Sharpe ratio, "
                    "max drawdown, win rate, trade count) and a full equity curve. "
                    "Supports multiple data feeds and includes order-flow fields (buy_vol, sell_vol, "
                    "open_interest) in the strategy's DataFrame."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy as saved via PythonWrite"
                        },
                        "feeds": {
                            "description": "Data feeds to backtest against",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "symbol": {
                                        "type": "string",
                                        "description": "Market symbol in 'MARKET.EXCHANGE' format, e.g. 'BTC/USDT.BINANCE'"
                                    },
                                    "period_seconds": {
                                        "type": "integer",
                                        "description": "Candle period in seconds (e.g. 3600 for 1h)",
                                        "default": 3600
                                    }
                                },
                                "required": ["symbol"]
                            },
                            "minItems": 1
                        },
                        "from_time": {
                            "description": "Backtest start. Unix timestamp or date string e.g. '2024-01-01', '90 days ago'"
                        },
                        "to_time": {
                            "description": "Backtest end. Unix timestamp or date string e.g. '2025-01-01', 'now'"
                        },
                        "initial_capital": {
                            "type": "number",
                            "description": "Starting capital in quote currency (e.g. 10000.0 USDT)",
                            "default": 10000.0
                        },
                        "paper": {
                            "type": "boolean",
                            "description": "Always true for historical backtest (reserved for forward testing)",
                            "default": True
                        }
                    },
                    "required": ["strategy_name", "feeds", "from_time", "to_time"]
                }
            ),
            Tool(
                name="ActivateStrategy",
                description=(
                    "Activate a strategy for paper or live forward trading with a capital allocation. "
                    "paper=true (default): simulated fills on live data — no API keys required. "
                    "paper=false: real execution via user secrets vault (not yet implemented). "
                    "Note: live data streaming is TBD; this registers the strategy for when it becomes available."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy as saved via PythonWrite"
                        },
                        "feeds": {
                            "description": "Data feeds for the strategy",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "symbol": {"type": "string"},
                                    "period_seconds": {"type": "integer", "default": 3600}
                                },
                                "required": ["symbol"]
                            },
                            "minItems": 1
                        },
                        "allocation": {
                            "type": "number",
                            "description": "Capital allocated in quote currency (e.g. 5000.0 USDT)"
                        },
                        "paper": {
                            "type": "boolean",
                            "description": "True = paper/simulated (default); False = live execution",
                            "default": True
                        }
                    },
                    "required": ["strategy_name", "feeds", "allocation"]
                }
            ),
            Tool(
                name="DeactivateStrategy",
                description="Stop an active strategy and return its final P&L summary.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the active strategy to stop"
                        }
                    },
                    "required": ["strategy_name"]
                }
            ),
            Tool(
                name="ListActiveStrategies",
                description="List all currently active (live or paper) strategies and their status.",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            ),
            Tool(
                name="GetBacktestResults",
                description=(
                    "Retrieve stored backtest results for a strategy. "
                    "Returns the most recent backtest runs with summary stats, "
                    "extended statistics, trade list, and equity curve."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of backtest runs to return (default 5)",
                            "default": 5
                        }
                    },
                    "required": ["strategy_name"]
                }
            ),
            Tool(
                name="GetStrategyTrades",
                description=(
                    "Retrieve the trade log for a strategy (live/paper or backtest). "
                    "Returns individual round-trip trades with entry/exit prices and PnL."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of trades to return (default 100)",
                            "default": 100
                        }
                    },
                    "required": ["strategy_name"]
                }
            ),
            Tool(
                name="GetStrategyEvents",
                description=(
                    "Retrieve the event log for a strategy "
                    "(PnL updates, fills, errors, status changes)."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy"
                        },
                        "event_type": {
                            "type": "string",
                            "description": "Filter by event type (optional): PNL_UPDATE, ORDER_FILLED, ERROR, etc."
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Maximum number of events to return (default 50)",
                            "default": 50
                        }
                    },
                    "required": ["strategy_name"]
                }
            ),
        ]

    @server.call_tool()
    async def handle_tool_call(name: str, arguments: dict):
        """Handle tool calls including workspace and category tools"""
        get_lifecycle_manager().record_activity()
        try:
            return await _handle_tool_call_inner(name, arguments)
        except Exception:
            logging.exception("Unhandled exception in tool '%s'", name)
            raise

    async def _handle_tool_call_inner(name: str, arguments: dict):
        if name == "WorkspaceRead":
            return workspace_store.read(arguments.get("store_name", ""))
        elif name == "WorkspaceWrite":
            return workspace_store.write(
                arguments.get("store_name", ""),
                arguments.get("data")
            )
        elif name == "WorkspacePatch":
            return workspace_store.patch(
                arguments.get("store_name", ""),
                _coerce_json_arg(arguments.get("patch"), "array") or []
            )
        elif name == "PreferencesRead":
            prefs_path = DATA_DIR / "preferences.md"
            if not prefs_path.exists():
                return {"content": "", "exists": False}
            content = prefs_path.read_text(encoding="utf-8")
            return {"content": content, "exists": True}
        elif name == "PreferencesWrite":
            prefs_path = DATA_DIR / "preferences.md"
            prefs_path.write_text(arguments.get("content", ""), encoding="utf-8")
            return {"success": True}
        elif name == "PreferencesPatch":
            prefs_path = DATA_DIR / "preferences.md"
            old_str = arguments.get("old_str", "")
            new_str = arguments.get("new_str", "")
            content = prefs_path.read_text(encoding="utf-8") if prefs_path.exists() else ""
            if old_str not in content:
                return {"success": False, "error": "old_str not found in preferences file"}
            prefs_path.write_text(content.replace(old_str, new_str, 1), encoding="utf-8")
            return {"success": True}
        elif name == "ResearchSummaryRead":
            summary_path = DATA_DIR / "research-summary.md"
            if not summary_path.exists():
                return {"content": "", "exists": False}
            content = summary_path.read_text(encoding="utf-8")
            return {"content": content, "exists": True}
        elif name == "ResearchSummaryWrite":
            summary_path = DATA_DIR / "research-summary.md"
            summary_path.write_text(arguments.get("content", ""), encoding="utf-8")
            return {"success": True}
        elif name == "ResearchSummaryPatch":
            summary_path = DATA_DIR / "research-summary.md"
            old_str = arguments.get("old_str", "")
            new_str = arguments.get("new_str", "")
            content = summary_path.read_text(encoding="utf-8") if summary_path.exists() else ""
            if old_str not in content:
                return {"success": False, "error": "old_str not found in research summary file"}
            summary_path.write_text(content.replace(old_str, new_str, 1), encoding="utf-8")
            return {"success": True}
        elif name == "PythonWrite":
            result = await category_manager.write(
                category=arguments.get("category", ""),
                name=arguments.get("name", ""),
                description=arguments.get("description", ""),
                details=arguments.get("details", ""),
                code=arguments.get("code", ""),
                metadata=_coerce_json_arg(arguments.get("metadata"), "object")
            )
            content = []
            meta_parts = [f"success: {result['success']}"]
            if result.get('path'):
                meta_parts.append(f"path: {result['path']}")
            if result.get('error'):
                meta_parts.append(f"error: {result['error']}")
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("validation"):
                val = result["validation"]
                if not val.get("success"):
                    error_detail = val.get('error') or ''
                    if val.get('output'):
                        error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output']
                    meta_parts.append(f"validation error: {error_detail.strip()}")
                elif val.get("output"):
                    # Always show output — may contain ⚠ WARNING for all-NaN / all-zero results
                    meta_parts.append(f"validation output: {val['output']}")
            content.append(TextContent(type="text", text="\n".join(meta_parts)))
            if result.get("execution"):
                exec_result = result["execution"]
                exec_content = exec_result.get("content", [])
                if not exec_content and exec_result.get("output"):
                    # _execute_indicator returns plain {"output": str}, not MCP {"content": [...]}
                    exec_content = [TextContent(type="text", text=exec_result["output"])]
                content.extend(exec_content)
                image_count = sum(1 for item in exec_content if item.type == "image")
                logging.info(f"PythonWrite '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            else:
                logging.info(f"PythonWrite '{arguments.get('name')}': no execution result (category={arguments.get('category')})")
            if result.get("success"):
                _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", ""))
                await cleanup_extra_packages_async(get_data_dir(), _get_env_yml())
                sync = _workspace_sync_content(workspace_store, arguments.get("category", ""))
                if sync:
                    content.append(sync)
                if arguments.get("category") == "research" and result.get("execution"):
                    exec_text = "\n".join(
                        item.text for item in result["execution"].get("content", [])
                        if getattr(item, "type", "") == "text"
                    )
                    if exec_text.strip():
                        _update_research_summary(
                            DATA_DIR,
                            arguments.get("name", ""),
                            arguments.get("description", "") or "",
                            exec_text,
                        )
            return content
        elif name == "PythonEdit":
            result = await category_manager.edit(
                category=arguments.get("category", ""),
                name=arguments.get("name", ""),
                code=arguments.get("code"),
                patches=_coerce_json_arg(arguments.get("patches"), "array"),
                description=arguments.get("description"),
                details=arguments.get("details"),
                detail_patches=_coerce_json_arg(arguments.get("detail_patches"), "array"),
                metadata=_coerce_json_arg(arguments.get("metadata"), "object")
            )
            content = []
            meta_parts = [f"success: {result['success']}"]
            if result.get('path'):
                meta_parts.append(f"path: {result['path']}")
            if result.get('error'):
                meta_parts.append(f"error: {result['error']}")
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("validation"):
                val = result["validation"]
                if not val.get("success"):
                    error_detail = val.get('error') or ''
                    if val.get('output'):
                        error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output']
                    meta_parts.append(f"validation error: {error_detail.strip()}")
                elif val.get("output"):
                    # Always show output — may contain ⚠ WARNING for all-NaN / all-zero results
                    meta_parts.append(f"validation output: {val['output']}")
            content.append(TextContent(type="text", text="\n".join(meta_parts)))
            if result.get("execution"):
                exec_result = result["execution"]
                exec_content = exec_result.get("content", [])
                if not exec_content and exec_result.get("output"):
                    # _execute_indicator returns plain {"output": str}, not MCP {"content": [...]}
                    exec_content = [TextContent(type="text", text=exec_result["output"])]
                content.extend(exec_content)
                image_count = sum(1 for item in exec_content if item.type == "image")
                logging.info(f"PythonEdit '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            else:
                logging.info(f"PythonEdit '{arguments.get('name')}': no execution result")
            if result.get("success"):
                _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", ""))
                await cleanup_extra_packages_async(get_data_dir(), _get_env_yml())
                sync = _workspace_sync_content(workspace_store, arguments.get("category", ""))
                if sync:
                    content.append(sync)
                if arguments.get("category") == "research" and result.get("execution"):
                    exec_text = "\n".join(
                        item.text for item in result["execution"].get("content", [])
                        if getattr(item, "type", "") == "text"
                    )
                    if exec_text.strip():
                        _update_research_summary(
                            DATA_DIR,
                            arguments.get("name", ""),
                            arguments.get("description", "") or "",
                            exec_text,
                        )
            return content
        elif name == "PythonRead":
            return category_manager.read(
                category=arguments.get("category", ""),
                name=arguments.get("name", "")
            )
        elif name == "PythonReadOutput":
            result = category_manager.read_output(
                category=arguments.get("category", ""),
                name=arguments.get("name", ""),
                files=_coerce_json_arg(arguments.get("files"), "array"),
            )
            if "error" in result:
                return [TextContent(type="text", text=f"Error: {result['error']}")]
            content = result.get("content", [])
            summary = TextContent(
                type="text",
                text=f"output_dir: {result.get('output_dir', '')}\nfiles_returned: {result.get('files_returned', [])}"
            )
            return [summary] + content
        elif name == "PythonList":
            return category_manager.list_items(
                category=arguments.get("category", "")
            )
        elif name == "PythonLog":
            result = await category_manager.git_log(
                category=arguments.get("category"),
                name=arguments.get("name"),
                limit=int(arguments.get("limit", 20))
            )
            lines = [f"success: {result['success']}"]
            for c in result.get("commits", []):
                lines.append(f"{c['short_hash']} {c['date'][:10]} {c['message']}")
            return [TextContent(type="text", text="\n".join(lines))]
        elif name == "PythonRevert":
            result = await category_manager.git_revert(
                revision=arguments.get("revision", ""),
                category=arguments.get("category", ""),
                name=arguments.get("name", "")
            )
            meta_parts = [f"success: {result['success']}"]
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("error"):
                meta_parts.append(f"error: {result['error']}")
            if result.get("validation") and not result["validation"].get("success"):
                val = result["validation"]
                error_detail = val.get('error') or ''
                if val.get('output'):
                    error_detail = f"{error_detail}\n{val['output']}" if error_detail else val['output']
                meta_parts.append(f"validation error: {error_detail.strip()}")
            if result.get("success"):
                _upsert_type(workspace_store, category_manager, arguments.get("category", ""), arguments.get("name", ""))
                sync = _workspace_sync_content(workspace_store, arguments.get("category", ""))
                content_out = [TextContent(type="text", text="\n".join(meta_parts))]
                if sync:
                    content_out.append(sync)
                return content_out
            return [TextContent(type="text", text="\n".join(meta_parts))]
        elif name == "PythonDelete":
            result = await category_manager.delete(
                category=arguments.get("category", ""),
                name=arguments.get("name", "")
            )
            if result.get("success"):
                _remove_type(workspace_store, arguments.get("category", ""), arguments.get("name", ""))
                cleanup_result = await cleanup_extra_packages_async(get_data_dir(), _get_env_yml())
                if cleanup_result.get("removed"):
                    result["packages_removed"] = cleanup_result["removed"]
            parts = [f"success: {result['success']}"]
            for k in ("category", "name", "revision", "packages_removed", "error"):
                if result.get(k):
                    parts.append(f"{k}: {result[k]}")
            content_out = [TextContent(type="text", text="\n".join(parts))]
            if result.get("success"):
                sync = _workspace_sync_content(workspace_store, arguments.get("category", ""))
                if sync:
                    content_out.append(sync)
            return content_out
        elif name == "CondaSync":
            return await sync_packages_async(
                data_dir=get_data_dir(),
                environment_yml=_get_env_yml()
            )
        elif name == "CondaInstall":
            return await install_packages_async(_coerce_json_arg(arguments.get("packages"), "array") or [])
        elif name == "ExecuteResearch":
            result = await category_manager.execute_research(name=arguments.get("name", ""))
            if "error" in result:
                logging.error(f"ExecuteResearch '{arguments.get('name')}': {result['error']}")
                return [TextContent(type="text", text=f"Error: {result['error']}")]
            content = result.get("content", [TextContent(type="text", text="No output")])
            image_count = sum(1 for item in content if item.type == "image")
            logging.info(f"ExecuteResearch '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            exec_text = "\n".join(
                item.text for item in content
                if getattr(item, "type", "") == "text"
            )
            if exec_text.strip():
                _update_research_summary(DATA_DIR, arguments.get("name", ""), "", exec_text)
            return content
        elif name == "EvaluateIndicator":
            return await evaluate_indicator(
                symbol=arguments.get("symbol", ""),
                from_time=arguments.get("from_time"),
                to_time=arguments.get("to_time"),
                period_seconds=int(arguments.get("period_seconds", 3600)),
                pandas_ta_name=arguments.get("pandas_ta_name", ""),
                parameters=_coerce_json_arg(arguments.get("parameters"), "object") or {},
            )
        elif name == "BacktestStrategy":
            result = await backtest_strategy(
                strategy_name=arguments.get("strategy_name", ""),
                feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [],
                from_time=arguments.get("from_time"),
                to_time=arguments.get("to_time"),
                initial_capital=float(arguments.get("initial_capital", 10_000.0)),
                paper=bool(arguments.get("paper", True)),
            )
            # Persist backtest to DB (non-fatal)
            try:
                payload = json.loads(result[0].text) if result and isinstance(result[0], TextContent) else {}
                if payload and "summary" in payload:
                    from dexorder.strategy.db import get_strategy_db
                    db = get_strategy_db(get_data_dir())
                    await db.insert_backtest(
                        strategy_name=arguments.get("strategy_name", ""),
                        from_time=arguments.get("from_time"),
                        to_time=arguments.get("to_time"),
                        initial_capital=float(arguments.get("initial_capital", 10_000.0)),
                        feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [],
                        summary=payload.get("summary", {}),
                        statistics=payload.get("statistics", {}),
                        trades=payload.get("trades", []),
                        equity_curve=payload.get("equity_curve", []),
                    )
            except Exception as _e:
                logging.debug("Failed to persist backtest results: %s", _e)
            return result
        elif name == "ActivateStrategy":
            return await activate_strategy(
                strategy_name=arguments.get("strategy_name", ""),
                feeds=_coerce_json_arg(arguments.get("feeds"), "array") or [],
                allocation=float(arguments.get("allocation", 0.0)),
                paper=bool(arguments.get("paper", True)),
            )
        elif name == "DeactivateStrategy":
            return await deactivate_strategy(
                strategy_name=arguments.get("strategy_name", ""),
            )
        elif name == "ListActiveStrategies":
            return await list_active_strategies()
        elif name == "GetBacktestResults":
            from dexorder.strategy.db import get_strategy_db
            db = get_strategy_db(get_data_dir())
            results = await db.get_backtests(
                strategy_name=arguments.get("strategy_name", ""),
                limit=int(arguments.get("limit", 5)),
            )
            return [TextContent(type="text", text=json.dumps({"backtest_runs": results}))]
        elif name == "GetStrategyTrades":
            from dexorder.strategy.db import get_strategy_db
            db = get_strategy_db(get_data_dir())
            trades = await db.get_trades(
                strategy_name=arguments.get("strategy_name", ""),
                limit=int(arguments.get("limit", 100)),
            )
            return [TextContent(type="text", text=json.dumps({"trades": trades}))]
        elif name == "GetStrategyEvents":
            from dexorder.strategy.db import get_strategy_db
            db = get_strategy_db(get_data_dir())
            events = await db.get_events(
                strategy_name=arguments.get("strategy_name", ""),
                event_type=arguments.get("event_type"),
                limit=int(arguments.get("limit", 50)),
            )
            return [TextContent(type="text", text=json.dumps({"events": events}))]
        else:
            raise ValueError(f"Unknown tool: {name}")

    logging.info(f"MCP server '{config.mcp_server_name}' created with workspace and category tools")
    return server
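
# Illustrative sketch (not exercised here) of how a gateway-side MCP client
# might invoke one of the tools above; the session wiring is assumed to follow
# the standard mcp client API.
#
#   result = await session.call_tool("WorkspacePatch", {
#       "store_name": "indicator_types",
#       "patch": [
#           {"op": "add", "path": "/types/custom_my_rsi", "value": {...}},
#       ],
#   })
#
# A string-encoded JSON value for "patch" is also accepted, because the handler
# routes structured arguments through _coerce_json_arg.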


# =============================================================================
# Streamable HTTP Transport Setup
# =============================================================================

def create_streamable_http_app(mcp_server: Server) -> Starlette:
    """Create Starlette app with Streamable HTTP endpoint for MCP"""

    session_manager = StreamableHTTPSessionManager(app=mcp_server)

    @contextlib.asynccontextmanager
    async def lifespan(app: Starlette):
        from dexorder.event_loop import install_thread_safe_asyncio_run
        install_thread_safe_asyncio_run(asyncio.get_running_loop())
        async with session_manager.run():
            yield

    async def handle_health(request: Request) -> Response:
        """Health check endpoint for k8s probes and gateway readiness checks"""
        return Response(
            content='{"status":"ok"}',
            media_type="application/json"
        )

    app = Starlette(
        lifespan=lifespan,
        routes=[
            Mount("/mcp", app=session_manager.handle_request),
            Route("/health", handle_health),
        ]
    )

    return app
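
# Illustrative sketch of the probe this layout supports: MCP traffic is served
# under /mcp, and a plain GET /health returns {"status":"ok"}. A k8s readiness
# probe could look like the following (port 3000 matches the MCP_HTTP_PORT
# default above; adjust if overridden):
#
#   readinessProbe:
#     httpGet:
#       path: /health
#       port: 3000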


# =============================================================================
# Main Application
# =============================================================================

class UserContainer:
    """Main user container application"""

    def __init__(self):
        self.config = Config()
        self.event_publisher: Optional[EventPublisher] = None
        self.mcp_server: Optional[Server] = None
        self.data_api: Optional[DataAPIImpl] = None
        self.event_bridge: Optional[StrategyEventBridge] = None
        self.running = False
        self._uvicorn_server: Optional["uvicorn.Server"] = None

    async def start(self) -> None:
        """Start all subsystems"""
        logging.info("Starting user container")

        # Load configuration
        self.config.load()

        # Python-level memory guard (RLIMIT_AS soft limit) — DISABLED.
        # We assume nodes have ample memory (8Gi limits) and will revisit a
        # proper RSS-based cgroup monitor later. The implementation is in
        # dexorder/memory_guard.py if we want to re-enable.
        # from dexorder.memory_guard import setup_memory_limit
        # mem_cfg = self.config.config_data.get("memory", {})
        # setup_memory_limit(fraction=float(mem_cfg.get("limit_fraction", 0.85)))

        # Initialize data and charting API
        data_cfg = self.config.config_data.get("data", {})
        iceberg_cfg = data_cfg.get("iceberg", {})
        relay_cfg = data_cfg.get("relay", {})
        secrets = self.config.secrets_data
        s3_cfg = iceberg_cfg  # S3 settings co-located with iceberg config

        self.data_api = DataAPIImpl(
            iceberg_catalog_uri=iceberg_cfg.get("catalog_uri", "http://iceberg-catalog:8181"),
            relay_endpoint=relay_cfg.get("endpoint", "tcp://relay:5559"),
            notification_endpoint=relay_cfg.get("notification_endpoint", "tcp://relay:5558"),
            namespace=iceberg_cfg.get("namespace", "trading"),
            s3_endpoint=s3_cfg.get("s3_endpoint") or secrets.get("s3_endpoint"),
            s3_access_key=s3_cfg.get("s3_access_key") or secrets.get("s3_access_key"),
            s3_secret_key=s3_cfg.get("s3_secret_key") or secrets.get("s3_secret_key"),
            s3_region=s3_cfg.get("s3_region") or secrets.get("s3_region"),
        )
        await self.data_api.start()
        set_api(API(charting=ChartingAPIImpl(), data=self.data_api))
        logging.info("API initialized")

        # Start lifecycle manager
        await start_lifecycle_manager(
            user_id=self.config.user_id,
            idle_timeout_minutes=self.config.idle_timeout_minutes,
            enable_idle_shutdown=self.config.enable_idle_shutdown,
        )
        logging.info("Lifecycle manager started")

        # Start event publisher
        self.event_publisher = EventPublisher(
            user_id=self.config.user_id,
            xpub_port=self.config.zmq_xpub_port,
            gateway_router_endpoint=self.config.zmq_gateway_endpoint,
        )
        await self.event_publisher.start()
        logging.info("Event publisher started")

        # Publish CONTAINER_STARTING event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_STARTING,
            payload={
                "user_id": self.config.user_id,
                "timestamp": None,  # Will be auto-filled
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))

        # Initialize strategy lifecycle manager (sets up DB + worktrees dir)
        strategy_lifecycle = get_strategy_lifecycle(self.config.data_dir)
        await strategy_lifecycle.initialize()

        # Start strategy event bridge (PULL socket for subprocess events)
        self.event_bridge = StrategyEventBridge(
            event_publisher=self.event_publisher,
            strategy_lifecycle=strategy_lifecycle,
        )
        await self.event_bridge.start()
        strategy_lifecycle._bridge = self.event_bridge
        strategy_lifecycle._lifecycle = get_lifecycle_manager()
        logging.info("Strategy event bridge started")

        # Resume any strategies that were running before container restart
        await strategy_lifecycle.resume_running()

        # Create MCP server
        self.mcp_server = create_mcp_server(self.config, self.event_publisher)

        # Publish CONTAINER_READY event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_READY,
            payload={
                "user_id": self.config.user_id,
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))

        self.running = True
        logging.info("User container ready")

    async def stop(self) -> None:
        """Stop all subsystems"""
        if not self.running:
            return

        logging.info("Stopping user container")
        self.running = False

        # Publish CONTAINER_SHUTTING_DOWN event
        if self.event_publisher:
            await self.event_publisher.publish(UserEvent(
                event_type=EventType.CONTAINER_SHUTTING_DOWN,
                payload={
                    "user_id": self.config.user_id,
                },
                delivery=DeliverySpec.active_or_telegram(),
            ))

        # Stop running strategies gracefully
        try:
            from dexorder.strategy.lifecycle import get_strategy_lifecycle
            strategy_lifecycle = get_strategy_lifecycle()
            await strategy_lifecycle.shutdown()
            logging.info("Strategy lifecycle manager stopped")
        except Exception as e:
            logging.warning("Error stopping strategy lifecycle: %s", e)

        # Stop event bridge
        if self.event_bridge:
            await self.event_bridge.stop()
            logging.info("Strategy event bridge stopped")

        # Stop subsystems
        if self.data_api:
            await self.data_api.stop()
            logging.info("Data API stopped")

        if self.event_publisher:
            await self.event_publisher.stop()
            logging.info("Event publisher stopped")

        lifecycle = get_lifecycle_manager()
        if lifecycle:
            await lifecycle.stop()
            logging.info("Lifecycle manager stopped")

        logging.info("User container stopped")

    async def run(self) -> None:
        """Run the MCP server with configured transport"""
        await self.start()

        try:
            if self.config.mcp_transport == "stdio":
                # Run MCP server on stdio (for dev/testing)
                logging.info("Starting MCP server with stdio transport")
                async with stdio_server() as (read_stream, write_stream):
                    await self.mcp_server.run(
                        read_stream,
                        write_stream,
                        self.mcp_server.create_initialization_options()
                    )
            elif self.config.mcp_transport == "sse":
                # Run MCP server via Streamable HTTP (for production)
                logging.info(f"Starting MCP server with Streamable HTTP transport on {self.config.mcp_http_host}:{self.config.mcp_http_port}")
                app = create_streamable_http_app(self.mcp_server)
                config = uvicorn.Config(
                    app,
                    host=self.config.mcp_http_host,
                    port=self.config.mcp_http_port,
                    log_level=os.getenv("LOG_LEVEL", "info").lower(),
                    access_log=True,
                )
                server = uvicorn.Server(config)
                self._uvicorn_server = server
                await server.serve()
            else:
                raise ValueError(f"Unknown MCP transport: {self.config.mcp_transport}")
        finally:
            await self.stop()
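
# Illustrative local invocation (an assumed dev workflow, not part of the
# module): the container can be exercised outside k8s by pointing the config
# paths at local files and using the stdio transport.
#
#   USER_ID=dev-user \
#   CONFIG_PATH=./config.yaml SECRETS_PATH=./secrets.yaml \
#   MCP_TRANSPORT=stdio DATA_DIR=./data \
#   python main.py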


# =============================================================================
# Entry Point
# =============================================================================

async def main():
    """Main entry point"""
    # Setup logging
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(
        level=getattr(logging, log_level),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        stream=sys.stderr,  # MCP uses stdout for protocol
    )

    # Create and run container
    container = UserContainer()

    # Handle shutdown signals
    loop = asyncio.get_event_loop()

    def handle_signal(sig):
        logging.info(f"Received signal {sig}, initiating graceful shutdown...")
        if container._uvicorn_server is not None:
            container._uvicorn_server.should_exit = True
        else:
            loop.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda s=sig: handle_signal(s))

    try:
        await container.run()
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())