#!/usr/bin/env python3
"""
DexOrder User Container Main Entry Point

Brings together:
- Config and secrets loading from k8s mounted YAML files
- ZeroMQ event publisher for user events
- MCP server with minimal "hello world" resource
- Lifecycle management integration
"""

import asyncio
import logging
import os
import signal
import sys
from pathlib import Path
from typing import Optional

import yaml
import uvicorn
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import Response
from sse_starlette import EventSourceResponse

from dexorder import EventPublisher, start_lifecycle_manager, get_lifecycle_manager
from dexorder.events import EventType, UserEvent, DeliverySpec
from dexorder.api.workspace_tools import get_workspace_store
from dexorder.api.category_tools import get_category_manager


# =============================================================================
# Global Data Directory
# =============================================================================

# Default data directory (relative to working directory for local dev)
DEFAULT_DATA_DIR = Path("data")

# Global data directory - set after config is loaded
DATA_DIR: Path = DEFAULT_DATA_DIR


def get_data_dir() -> Path:
    """Get the global data directory."""
    return DATA_DIR


# =============================================================================
# Configuration
# =============================================================================

def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable.

    Args:
        name: Environment variable name.
        default: Value used when the variable is not set.

    Returns:
        The parsed integer, or ``default`` when the variable is unset.

    Raises:
        ValueError: If the variable is set but is not a valid integer. The
            message names the variable so a bad deployment is easy to diagnose.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from err


