Files
ai/sandbox/main.py
2026-04-09 17:00:43 -04:00

1089 lines
47 KiB
Python

#!/usr/bin/env python3
"""
Dexorder User Container Main Entry Point
Brings together:
- Config and secrets loading from k8s mounted YAML files
- ZeroMQ event publisher for user events
- MCP server with minimal "hello world" resource
- Lifecycle management integration
"""
import asyncio
import contextlib
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional
import uvicorn
import yaml
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from mcp.types import Tool, TextContent, ImageContent
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Route, Mount
from dexorder import EventPublisher, start_lifecycle_manager, get_lifecycle_manager
from dexorder.api import set_api, API
from dexorder.conda_manager import sync_packages, install_packages
from dexorder.events import EventType, UserEvent, DeliverySpec
from dexorder.impl.charting_api_impl import ChartingAPIImpl
from dexorder.impl.data_api_impl import DataAPIImpl
from dexorder.tools.python_tools import get_category_manager, sanitize_name
from dexorder.tools.workspace_tools import get_workspace_store
from dexorder.tools.evaluate_indicator import evaluate_indicator
from dexorder.tools.backtest_strategy import backtest_strategy
from dexorder.tools.activate_strategy import activate_strategy, deactivate_strategy, list_active_strategies
# =============================================================================
# Global Data Directory
# =============================================================================
# Default data directory (relative to working directory for local dev)
DEFAULT_DATA_DIR = Path("data")
# Global data directory - set after config is loaded (Config.load() rebinds
# this to the env/config-selected path; until then it is the local-dev default)
DATA_DIR: Path = DEFAULT_DATA_DIR


def get_data_dir() -> Path:
    """Get the global data directory.

    Returns the module-level DATA_DIR, which Config.load() updates after the
    configuration has been read. Callers that run before load() see the
    default "data" path.
    """
    return DATA_DIR
# =============================================================================
# Indicator Types Helpers
# =============================================================================
def _build_indicator_type_entry(meta: dict) -> dict:
    """Build an indicator_types workspace entry from indicator metadata dict.

    The entry key mirrors the pandas-ta naming convention: the display name is
    sanitized, lower-cased, and prefixed with ``custom_``. Both created_at and
    modified_at are stamped with the current time; the caller preserves an
    earlier created_at when updating an existing entry.
    """
    display = meta.get('name', '')
    ta_name = f"custom_{sanitize_name(display).lower()}"
    stamp = int(time.time())
    # Nested chart metadata, with defaults for every optional field.
    chart_meta = {
        'display_name': display,
        'parameters': meta.get('parameters') or {},
        'input_series': meta.get('input_series') or ['close'],
        'output_columns': meta.get('output_columns') or [{'name': 'value'}],
        'pane': meta.get('pane', 'separate'),
        'filled_areas': meta.get('filled_areas') or [],
        'bands': meta.get('bands') or [],
    }
    return {
        'pandas_ta_name': ta_name,
        'display_name': display,
        'description': meta.get('description', ''),
        'metadata': chart_meta,
        'created_at': stamp,
        'modified_at': stamp,
    }
def _upsert_indicator_type(workspace_store, category_manager, name: str) -> None:
    """Read indicator metadata from disk and upsert into indicator_types workspace store.

    No-op when the indicator does not exist on disk or carries no metadata.
    An existing entry's created_at timestamp is preserved across updates.
    """
    result = category_manager.read('indicator', name)
    if not result.get('exists') or not result.get('metadata'):
        return
    entry = _build_indicator_type_entry(result['metadata'])
    key = entry['pandas_ta_name']
    # Preserve original created_at if an entry with this key already exists.
    snapshot = workspace_store.read('indicator_types')
    current_types = (snapshot.get('data') or {}).get('types') or {}
    if key in current_types:
        entry['created_at'] = current_types[key].get('created_at', entry['created_at'])
    patch_ops = [{'op': 'add', 'path': f'/types/{key}', 'value': entry}]
    workspace_store.patch('indicator_types', patch_ops)
    logging.info(f"Upserted indicator_types/{key} for '{name}'")