class Config:
    """Application configuration loaded from config.yaml and secrets.yaml"""

    def __init__(self):
        """Read settings from the environment.

        Raises:
            ValueError: If ``USER_ID`` is missing/empty or an integer setting
                cannot be parsed.
        """
        # User ID (required)
        self.user_id: str = os.getenv("USER_ID", "")
        if not self.user_id:
            raise ValueError("USER_ID environment variable required")

        # Config and secrets paths (k8s mounted)
        self.config_path = Path(os.getenv("CONFIG_PATH", "/app/config/config.yaml"))
        self.secrets_path = Path(os.getenv("SECRETS_PATH", "/app/config/secrets.yaml"))

        # ZMQ ports for event system
        self.zmq_xpub_port: int = _env_int("ZMQ_XPUB_PORT", 5570)
        self.zmq_gateway_endpoint: str = os.getenv(
            "ZMQ_GATEWAY_ENDPOINT",
            "tcp://gateway:5571"
        )

        # MCP server settings
        self.mcp_server_name: str = os.getenv("MCP_SERVER_NAME", "dexorder-user")
        self.mcp_transport: str = os.getenv("MCP_TRANSPORT", "sse")  # "stdio" or "sse"
        self.mcp_http_port: int = _env_int("MCP_HTTP_PORT", 3000)
        self.mcp_http_host: str = os.getenv("MCP_HTTP_HOST", "0.0.0.0")

        # Lifecycle settings
        self.idle_timeout_minutes: int = _env_int("IDLE_TIMEOUT_MINUTES", 15)
        self.enable_idle_shutdown: bool = (
            os.getenv("ENABLE_IDLE_SHUTDOWN", "true").lower() == "true"
        )

        # Loaded from files
        self.config_data: dict = {}
        self.secrets_data: dict = {}

        # Data directory (set after config load)
        self.data_dir: Path = DEFAULT_DATA_DIR

    @staticmethod
    def _load_yaml_mapping(path: Path, label: str) -> dict:
        """Load a YAML file whose top level must be a mapping.

        Args:
            path: File to read.
            label: Short lowercase name used in log messages
                (``"config"`` or ``"secrets"``).

        Returns:
            The parsed mapping; ``{}`` when the file is missing or empty.

        Raises:
            ValueError: If the file parses to something other than a mapping.
        """
        try:
            with open(path, encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
        except FileNotFoundError:
            logging.warning(f"{label.capitalize()} file not found: {path}")
            return {}
        if not isinstance(data, dict):
            # Fail here with a clear message instead of later on `.get(...)`.
            raise ValueError(f"{path} must contain a YAML mapping at the top level")
        logging.info(f"Loaded {label} from {path}")
        return data

    def load(self) -> None:
        """Load configuration and secrets from YAML files.

        Also resolves the data directory, publishes it through the module-level
        ``DATA_DIR`` global, and creates it on disk if needed.

        Raises:
            ValueError: If a YAML file does not contain a top-level mapping.
        """
        global DATA_DIR

        self.config_data = self._load_yaml_mapping(self.config_path, "config")
        self.secrets_data = self._load_yaml_mapping(self.secrets_path, "secrets")

        # Set data directory from config or environment
        # Priority: env var > config file > default
        data_dir_str = os.getenv("DATA_DIR") or self.config_data.get("data_dir")
        self.data_dir = Path(data_dir_str) if data_dir_str else DEFAULT_DATA_DIR

        # Update global DATA_DIR
        DATA_DIR = self.data_dir

        # Ensure data directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Data directory: {self.data_dir}")

    @property
    def workspace_dir(self) -> Path:
        """Workspace directory under DATA_DIR."""
        return self.data_dir / "workspace"


# =============================================================================
# MCP Server Setup
# =============================================================================

def _tool_definitions() -> list:
    """Return the tool descriptors advertised by the MCP server.

    A fresh list is built on every call so callers cannot mutate shared state.
    """
    return [
        {
            "name": "workspace_read",
            "description": "Read a workspace store from persistent storage",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "store_name": {
                        "type": "string",
                        "description": "Name of the store (e.g., 'chartStore', 'userPreferences')"
                    }
                },
                "required": ["store_name"]
            },
            "annotations": {
                "agent_accessible": True,  # Agent can read workspace stores
            }
        },
        {
            "name": "workspace_write",
            "description": "Write a workspace store to persistent storage",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "store_name": {
                        "type": "string",
                        "description": "Name of the store"
                    },
                    "data": {
                        "description": "Data to write"
                    }
                },
                "required": ["store_name", "data"]
            },
            "annotations": {
                "agent_accessible": True,  # Agent can write workspace stores
            }
        },
        {
            "name": "workspace_patch",
            "description": "Apply JSON patch operations to a workspace store",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "store_name": {
                        "type": "string",
                        "description": "Name of the store"
                    },
                    "patch": {
                        "type": "array",
                        "description": "JSON Patch operations (RFC 6902)",
                        "items": {
                            "type": "object",
                            "properties": {
                                "op": {"type": "string", "enum": ["add", "remove", "replace", "move", "copy", "test"]},
                                "path": {"type": "string"},
                                "value": {}
                            },
                            "required": ["op", "path"]
                        }
                    }
                },
                "required": ["store_name", "patch"]
            },
            "annotations": {
                "agent_accessible": True,  # Agent can patch workspace stores
            }
        },
        {
            "name": "category_write",
            "description": "Write a new strategy, indicator, or research script with validation",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["strategy", "indicator", "research"],
                        "description": "Category of the script"
                    },
                    "name": {
                        "type": "string",
                        "description": "Display name (can contain special characters)"
                    },
                    "description": {
                        "type": "string",
                        "description": "LLM-generated description of what this does (required)"
                    },
                    "code": {
                        "type": "string",
                        "description": "Python implementation code"
                    },
                    "metadata": {
                        "type": "object",
                        "description": "Optional category-specific metadata (e.g., default_length for indicators, data_feeds for strategies)"
                    }
                },
                "required": ["category", "name", "description", "code"]
            },
            "annotations": {
                "agent_accessible": True,
            }
        },
        {
            "name": "category_edit",
            "description": "Edit an existing category script (updates code, description, or metadata)",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["strategy", "indicator", "research"],
                        "description": "Category of the script"
                    },
                    "name": {
                        "type": "string",
                        "description": "Display name of the existing item"
                    },
                    "code": {
                        "type": "string",
                        "description": "Updated Python code (optional, omit to keep existing)"
                    },
                    "description": {
                        "type": "string",
                        "description": "Updated description (optional, omit to keep existing)"
                    },
                    "metadata": {
                        "type": "object",
                        "description": "Updated metadata fields (optional)"
                    }
                },
                "required": ["category", "name"]
            },
            "annotations": {
                "agent_accessible": True,
            }
        },
        {
            "name": "category_read",
            "description": "Read a category script and its metadata",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["strategy", "indicator", "research"],
                        "description": "Category of the script"
                    },
                    "name": {
                        "type": "string",
                        "description": "Display name of the item"
                    }
                },
                "required": ["category", "name"]
            },
            "annotations": {
                "agent_accessible": True,
            }
        },
        {
            "name": "category_list",
            "description": "List all items in a category with names and descriptions",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["strategy", "indicator", "research"],
                        "description": "Category to list"
                    }
                },
                "required": ["category"]
            },
            "annotations": {
                "agent_accessible": True,
            }
        }
    ]


def create_mcp_server(config: Config, event_publisher: EventPublisher) -> Server:
    """Create MCP server with resources and workspace tools.

    Args:
        config: Loaded application configuration (server name, user id,
            workspace and data directories).
        event_publisher: Publisher used to emit an event whenever the hello
            resource is read.

    Returns:
        A ``Server`` with resource and tool handlers registered.
    """
    server = Server(config.mcp_server_name)

    # Initialize workspace store
    workspace_store = get_workspace_store(config.workspace_dir)
    logging.info(f"Workspace store initialized at {config.workspace_dir}")

    # Initialize category file manager
    category_manager = get_category_manager(config.data_dir)
    logging.info(f"Category manager initialized at {config.data_dir}")

    hello_uri = f"dexorder://user/{config.user_id}/hello"

    @server.list_resources()
    async def list_resources():
        """List available resources"""
        return [
            {
                "uri": hello_uri,
                "name": "Hello World",
                "description": "A simple hello world resource",
                "mimeType": "text/plain",
                "annotations": {
                    "agent_accessible": True,  # Available to agent for ad-hoc queries
                }
            }
        ]

    @server.read_resource()
    async def read_resource(uri: str):
        """Read a resource by URI.

        Raises:
            ValueError: If ``uri`` is not a known resource.
        """
        # Compare on the string form: if the SDK hands over a URL object
        # rather than a plain str, a direct `==` against a str never matches.
        uri_text = str(uri)
        if uri_text != hello_uri:
            raise ValueError(f"Unknown resource: {uri}")

        # Publish an event when resource is accessed
        await event_publisher.publish(UserEvent(
            event_type=EventType.STRATEGY_LOG,
            payload={
                "message": "Hello world resource accessed",
                "uri": uri_text,
            },
            delivery=DeliverySpec.informational(),
        ))

        # NOTE(review): some MCP SDK versions expect str/bytes or content
        # objects from a read_resource handler rather than a dict - confirm
        # against the installed SDK version.
        return {
            "uri": uri_text,
            "mimeType": "text/plain",
            "text": f"Hello from DexOrder user container!\nUser ID: {config.user_id}\n",
        }

    @server.list_tools()
    async def list_tools():
        """List available tools including workspace and category tools"""
        return _tool_definitions()

    @server.call_tool()
    async def handle_tool_call(name: str, arguments: dict):
        """Handle tool calls including workspace and category tools.

        Raises:
            ValueError: If ``name`` is not a known tool.
        """
        # Tolerate a missing arguments object for tools called with no input.
        args = arguments or {}

        if name == "workspace_read":
            return workspace_store.read(args.get("store_name", ""))
        if name == "workspace_write":
            return workspace_store.write(
                args.get("store_name", ""),
                args.get("data")
            )
        if name == "workspace_patch":
            return workspace_store.patch(
                args.get("store_name", ""),
                args.get("patch", [])
            )
        if name == "category_write":
            return category_manager.write(
                category=args.get("category", ""),
                name=args.get("name", ""),
                description=args.get("description", ""),
                code=args.get("code", ""),
                metadata=args.get("metadata")
            )
        if name == "category_edit":
            return category_manager.edit(
                category=args.get("category", ""),
                name=args.get("name", ""),
                code=args.get("code"),
                description=args.get("description"),
                metadata=args.get("metadata")
            )
        if name == "category_read":
            return category_manager.read(
                category=args.get("category", ""),
                name=args.get("name", "")
            )
        if name == "category_list":
            return category_manager.list_items(
                category=args.get("category", "")
            )
        raise ValueError(f"Unknown tool: {name}")

    logging.info(f"MCP server '{config.mcp_server_name}' created with workspace and category tools")
    return server