def _populate_indicator_types_from_disk(workspace_store, category_manager) -> None:
    """Scan existing indicators and add any missing entries to indicator_types store.

    Startup/migration sync: indicators already present in the store are left
    untouched; only missing ones are upserted.
    """
    snapshot = workspace_store.read('indicator_types')
    known_types = (snapshot.get('data') or {}).get('types') or {}
    count = 0
    for item in category_manager.list_items('indicator').get('items', []):
        item_name = item.get('name', '')
        if not item_name:
            continue
        key = f"custom_{sanitize_name(item_name).lower()}"
        if key in known_types:
            continue
        _upsert_indicator_type(workspace_store, category_manager, item_name)
        count += 1
    if count > 0:
        logging.info(f"Populated {count} indicator type(s) from disk into indicator_types store")
# =============================================================================
# Configuration
# =============================================================================
class Config:
    """Application configuration loaded from config.yaml and secrets.yaml.

    Environment variables always win over file contents. ``load()`` must run
    before the YAML-backed fields (config_data, secrets_data, data_dir) are
    meaningful.
    """

    def __init__(self):
        # User ID (required)
        user_id = os.getenv("USER_ID", "")
        if not user_id:
            raise ValueError("USER_ID environment variable required")
        self.user_id: str = user_id
        # Config and secrets paths (k8s mounted)
        self.config_path = Path(os.getenv("CONFIG_PATH", "/app/config/config.yaml"))
        self.secrets_path = Path(os.getenv("SECRETS_PATH", "/app/config/secrets.yaml"))
        # ZMQ ports for event system
        self.zmq_xpub_port: int = int(os.getenv("ZMQ_XPUB_PORT", "5570"))
        self.zmq_gateway_endpoint: str = os.getenv(
            "ZMQ_GATEWAY_ENDPOINT",
            "tcp://gateway:5571"
        )
        # MCP server settings
        self.mcp_server_name: str = os.getenv("MCP_SERVER_NAME", "dexorder-user")
        self.mcp_transport: str = os.getenv("MCP_TRANSPORT", "sse")  # "stdio" or "sse"
        self.mcp_http_port: int = int(os.getenv("MCP_HTTP_PORT", "3000"))
        self.mcp_http_host: str = os.getenv("MCP_HTTP_HOST", "0.0.0.0")
        # Lifecycle settings
        self.idle_timeout_minutes: int = int(os.getenv("IDLE_TIMEOUT_MINUTES", "15"))
        self.enable_idle_shutdown: bool = os.getenv("ENABLE_IDLE_SHUTDOWN", "true").lower() == "true"
        # Populated by load()
        self.config_data: dict = {}
        self.secrets_data: dict = {}
        # Data directory (finalized by load())
        self.data_dir: Path = DEFAULT_DATA_DIR

    def load(self) -> None:
        """Load configuration and secrets from YAML files, then resolve the data dir."""
        global DATA_DIR
        # Load config.yaml when present
        if self.config_path.exists():
            with self.config_path.open() as fh:
                self.config_data = yaml.safe_load(fh) or {}
            logging.info(f"Loaded config from {self.config_path}")
        else:
            logging.warning(f"Config file not found: {self.config_path}")
        # Load secrets.yaml when present
        if self.secrets_path.exists():
            with self.secrets_path.open() as fh:
                self.secrets_data = yaml.safe_load(fh) or {}
            logging.info(f"Loaded secrets from {self.secrets_path}")
        else:
            logging.warning(f"Secrets file not found: {self.secrets_path}")
        # Resolve data directory. Priority: env var > config file > default.
        chosen = os.getenv("DATA_DIR") or self.config_data.get("data_dir")
        self.data_dir = Path(chosen) if chosen else DEFAULT_DATA_DIR
        # Keep the module-level global in sync for get_data_dir() callers.
        DATA_DIR = self.data_dir
        # Ensure data directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Data directory: {self.data_dir}")

    @property
    def workspace_dir(self) -> Path:
        """Workspace directory under DATA_DIR."""
        return self.data_dir / "workspace"