# =============================================================================
# SSE Transport Setup
# =============================================================================

def create_sse_app(mcp_server: Server) -> Starlette:
    """Create Starlette app with SSE endpoint for MCP.

    Routes:
        ``/sse``        - SSE stream that runs the MCP session.
        ``/messages/``  - POST endpoint handled by the SSE transport.
        ``/health``     - static JSON health check.
    """
    # Create SSE transport instance
    sse = SseServerTransport("/messages/")

    async def handle_sse(request: Request) -> Response:
        """Handle SSE connections for MCP"""
        # `request._send` is a private Starlette attribute; the transport
        # needs the raw ASGI send callable to stream events.
        async with sse.connect_sse(
            request.scope,
            request.receive,
            request._send
        ) as streams:
            await mcp_server.run(
                streams[0],
                streams[1],
                mcp_server.create_initialization_options()
            )
        # Return a response object once the session ends so the route handler
        # does not return None.
        return Response()

    async def handle_health(request: Request) -> Response:
        """Health check endpoint for k8s probes and gateway readiness checks"""
        return Response(
            content='{"status":"ok"}',
            media_type="application/json"
        )

    app = Starlette(
        routes=[
            Route("/sse", handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
            Route("/health", handle_health),
        ]
    )

    return app


# =============================================================================
# Main Application
# =============================================================================

class UserContainer:
    """Main user container application"""

    def __init__(self):
        """Create the container and read environment-based configuration.

        Raises:
            ValueError: Propagated from ``Config()`` (e.g. missing ``USER_ID``).
        """
        self.config = Config()
        self.event_publisher: Optional[EventPublisher] = None
        self.mcp_server: Optional[Server] = None
        self.running = False

        # Track which subsystems actually started so stop() can clean up after
        # a start() that failed part-way through.
        self._lifecycle_started = False
        self._publisher_started = False
        self._stopping = False

    @property
    def stopping(self) -> bool:
        """True while ``stop()`` is in progress."""
        return self._stopping

    async def start(self) -> None:
        """Start all subsystems"""
        logging.info("Starting user container")

        # Load configuration
        self.config.load()

        # Start lifecycle manager
        await start_lifecycle_manager(
            user_id=self.config.user_id,
            idle_timeout_minutes=self.config.idle_timeout_minutes,
            enable_idle_shutdown=self.config.enable_idle_shutdown,
        )
        self._lifecycle_started = True
        logging.info("Lifecycle manager started")

        # Start event publisher
        self.event_publisher = EventPublisher(
            user_id=self.config.user_id,
            xpub_port=self.config.zmq_xpub_port,
            gateway_router_endpoint=self.config.zmq_gateway_endpoint,
        )
        await self.event_publisher.start()
        self._publisher_started = True
        logging.info("Event publisher started")

        # Publish CONTAINER_STARTING event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_STARTING,
            payload={
                "user_id": self.config.user_id,
                "timestamp": None,  # Will be auto-filled
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))

        # Create MCP server
        self.mcp_server = create_mcp_server(self.config, self.event_publisher)

        # Publish CONTAINER_READY event
        await self.event_publisher.publish(UserEvent(
            event_type=EventType.CONTAINER_READY,
            payload={
                "user_id": self.config.user_id,
            },
            delivery=DeliverySpec.active_or_telegram(),
        ))

        self.running = True
        logging.info("User container ready")

    async def stop(self) -> None:
        """Stop all subsystems.

        Safe to call more than once, before ``start()``, or after a ``start()``
        that failed part-way: only subsystems that actually started are
        stopped. Each cleanup step is best-effort - a failure is logged and the
        remaining steps still run.
        """
        if self._stopping:
            return
        if not (self.running or self._publisher_started or self._lifecycle_started):
            return

        # Set synchronously (before the first await) so a concurrent second
        # call becomes a no-op.
        self._stopping = True
        self.running = False
        logging.info("Stopping user container")

        try:
            if self._publisher_started and self.event_publisher:
                # Publish CONTAINER_SHUTTING_DOWN event
                try:
                    await self.event_publisher.publish(UserEvent(
                        event_type=EventType.CONTAINER_SHUTTING_DOWN,
                        payload={
                            "user_id": self.config.user_id,
                        },
                        delivery=DeliverySpec.active_or_telegram(),
                    ))
                except Exception:
                    logging.exception("Failed to publish CONTAINER_SHUTTING_DOWN event")

                # Stop subsystems
                try:
                    await self.event_publisher.stop()
                    logging.info("Event publisher stopped")
                except Exception:
                    logging.exception("Failed to stop event publisher")
                self._publisher_started = False

            if self._lifecycle_started:
                lifecycle = get_lifecycle_manager()
                if lifecycle:
                    try:
                        await lifecycle.stop()
                        logging.info("Lifecycle manager stopped")
                    except Exception:
                        logging.exception("Failed to stop lifecycle manager")
                self._lifecycle_started = False
        finally:
            self._stopping = False

        logging.info("User container stopped")

    async def run(self) -> None:
        """Run the MCP server with configured transport.

        Raises:
            ValueError: If ``MCP_TRANSPORT`` is neither ``"stdio"`` nor ``"sse"``.
        """
        try:
            # start() is inside the try so a failed start-up still triggers
            # stop() and releases whatever was already started.
            await self.start()

            if self.config.mcp_transport == "stdio":
                # Run MCP server on stdio (for dev/testing)
                logging.info("Starting MCP server with stdio transport")
                async with stdio_server() as (read_stream, write_stream):
                    await self.mcp_server.run(
                        read_stream,
                        write_stream,
                        self.mcp_server.create_initialization_options()
                    )
            elif self.config.mcp_transport == "sse":
                # Run MCP server via HTTP/SSE (for production)
                logging.info(f"Starting MCP server with SSE transport on {self.config.mcp_http_host}:{self.config.mcp_http_port}")
                app = create_sse_app(self.mcp_server)
                config = uvicorn.Config(
                    app,
                    host=self.config.mcp_http_host,
                    port=self.config.mcp_http_port,
                    log_level=os.getenv("LOG_LEVEL", "info").lower(),
                    access_log=True,
                )
                server = uvicorn.Server(config)
                await server.serve()
            else:
                raise ValueError(f"Unknown MCP transport: {self.config.mcp_transport}")

        finally:
            await self.stop()