# =============================================================================
# MCP Server Setup
# =============================================================================
def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server:
    """Create MCP server with resources and workspace tools.

    Registers on the returned (not yet running) Server:
    - one hello-world resource scoped to this user's ID,
    - workspace store tools (workspace_read/write/patch),
    - category script tools (python_write/edit/read/list/log/revert),
    - conda package tools (conda_sync/conda_install),
    - research execution, indicator evaluation, backtesting, and
      strategy activation tools.

    The workspace store and category manager are captured as closures by the
    tool handlers below.
    """
    server = Server(config.mcp_server_name)
    # Initialize workspace store (persistent stores under DATA_DIR/workspace)
    workspace_store = get_workspace_store(config.workspace_dir)
    logging.info(f"Workspace store initialized at {config.workspace_dir}")
    # Initialize category file manager (scripts live under DATA_DIR)
    category_manager = get_category_manager(config.data_dir)
    logging.info(f"Category manager initialized at {config.data_dir}")
    # Populate indicator_types store from existing indicators on disk (migration/startup sync)
    _populate_indicator_types_from_disk(workspace_store, category_manager)

    @server.list_resources()
    async def list_resources():
        """List available resources"""
        return [
            {
                "uri": f"dexorder://user/{config.user_id}/hello",
                "name": "Hello World",
                "description": "A simple hello world resource",
                "mimeType": "text/plain",
            }
        ]

    @server.read_resource()
    async def read_resource(uri: str):
        """Read a resource by URI"""
        if uri == f"dexorder://user/{config.user_id}/hello":
            # Publish an event when resource is accessed
            await event_publisher.publish(UserEvent(
                event_type=EventType.STRATEGY_LOG,
                payload={
                    "message": "Hello world resource accessed",
                    "uri": uri,
                },
                delivery=DeliverySpec.informational(),
            ))
            return {
                "uri": uri,
                "mimeType": "text/plain",
                "text": f"Hello from Dexorder user container!\nUser ID: {config.user_id}\n",
            }
        else:
            raise ValueError(f"Unknown resource: {uri}")

    @server.list_tools()
    async def list_tools():
        """List available tools including workspace and category tools"""
        return [
            Tool(
                name="workspace_read",
                description="Read a workspace store from persistent storage",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store (e.g., 'chartStore', 'userPreferences')"
                        }
                    },
                    "required": ["store_name"]
                }
            ),
            Tool(
                name="workspace_write",
                description="Write a workspace store to persistent storage",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store"
                        },
                        "data": {
                            "description": "Data to write"
                        }
                    },
                    "required": ["store_name", "data"]
                }
            ),
            Tool(
                name="workspace_patch",
                description="Apply JSON patch operations to a workspace store",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "store_name": {
                            "type": "string",
                            "description": "Name of the store"
                        },
                        "patch": {
                            "type": "array",
                            "description": "JSON Patch operations (RFC 6902)",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "op": {"type": "string", "enum": ["add", "remove", "replace", "move", "copy", "test"]},
                                    "path": {"type": "string"},
                                    "value": {}
                                },
                                "required": ["op", "path"]
                            }
                        }
                    },
                    "required": ["store_name", "patch"]
                }
            ),
            Tool(
                name="python_write",
                description="Write a new strategy, indicator, or research script with validation",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name (can contain special characters)"
                        },
                        "description": {
                            "type": "string",
                            "description": "LLM-generated description of what this does (required)"
                        },
                        "code": {
                            "type": "string",
                            "description": "Python implementation code"
                        },
                        "metadata": {
                            "type": "object",
                            "description": (
                                "Optional category-specific metadata. "
                                "For strategy: include 'data_feeds' (list of {symbol, period_seconds, description}) "
                                "and 'parameters' (object mapping param_name → {default, description}). "
                                "Example: {\"data_feeds\": [{\"symbol\": \"BTC/USDT.BINANCE\", \"period_seconds\": 3600, \"description\": \"Primary BTC/USDT hourly feed\"}], "
                                "\"parameters\": {\"rsi_length\": {\"default\": 14, \"description\": \"RSI lookback period\"}, \"threshold\": {\"default\": 70, \"description\": \"Overbought level\"}}}. "
                                "For indicator: include 'default_length' (int). "
                                "For any category: 'conda_packages' (list of package names) if extra dependencies are needed."
                            )
                        }
                    },
                    "required": ["category", "name", "description", "code"]
                }
            ),
            Tool(
                name="python_edit",
                description=(
                    "Edit an existing category script. "
                    "Use 'patches' for targeted string replacements (preferred for small changes), "
                    "or 'code' to replace the full implementation. Do not supply both."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the existing item"
                        },
                        "code": {
                            "type": "string",
                            "description": "Full replacement Python code. Use only when rewriting the entire implementation; prefer 'patches' for targeted edits."
                        },
                        "patches": {
                            "type": "array",
                            "description": (
                                "Targeted code edits as old/new string pairs. Preferred over 'code' for small changes. "
                                "Each patch: {\"old_string\": \"exact text to find\", \"new_string\": \"replacement text\"}. "
                                "old_string must be unique in the file (add surrounding context if needed). "
                                "Patches are applied in order."
                            ),
                            "items": {
                                "type": "object",
                                "properties": {
                                    "old_string": {"type": "string"},
                                    "new_string": {"type": "string"}
                                },
                                "required": ["old_string", "new_string"]
                            }
                        },
                        "description": {
                            "type": "string",
                            "description": "Updated description (optional, omit to keep existing)"
                        },
                        "metadata": {
                            "type": "object",
                            "description": (
                                "Updated metadata fields (optional). "
                                "For strategy: 'data_feeds' (list of {symbol, period_seconds, description}) "
                                "and/or 'parameters' (object mapping param_name → {default, description}). "
                                "For indicator: 'default_length' (int). "
                                "For any category: 'conda_packages' (list of package names)."
                            )
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="python_read",
                description="Read a category script and its metadata",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the script"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item"
                        }
                    },
                    "required": ["category", "name"]
                }
            ),
            Tool(
                name="python_list",
                description="List all items in a category with names and descriptions",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category to list"
                        }
                    },
                    "required": ["category"]
                }
            ),
            Tool(
                name="python_log",
                description="Show git commit history for category items. Filter by category and/or name to see history for a specific item.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Filter to this category (optional)"
                        },
                        "name": {
                            "type": "string",
                            "description": "Filter to this item (optional, requires category)"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Max commits to return (default 20)",
                            "default": 20
                        }
                    },
                    "required": []
                }
            ),
            Tool(
                name="python_revert",
                description="Restore a category item to a previous git revision. Creates a new commit — non-destructive.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "revision": {
                            "type": "string",
                            "description": "Git commit hash (full or short) to restore to"
                        },
                        "category": {
                            "type": "string",
                            "enum": ["strategy", "indicator", "research"],
                            "description": "Category of the item"
                        },
                        "name": {
                            "type": "string",
                            "description": "Display name of the item to restore"
                        }
                    },
                    "required": ["revision", "category", "name"]
                }
            ),
            Tool(
                name="conda_sync",
                description="Sync conda packages: scan all metadata, remove unused packages (excluding base environment)",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            ),
            Tool(
                name="conda_install",
                description="Install conda packages on-demand",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "packages": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "List of conda package names to install"
                        }
                    },
                    "required": ["packages"]
                }
            ),
            Tool(
                name="execute_research",
                description="Execute a research script and return results with matplotlib images",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string",
                            "description": "Display name of the research script to execute"
                        }
                    },
                    "required": ["name"]
                }
            ),
            Tool(
                name="evaluate_indicator",
                description=(
                    "Evaluate a pandas-ta indicator against real OHLC data and return a structured "
                    "array of timestamped values. Use this to validate that an indicator computes "
                    "correctly before adding it to the workspace, or to inspect its output values."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "symbol": {
                            "type": "string",
                            "description": "Market symbol in 'MARKET.EXCHANGE' format, e.g. 'BTC/USDT.BINANCE'"
                        },
                        "from_time": {
                            "description": "Start of time range. Unix timestamp (int) or date string e.g. '30 days ago', '2024-01-01'"
                        },
                        "to_time": {
                            "description": "End of time range. Unix timestamp (int) or date string e.g. 'now', '2024-03-01'"
                        },
                        "period_seconds": {
                            "type": "integer",
                            "description": "Candle period in seconds (e.g. 3600 for 1h, 900 for 15m, 86400 for 1d)",
                            "default": 3600
                        },
                        "pandas_ta_name": {
                            "type": "string",
                            "description": "Lowercase pandas-ta function name, e.g. 'rsi', 'macd', 'bbands'"
                        },
                        "parameters": {
                            "type": "object",
                            "description": "pandas-ta keyword arguments, e.g. {\"length\": 14} or {\"fast\": 12, \"slow\": 26, \"signal\": 9}",
                            "default": {}
                        }
                    },
                    "required": ["symbol", "from_time", "to_time", "pandas_ta_name"]
                }
            ),
            Tool(
                name="backtest_strategy",
                description=(
                    "Run a saved trading strategy against historical OHLC data using Nautilus Trader "
                    "BacktestEngine. Returns performance metrics (total return, Sharpe ratio, "
                    "max drawdown, win rate, trade count) and a full equity curve. "
                    "Supports multiple data feeds and includes order-flow fields (buy_vol, sell_vol, "
                    "open_interest) in the strategy's DataFrame."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy as saved via python_write"
                        },
                        "feeds": {
                            "type": "array",
                            "description": "Data feeds to backtest against",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "symbol": {
                                        "type": "string",
                                        "description": "Market symbol in 'MARKET.EXCHANGE' format, e.g. 'BTC/USDT.BINANCE'"
                                    },
                                    "period_seconds": {
                                        "type": "integer",
                                        "description": "Candle period in seconds (e.g. 3600 for 1h)",
                                        "default": 3600
                                    }
                                },
                                "required": ["symbol"]
                            },
                            "minItems": 1
                        },
                        "from_time": {
                            "description": "Backtest start. Unix timestamp or date string e.g. '2024-01-01', '90 days ago'"
                        },
                        "to_time": {
                            "description": "Backtest end. Unix timestamp or date string e.g. '2025-01-01', 'now'"
                        },
                        "initial_capital": {
                            "type": "number",
                            "description": "Starting capital in quote currency (e.g. 10000.0 USDT)",
                            "default": 10000.0
                        },
                        "paper": {
                            "type": "boolean",
                            "description": "Always true for historical backtest (reserved for forward testing)",
                            "default": True
                        }
                    },
                    "required": ["strategy_name", "feeds", "from_time", "to_time"]
                }
            ),
            Tool(
                name="activate_strategy",
                description=(
                    "Activate a strategy for paper or live forward trading with a capital allocation. "
                    "paper=true (default): simulated fills on live data — no API keys required. "
                    "paper=false: real execution via user secrets vault (not yet implemented). "
                    "Note: live data streaming is TBD; this registers the strategy for when it becomes available."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the strategy as saved via python_write"
                        },
                        "feeds": {
                            "type": "array",
                            "description": "Data feeds for the strategy",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "symbol": {"type": "string"},
                                    "period_seconds": {"type": "integer", "default": 3600}
                                },
                                "required": ["symbol"]
                            },
                            "minItems": 1
                        },
                        "allocation": {
                            "type": "number",
                            "description": "Capital allocated in quote currency (e.g. 5000.0 USDT)"
                        },
                        "paper": {
                            "type": "boolean",
                            "description": "True = paper/simulated (default); False = live execution",
                            "default": True
                        }
                    },
                    "required": ["strategy_name", "feeds", "allocation"]
                }
            ),
            Tool(
                name="deactivate_strategy",
                description="Stop an active strategy and return its final P&L summary.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "strategy_name": {
                            "type": "string",
                            "description": "Display name of the active strategy to stop"
                        }
                    },
                    "required": ["strategy_name"]
                }
            ),
            Tool(
                name="list_active_strategies",
                description="List all currently active (live or paper) strategies and their status.",
                inputSchema={
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            ),
        ]

    @server.call_tool()
    async def handle_tool_call(name: str, arguments: dict):
        """Handle tool calls including workspace and category tools.

        Records activity on the lifecycle manager (presumably feeding the
        idle-shutdown timer — confirm against lifecycle manager impl), then
        dispatches to the inner handler. Exceptions are logged with traceback
        and re-raised so the MCP layer can surface them to the client.
        """
        get_lifecycle_manager().record_activity()
        try:
            return await _handle_tool_call_inner(name, arguments)
        except Exception:
            logging.exception("Unhandled exception in tool '%s'", name)
            raise

    async def _handle_tool_call_inner(name: str, arguments: dict):
        # Dispatch by tool name. Workspace/category results are returned
        # as-is (dicts) or as lists of TextContent/ImageContent items,
        # depending on the tool.
        if name == "workspace_read":
            return workspace_store.read(arguments.get("store_name", ""))
        elif name == "workspace_write":
            return workspace_store.write(
                arguments.get("store_name", ""),
                arguments.get("data")
            )
        elif name == "workspace_patch":
            return workspace_store.patch(
                arguments.get("store_name", ""),
                arguments.get("patch", [])
            )
        elif name == "python_write":
            result = category_manager.write(
                category=arguments.get("category", ""),
                name=arguments.get("name", ""),
                description=arguments.get("description", ""),
                code=arguments.get("code", ""),
                metadata=arguments.get("metadata")
            )
            # Summarize write outcome as text, then append any execution
            # output (e.g. matplotlib images from research runs).
            content = []
            meta_parts = [f"success: {result['success']}", f"path: {result['path']}"]
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("validation") and not result["validation"].get("success"):
                meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}")
            content.append(TextContent(type="text", text="\n".join(meta_parts)))
            if result.get("execution"):
                exec_content = result["execution"].get("content", [])
                content.extend(exec_content)
                image_count = sum(1 for item in exec_content if item.type == "image")
                logging.info(f"python_write '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            else:
                logging.info(f"python_write '{arguments.get('name')}': no execution result (category={arguments.get('category')})")
            # Keep the indicator_types workspace store in sync with new indicators.
            if result.get("success") and arguments.get("category") == "indicator":
                _upsert_indicator_type(workspace_store, category_manager, arguments.get("name", ""))
            return content
        elif name == "python_edit":
            result = category_manager.edit(
                category=arguments.get("category", ""),
                name=arguments.get("name", ""),
                code=arguments.get("code"),
                patches=arguments.get("patches"),
                description=arguments.get("description"),
                metadata=arguments.get("metadata")
            )
            content = []
            meta_parts = [f"success: {result['success']}", f"path: {result['path']}"]
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("validation") and not result["validation"].get("success"):
                meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}")
            content.append(TextContent(type="text", text="\n".join(meta_parts)))
            if result.get("execution"):
                exec_content = result["execution"].get("content", [])
                content.extend(exec_content)
                image_count = sum(1 for item in exec_content if item.type == "image")
                logging.info(f"python_edit '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            else:
                logging.info(f"python_edit '{arguments.get('name')}': no execution result")
            # Keep the indicator_types workspace store in sync with edited indicators.
            if result.get("success") and arguments.get("category") == "indicator":
                _upsert_indicator_type(workspace_store, category_manager, arguments.get("name", ""))
            return content
        elif name == "python_read":
            return category_manager.read(
                category=arguments.get("category", ""),
                name=arguments.get("name", "")
            )
        elif name == "python_list":
            return category_manager.list_items(
                category=arguments.get("category", "")
            )
        elif name == "python_log":
            result = category_manager.git_log(
                category=arguments.get("category"),
                name=arguments.get("name"),
                limit=int(arguments.get("limit", 20))
            )
            lines = [f"success: {result['success']}"]
            for c in result.get("commits", []):
                # date is sliced to YYYY-MM-DD for compact display
                lines.append(f"{c['short_hash']} {c['date'][:10]} {c['message']}")
            return [TextContent(type="text", text="\n".join(lines))]
        elif name == "python_revert":
            result = category_manager.git_revert(
                revision=arguments.get("revision", ""),
                category=arguments.get("category", ""),
                name=arguments.get("name", "")
            )
            meta_parts = [f"success: {result['success']}"]
            if result.get("revision"):
                meta_parts.append(f"revision: {result['revision']}")
            if result.get("error"):
                meta_parts.append(f"error: {result['error']}")
            if result.get("validation") and not result["validation"].get("success"):
                meta_parts.append(f"validation errors: {result['validation'].get('errors', [])}")
            return [TextContent(type="text", text="\n".join(meta_parts))]
        elif name == "conda_sync":
            # Get environment.yml path relative to main.py
            env_yml = Path(__file__).parent / "environment.yml"
            return sync_packages(
                data_dir=get_data_dir(),
                environment_yml=env_yml if env_yml.exists() else None
            )
        elif name == "conda_install":
            return install_packages(arguments.get("packages", []))
        elif name == "execute_research":
            result = category_manager.execute_research(name=arguments.get("name", ""))
            if "error" in result:
                logging.error(f"execute_research '{arguments.get('name')}': {result['error']}")
                return [TextContent(type="text", text=f"Error: {result['error']}")]
            content = result.get("content", [TextContent(type="text", text="No output")])
            image_count = sum(1 for item in content if item.type == "image")
            logging.info(f"execute_research '{arguments.get('name')}': returning {len(content)} items, {image_count} images")
            return content
        elif name == "evaluate_indicator":
            return await evaluate_indicator(
                symbol=arguments.get("symbol", ""),
                from_time=arguments.get("from_time"),
                to_time=arguments.get("to_time"),
                period_seconds=int(arguments.get("period_seconds", 3600)),
                pandas_ta_name=arguments.get("pandas_ta_name", ""),
                parameters=arguments.get("parameters") or {},
            )
        elif name == "backtest_strategy":
            return await backtest_strategy(
                strategy_name=arguments.get("strategy_name", ""),
                feeds=arguments.get("feeds", []),
                from_time=arguments.get("from_time"),
                to_time=arguments.get("to_time"),
                initial_capital=float(arguments.get("initial_capital", 10_000.0)),
                paper=bool(arguments.get("paper", True)),
            )
        elif name == "activate_strategy":
            return await activate_strategy(
                strategy_name=arguments.get("strategy_name", ""),
                feeds=arguments.get("feeds", []),
                allocation=float(arguments.get("allocation", 0.0)),
                paper=bool(arguments.get("paper", True)),
            )
        elif name == "deactivate_strategy":
            return await deactivate_strategy(
                strategy_name=arguments.get("strategy_name", ""),
            )
        elif name == "list_active_strategies":
            return await list_active_strategies()
        else:
            raise ValueError(f"Unknown tool: {name}")

    logging.info(f"MCP server '{config.mcp_server_name}' created with workspace and category tools")
    return server
# =============================================================================
# Streamable HTTP Transport Setup
# =============================================================================
def create_streamable_http_app(mcp_server: Server) -> Starlette:
    """Create Starlette app with Streamable HTTP endpoint for MCP.

    Mounts the MCP session manager at /mcp and exposes a /health endpoint.
    The session manager runs for the lifetime of the Starlette app via the
    lifespan context.
    """
    session_manager = StreamableHTTPSessionManager(app=mcp_server)

    @contextlib.asynccontextmanager
    async def lifespan(app: Starlette):
        # Keep the MCP session manager alive for as long as the app runs.
        async with session_manager.run():
            yield

    async def handle_health(request: Request) -> Response:
        """Health check endpoint for k8s probes and gateway readiness checks"""
        return Response(content='{"status":"ok"}', media_type="application/json")

    return Starlette(
        lifespan=lifespan,
        routes=[
            Mount("/mcp", app=session_manager.handle_request),
            Route("/health", handle_health),
        ],
    )
# =============================================================================
# Main Application
# =============================================================================
class UserContainer:
    """Main user container application.

    Owns the full subsystem lifecycle: configuration, data/charting API,
    lifecycle manager, ZMQ event publisher, and the MCP server. ``run()`` is
    the single entry point; its finally-block guarantees ``stop()`` runs even
    when the transport exits abnormally. Startup/shutdown ordering matters:
    the event publisher must exist before lifecycle events can be published,
    and subsystems are stopped in reverse-ish dependency order.
    """
    def __init__(self):
        self.config = Config()
        # The following are created in start(); they remain None until then.
        self.event_publisher: Optional[EventPublisher] = None
        self.mcp_server: Optional[Server] = None
        self.data_api: Optional[DataAPIImpl] = None
        self.running = False
    async def start(self) -> None:
        """Start all subsystems"""
        logging.info("Starting user container")
        # Load configuration
        self.config.load()
        # Initialize data and charting API
        data_cfg = self.config.config_data.get("data", {})
        iceberg_cfg = data_cfg.get("iceberg", {})
        relay_cfg = data_cfg.get("relay", {})
        secrets = self.config.secrets_data
        s3_cfg = iceberg_cfg  # S3 settings co-located with iceberg config
        self.data_api = DataAPIImpl(
            iceberg_catalog_uri=iceberg_cfg.get("catalog_uri", "http://iceberg-catalog:8181"),
            relay_endpoint=relay_cfg.get("endpoint", "tcp://relay:5559"),
            notification_endpoint=relay_cfg.get("notification_endpoint", "tcp://relay:5558"),
            namespace=iceberg_cfg.get("namespace", "trading"),
            # S3 credentials: config values win, secrets.yaml is the fallback.
            s3_endpoint=s3_cfg.get("s3_endpoint") or secrets.get("s3_endpoint"),
            s3_access_key=s3_cfg.get("s3_access_key") or secrets.get("s3_access_key"),
            s3_secret_key=s3_cfg.get("s3_secret_key") or secrets.get("s3_secret_key"),
        )
        await self.data_api.start()
        set_api(API(charting=ChartingAPIImpl(), data=self.data_api))
        logging.info("API initialized")
        # Start lifecycle manager
        await start_lifecycle_manager(
            user_id=self.config.user_id,
            idle_timeout_minutes=self.config.idle_timeout_minutes,
            enable_idle_shutdown=self.config.enable_idle_shutdown,
        )
        logging.info("Lifecycle manager started")
        # Start event publisher (must precede the CONTAINER_* events below)
        self.event_publisher = EventPublisher(
            user_id=self.config.user_id,
            xpub_port=self.config.zmq_xpub_port,
            gateway_router_endpoint=self.config.zmq_gateway_endpoint,
        )
        await self.event_publisher.start()
        logging.info("Event publisher started")
        # Publish CONTAINER_STARTING event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_STARTING,
            payload={
                "user_id": self.config.user_id,
                "timestamp": None,  # Will be auto-filled
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))
        # Create MCP server
        self.mcp_server = create_mcp_server(self.config, self.event_publisher)
        # Publish CONTAINER_READY event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_READY,
            payload={
                "user_id": self.config.user_id,
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))
        self.running = True
        logging.info("User container ready")
    async def stop(self) -> None:
        """Stop all subsystems.

        Idempotent: the ``running`` flag makes repeated calls no-ops. The
        shutdown event is published before the publisher itself is stopped.
        """
        if not self.running:
            return
        logging.info("Stopping user container")
        self.running = False
        # Publish CONTAINER_SHUTTING_DOWN event (while the publisher is still up)
        if self.event_publisher:
            await self.event_publisher.publish(UserEvent(
                event_type=EventType.CONTAINER_SHUTTING_DOWN,
                payload={
                    "user_id": self.config.user_id,
                },
                delivery=DeliverySpec.active_or_telegram(),
            ))
        # Stop subsystems
        if self.data_api:
            await self.data_api.stop()
            logging.info("Data API stopped")
        if self.event_publisher:
            await self.event_publisher.stop()
            logging.info("Event publisher stopped")
        lifecycle = get_lifecycle_manager()
        if lifecycle:
            await lifecycle.stop()
            logging.info("Lifecycle manager stopped")
        logging.info("User container stopped")
    async def run(self) -> None:
        """Run the MCP server with configured transport.

        Supported transports: "stdio" (dev/testing) and "sse" (production;
        despite the name, this value serves Streamable HTTP via uvicorn —
        NOTE(review): consider renaming the config value).
        """
        await self.start()
        try:
            if self.config.mcp_transport == "stdio":
                # Run MCP server on stdio (for dev/testing)
                logging.info("Starting MCP server with stdio transport")
                async with stdio_server() as (read_stream, write_stream):
                    await self.mcp_server.run(
                        read_stream,
                        write_stream,
                        self.mcp_server.create_initialization_options()
                    )
            elif self.config.mcp_transport == "sse":
                # Run MCP server via Streamable HTTP (for production)
                logging.info(f"Starting MCP server with Streamable HTTP transport on {self.config.mcp_http_host}:{self.config.mcp_http_port}")
                app = create_streamable_http_app(self.mcp_server)
                config = uvicorn.Config(
                    app,
                    host=self.config.mcp_http_host,
                    port=self.config.mcp_http_port,
                    log_level=os.getenv("LOG_LEVEL", "info").lower(),
                    access_log=True,
                )
                server = uvicorn.Server(config)
                await server.serve()
            else:
                raise ValueError(f"Unknown MCP transport: {self.config.mcp_transport}")
        finally:
            # Guarantee subsystem shutdown regardless of how the transport exits.
            await self.stop()