# =============================================================================
# Entry Point
# =============================================================================

async def main():
    """Main entry point.

    Configures logging, installs SIGTERM/SIGINT handlers and runs the
    container until it exits or a shutdown signal arrives. Exits the process
    with status 1 on an unexpected error.
    """
    # Setup logging
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, log_level, None)
    invalid_level = not isinstance(level, int)
    if invalid_level:
        # An unknown LOG_LEVEL must not prevent the container from starting.
        level = logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        stream=sys.stderr,  # MCP uses stdout for protocol
    )
    if invalid_level:
        logging.warning(f"Unknown LOG_LEVEL {log_level!r}, defaulting to INFO")

    # Create and run container
    container = UserContainer()

    # Handle shutdown signals
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()

    def handle_signal(sig):
        if container.stopping:
            # Cancelling now would interrupt the cleanup already under way.
            logging.info(f"Received signal {sig}, shutdown already in progress")
            return
        logging.info(f"Received signal {sig}, shutting down...")
        # Cancel the main task rather than stopping the loop: stopping the loop
        # under asyncio.run() raises RuntimeError and skips cleanup, whereas
        # cancellation unwinds through run()'s `finally`, which awaits stop().
        if main_task is not None:
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, lambda s=sig: handle_signal(s))
        except NotImplementedError:
            # Not every platform/event loop supports add_signal_handler.
            logging.warning(f"Signal handlers not supported for {sig} on this platform")

    try:
        await container.run()
    except asyncio.CancelledError:
        logging.info("Shutdown complete")
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())