# =============================================================================
# Entry Point
# =============================================================================
async def main():
    """Main entry point: configure logging, install signal handlers, run the container.

    Fixes over the previous version:
    - Uses asyncio.get_running_loop() (get_event_loop() is deprecated inside
      a coroutine and may not return the running loop).
    - On SIGTERM/SIGINT, cancels the main task instead of calling
      loop.stop(). loop.stop() halted the loop before the scheduled
      container.stop() task could run, so graceful shutdown was skipped and
      asyncio.run() raised "Event loop stopped before Future completed".
      Cancellation propagates into container.run(), whose finally-block
      performs the orderly stop().
    - Falls back to INFO when LOG_LEVEL names an unknown level (previously
      an AttributeError at startup).
    """
    # Setup logging
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(
        level=getattr(logging, log_level, logging.INFO),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        stream=sys.stderr,  # MCP uses stdout for protocol
    )
    # Create and run container
    container = UserContainer()
    # Handle shutdown signals via graceful cancellation of this task
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()

    def handle_signal(sig):
        logging.info(f"Received signal {sig}, shutting down...")
        if main_task is not None:
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, handle_signal, sig)
    try:
        await container.run()
    except asyncio.CancelledError:
        # Expected path after SIGTERM/SIGINT: container.run()'s finally-block
        # has already stopped all subsystems.
        logging.info("Shutdown complete")
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())