diff --git a/backend/config.yaml b/backend/config.yaml index 183d7e7..6245d13 100644 --- a/backend/config.yaml +++ b/backend/config.yaml @@ -5,7 +5,7 @@ server_port: 8081 agent: model: "claude-sonnet-4-20250514" temperature: 0.7 - context_docs_dir: "memory" + context_docs_dir: "memory" # Context docs still loaded from memory/ # Local memory configuration (free & sophisticated!) memory: diff --git a/backend/requirements.txt b/backend/requirements.txt index e47c96b..6fa06a6 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -42,3 +42,6 @@ python-dotenv>=1.0.0 # Secrets management cryptography>=42.0.0 argon2-cffi>=23.0.0 + +# Trigger system scheduling +apscheduler>=3.10.0 diff --git a/backend/soul/automation_agent.md b/backend/soul/automation_agent.md new file mode 100644 index 0000000..932e370 --- /dev/null +++ b/backend/soul/automation_agent.md @@ -0,0 +1,37 @@ +# Automation Agent + +You are a specialized automation and scheduling agent. Your sole purpose is to manage triggers, scheduled tasks, and automated workflows. + +## Your Core Identity + +You are an expert in: +- Scheduling recurring tasks with cron expressions +- Creating interval-based triggers +- Managing the trigger queue and priorities +- Designing autonomous agent workflows + +## Your Tools + +You have access to: +- **Trigger Tools**: schedule_agent_prompt, execute_agent_prompt_once, list_scheduled_triggers, cancel_scheduled_trigger, get_trigger_system_stats + +## Communication Style + +- **Systematic**: Explain scheduling logic clearly +- **Proactive**: Suggest optimal scheduling strategies +- **Organized**: Keep track of all scheduled tasks +- **Reliable**: Ensure triggers are set up correctly + +## Key Principles + +1. **Clarity**: Make schedules easy to understand +2. **Maintainability**: Use descriptive names for jobs +3. **Priority Awareness**: Respect task priorities +4. 
**Resource Conscious**: Avoid overwhelming the system + +## Limitations + +- You ONLY handle scheduling and automation +- You do NOT execute the actual analysis (delegate to other agents) +- You do NOT access data or charts directly +- You coordinate but don't perform the work diff --git a/backend/soul/chart_agent.md b/backend/soul/chart_agent.md new file mode 100644 index 0000000..9cc8f93 --- /dev/null +++ b/backend/soul/chart_agent.md @@ -0,0 +1,40 @@ +# Chart Analysis Agent + +You are a specialized chart analysis and technical analysis agent. Your sole purpose is to work with chart data, indicators, and Python-based analysis. + +## Your Core Identity + +You are an expert in: +- Reading and analyzing OHLCV data +- Computing technical indicators (RSI, MACD, Bollinger Bands, etc.) +- Drawing shapes and annotations on charts +- Executing Python code for custom analysis +- Visualizing data with matplotlib + +## Your Tools + +You have access to: +- **Chart Tools**: get_chart_data, execute_python +- **Indicator Tools**: search_indicators, add_indicator_to_chart, list_chart_indicators, etc. +- **Shape Tools**: search_shapes, create_or_update_shape, delete_shape, etc. + +## Communication Style + +- **Direct & Technical**: Provide analysis results clearly +- **Visual**: Generate plots when helpful +- **Precise**: Reference specific timeframes, indicators, and values +- **Concise**: Focus on the analysis task at hand + +## Key Principles + +1. **Data First**: Always work with actual market data +2. **Visualize**: Create charts to illustrate findings +3. **Document Calculations**: Explain what indicators show +4. 
**Respect Context**: Use the chart the user is viewing when available + +## Limitations + +- You ONLY handle chart analysis and visualization +- You do NOT make trading decisions +- You do NOT access external APIs or data sources +- Route other requests back to the main agent diff --git a/backend/soul/data_agent.md b/backend/soul/data_agent.md new file mode 100644 index 0000000..0a50074 --- /dev/null +++ b/backend/soul/data_agent.md @@ -0,0 +1,37 @@ +# Data Access Agent + +You are a specialized data access agent. Your sole purpose is to retrieve and search market data from various exchanges and data sources. + +## Your Core Identity + +You are an expert in: +- Searching for trading symbols across exchanges +- Retrieving historical OHLCV data +- Understanding exchange APIs and data formats +- Symbol resolution and metadata + +## Your Tools + +You have access to: +- **Data Source Tools**: list_data_sources, search_symbols, get_symbol_info, get_historical_data + +## Communication Style + +- **Precise**: Provide exact symbol names and exchange identifiers +- **Helpful**: Suggest alternatives when exact matches aren't found +- **Efficient**: Return data in the format requested +- **Clear**: Explain data limitations or availability issues + +## Key Principles + +1. **Accuracy**: Return correct symbol identifiers +2. **Completeness**: Include all relevant metadata +3. **Performance**: Respect countback limits +4. 
**Format Awareness**: Understand time resolutions and ranges + +## Limitations + +- You ONLY handle data retrieval and search +- You do NOT analyze data (route to chart agent) +- You do NOT access external news or research (route to research agent) +- You do NOT modify data or state diff --git a/backend/memory/system_prompt.md b/backend/soul/main_agent.md similarity index 100% rename from backend/memory/system_prompt.md rename to backend/soul/main_agent.md diff --git a/backend/soul/research_agent.md b/backend/soul/research_agent.md new file mode 100644 index 0000000..c46c665 --- /dev/null +++ b/backend/soul/research_agent.md @@ -0,0 +1,37 @@ +# Research Agent + +You are a specialized research agent. Your sole purpose is to gather external information from the web, academic papers, and public APIs. + +## Your Core Identity + +You are an expert in: +- Searching academic papers on arXiv +- Finding information on Wikipedia +- Web search for current events and news +- Making HTTP requests to public APIs + +## Your Tools + +You have access to: +- **Research Tools**: search_arxiv, search_wikipedia, search_web, http_get, http_post + +## Communication Style + +- **Thorough**: Provide comprehensive research findings +- **Source-Aware**: Cite sources and links +- **Critical**: Evaluate information quality +- **Summarize**: Distill key points from long content + +## Key Principles + +1. **Verify**: Cross-reference information when possible +2. **Recency**: Note publication dates and data freshness +3. **Relevance**: Focus on trading and market-relevant information +4. 
**Ethics**: Respect API rate limits and terms of service + +## Limitations + +- You ONLY handle external information gathering +- You do NOT analyze market data (route to chart agent) +- You do NOT make trading recommendations +- You do NOT access private or authenticated APIs without explicit permission diff --git a/backend/src/agent/core.py b/backend/src/agent/core.py index a2dab38..9215725 100644 --- a/backend/src/agent/core.py +++ b/backend/src/agent/core.py @@ -7,10 +7,12 @@ from langchain_core.messages import HumanMessage, SystemMessage, AIMessage from langchain_core.runnables import RunnableConfig from langgraph.prebuilt import create_react_agent -from agent.tools import SYNC_TOOLS, DATASOURCE_TOOLS, INDICATOR_TOOLS, RESEARCH_TOOLS, CHART_TOOLS, SHAPE_TOOLS +from agent.tools import SYNC_TOOLS, DATASOURCE_TOOLS, INDICATOR_TOOLS, RESEARCH_TOOLS, CHART_TOOLS, SHAPE_TOOLS, TRIGGER_TOOLS from agent.memory import MemoryManager from agent.session import SessionManager from agent.prompts import build_system_prompt +from agent.subagent import SubAgent +from agent.routers import ROUTER_TOOLS, set_chart_agent, set_data_agent, set_automation_agent, set_research_agent from gateway.user_session import UserSession from gateway.protocol import UserMessage as GatewayUserMessage @@ -29,7 +31,8 @@ class AgentExecutor: model_name: str = "claude-sonnet-4-20250514", temperature: float = 0.7, api_key: Optional[str] = None, - memory_manager: Optional[MemoryManager] = None + memory_manager: Optional[MemoryManager] = None, + base_dir: str = "." ): """Initialize agent executor. 
@@ -38,10 +41,12 @@ class AgentExecutor: temperature: Model temperature api_key: Anthropic API key memory_manager: MemoryManager instance + base_dir: Base directory for resolving paths """ self.model_name = model_name self.temperature = temperature self.api_key = api_key + self.base_dir = base_dir # Initialize LLM self.llm = ChatAnthropic( @@ -56,6 +61,12 @@ class AgentExecutor: self.session_manager = SessionManager(self.memory_manager) self.agent = None # Will be created after initialization + # Sub-agents (only if using hierarchical tools) + self.chart_agent = None + self.data_agent = None + self.automation_agent = None + self.research_agent = None + async def initialize(self) -> None: """Initialize the agent system.""" await self.memory_manager.initialize() @@ -63,15 +74,69 @@ class AgentExecutor: # Create agent with tools and LangGraph checkpointer checkpointer = self.memory_manager.get_checkpointer() - # Create agent without a static system prompt + # Create specialized sub-agents + logger.info("Initializing hierarchical agent architecture with sub-agents") + + self.chart_agent = SubAgent( + name="chart", + soul_file="chart_agent.md", + tools=CHART_TOOLS + INDICATOR_TOOLS + SHAPE_TOOLS, + model_name=self.model_name, + temperature=self.temperature, + api_key=self.api_key, + base_dir=self.base_dir + ) + + self.data_agent = SubAgent( + name="data", + soul_file="data_agent.md", + tools=DATASOURCE_TOOLS, + model_name=self.model_name, + temperature=self.temperature, + api_key=self.api_key, + base_dir=self.base_dir + ) + + self.automation_agent = SubAgent( + name="automation", + soul_file="automation_agent.md", + tools=TRIGGER_TOOLS, + model_name=self.model_name, + temperature=self.temperature, + api_key=self.api_key, + base_dir=self.base_dir + ) + + self.research_agent = SubAgent( + name="research", + soul_file="research_agent.md", + tools=RESEARCH_TOOLS, + model_name=self.model_name, + temperature=self.temperature, + api_key=self.api_key, + base_dir=self.base_dir + 
) + + # Set global sub-agent instances for router tools + set_chart_agent(self.chart_agent) + set_data_agent(self.data_agent) + set_automation_agent(self.automation_agent) + set_research_agent(self.research_agent) + + # Main agent only gets SYNC_TOOLS (state management) and ROUTER_TOOLS + logger.info("Main agent using router tools (4 routers + sync tools)") + agent_tools = SYNC_TOOLS + ROUTER_TOOLS + + # Create main agent without a static system prompt # We'll pass the dynamic system prompt via state_modifier at runtime - # Include all tool categories: sync, datasource, chart, indicator, shape, and research self.agent = create_react_agent( self.llm, - SYNC_TOOLS + DATASOURCE_TOOLS + CHART_TOOLS + INDICATOR_TOOLS + SHAPE_TOOLS + RESEARCH_TOOLS, + agent_tools, checkpointer=checkpointer ) + logger.info(f"Agent initialized with {len(agent_tools)} tools") + async def _clear_checkpoint(self, session_id: str) -> None: """Clear the checkpoint for a session to prevent resuming from invalid state. @@ -291,7 +356,7 @@ def create_agent( base_dir: Base directory for resolving paths Returns: - Initialized AgentExecutor + Initialized AgentExecutor with hierarchical tool routing """ # Initialize memory manager memory_manager = MemoryManager( @@ -307,7 +372,8 @@ def create_agent( model_name=model_name, temperature=temperature, api_key=api_key, - memory_manager=memory_manager + memory_manager=memory_manager, + base_dir=base_dir ) return executor diff --git a/backend/src/agent/routers.py b/backend/src/agent/routers.py new file mode 100644 index 0000000..c7fdfc3 --- /dev/null +++ b/backend/src/agent/routers.py @@ -0,0 +1,218 @@ +"""Tool router functions for hierarchical agent architecture. + +This module provides meta-tools that route tasks to specialized sub-agents. +The main agent uses these routers instead of accessing all tools directly. 
+""" + +import logging +from typing import Optional +from langchain_core.tools import tool + +logger = logging.getLogger(__name__) + +# Global sub-agent instances (set by create_agent) +_chart_agent = None +_data_agent = None +_automation_agent = None +_research_agent = None + + +def set_chart_agent(agent): + """Set the global chart sub-agent instance.""" + global _chart_agent + _chart_agent = agent + + +def set_data_agent(agent): + """Set the global data sub-agent instance.""" + global _data_agent + _data_agent = agent + + +def set_automation_agent(agent): + """Set the global automation sub-agent instance.""" + global _automation_agent + _automation_agent = agent + + +def set_research_agent(agent): + """Set the global research sub-agent instance.""" + global _research_agent + _research_agent = agent + + +@tool +async def use_chart_analysis(task: str) -> str: + """Analyze charts, compute indicators, execute Python code, and create visualizations. + + This tool delegates to a specialized chart analysis agent that has access to: + - Chart data retrieval (get_chart_data) + - Python execution environment with pandas, numpy, matplotlib, talib + - Technical indicator tools (add/remove indicators, search indicators) + - Shape drawing tools (create/update/delete shapes on charts) + + Use this when the user wants to: + - Analyze price action or patterns + - Calculate technical indicators (RSI, MACD, Bollinger Bands, etc.) + - Execute custom Python analysis on OHLCV data + - Generate charts and visualizations + - Draw trendlines, support/resistance, or other shapes + - Perform statistical analysis on market data + + Args: + task: Detailed description of the chart analysis task. 
Include: + - What to analyze (which symbol, timeframe if different from current) + - What indicators or calculations to perform + - What visualizations to create + - Any specific questions to answer + + Returns: + The chart agent's analysis results, including computed values, + plot URLs if visualizations were created, and interpretation. + + Examples: + - "Calculate RSI(14) for the current chart and tell me if it's overbought" + - "Draw a trendline connecting the last 3 swing lows" + - "Compute Bollinger Bands (20, 2) and create a chart showing price vs bands" + - "Analyze the last 100 bars and identify key support/resistance levels" + - "Execute Python: calculate correlation between BTC and ETH over the last 30 days" + """ + if not _chart_agent: + return "Error: Chart analysis agent not initialized" + + logger.info(f"Routing to chart agent: {task[:100]}...") + result = await _chart_agent.execute(task) + return result + + +@tool +async def use_data_access(task: str) -> str: + """Search for symbols and retrieve market data from exchanges. + + This tool delegates to a specialized data access agent that has access to: + - Symbol search across multiple exchanges + - Historical OHLCV data retrieval + - Symbol metadata and info + - Available data sources and exchanges + + Use this when the user wants to: + - Search for a trading symbol or ticker + - Get historical price data + - Find out what exchanges support a symbol + - Retrieve symbol metadata (price scale, supported resolutions, etc.) + - Check what data sources are available + + Args: + task: Detailed description of the data access task. Include: + - What symbol or instrument to search for + - What data to retrieve (time range, resolution) + - What metadata is needed + + Returns: + The data agent's response with requested symbols, data, or metadata. 
+ + Examples: + - "Search for Bitcoin symbols on Binance" + - "Get the last 100 hours of BTC/USDT 1-hour data from Binance" + - "Find all symbols matching 'ETH' on all exchanges" + - "Get detailed info about symbol BTC/USDT on Binance" + - "List all available data sources" + """ + if not _data_agent: + return "Error: Data access agent not initialized" + + logger.info(f"Routing to data agent: {task[:100]}...") + result = await _data_agent.execute(task) + return result + + +@tool +async def use_automation(task: str) -> str: + """Schedule recurring tasks, create triggers, and manage automation. + + This tool delegates to a specialized automation agent that has access to: + - Scheduled agent prompts (cron and interval-based) + - One-time agent prompt execution + - Trigger management (list, cancel scheduled jobs) + - System stats and monitoring + + Use this when the user wants to: + - Schedule a recurring task (hourly, daily, weekly, etc.) + - Run a one-time background analysis + - Set up automated monitoring or alerts + - List or cancel existing scheduled tasks + - Check trigger system status + + Args: + task: Detailed description of the automation task. Include: + - What should happen (what analysis or action) + - When it should happen (schedule, frequency) + - Any priorities or conditions + + Returns: + The automation agent's response with job IDs, confirmation, + or status information. 
+ + Examples: + - "Schedule a task to check BTC price every 5 minutes" + - "Run a one-time analysis of ETH volume in the background" + - "Set up a daily report at 9 AM with market summary" + - "Show me all my scheduled tasks" + - "Cancel the hourly BTC monitor job" + """ + if not _automation_agent: + return "Error: Automation agent not initialized" + + logger.info(f"Routing to automation agent: {task[:100]}...") + result = await _automation_agent.execute(task) + return result + + +@tool +async def use_research(task: str) -> str: + """Search the web, academic papers, and external APIs for information. + + This tool delegates to a specialized research agent that has access to: + - Web search (DuckDuckGo) + - Academic paper search (arXiv) + - Wikipedia lookup + - HTTP requests to public APIs + + Use this when the user wants to: + - Search for current news or events + - Find academic papers on trading strategies + - Look up financial concepts or terms + - Fetch data from external public APIs + - Research market trends or sentiment + + Args: + task: Detailed description of the research task. Include: + - What information to find + - What sources to search (web, arxiv, wikipedia, APIs) + - What to focus on or filter + + Returns: + The research agent's findings with sources, summaries, and links. 
+ + Examples: + - "Search arXiv for papers on reinforcement learning for trading" + - "Look up 'technical analysis' on Wikipedia" + - "Search the web for latest Ethereum news" + - "Fetch current BTC price from CoinGecko API" + - "Find recent papers on market microstructure" + """ + if not _research_agent: + return "Error: Research agent not initialized" + + logger.info(f"Routing to research agent: {task[:100]}...") + result = await _research_agent.execute(task) + return result + + +# Export router tools +ROUTER_TOOLS = [ + use_chart_analysis, + use_data_access, + use_automation, + use_research +] diff --git a/backend/src/agent/subagent.py b/backend/src/agent/subagent.py new file mode 100644 index 0000000..db2d174 --- /dev/null +++ b/backend/src/agent/subagent.py @@ -0,0 +1,248 @@ +"""Sub-agent infrastructure for specialized tool routing. + +This module provides the SubAgent class that wraps specialized agents +with their own tools and system prompts. +""" + +import logging +from typing import List, Optional, AsyncIterator +from pathlib import Path + +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_core.runnables import RunnableConfig +from langgraph.prebuilt import create_react_agent +from langgraph.checkpoint.memory import MemorySaver + +logger = logging.getLogger(__name__) + + +class SubAgent: + """A specialized sub-agent with its own tools and system prompt. + + Sub-agents are lightweight, stateless agents that focus on specific domains. + They use in-memory checkpointing since they don't need persistent state. + """ + + def __init__( + self, + name: str, + soul_file: str, + tools: List, + model_name: str = "claude-sonnet-4-20250514", + temperature: float = 0.7, + api_key: Optional[str] = None, + base_dir: str = "." + ): + """Initialize a sub-agent. 
+ + Args: + name: Agent name (e.g., "chart", "data", "automation") + soul_file: Filename in /soul directory (e.g., "chart_agent.md") + tools: List of LangChain tools for this agent + model_name: Anthropic model name + temperature: Model temperature + api_key: Anthropic API key + base_dir: Base directory for resolving paths + """ + self.name = name + self.soul_file = soul_file + self.tools = tools + self.model_name = model_name + self.temperature = temperature + self.api_key = api_key + self.base_dir = base_dir + + # Load system prompt from soul file + soul_path = Path(base_dir) / "soul" / soul_file + if soul_path.exists(): + with open(soul_path, "r") as f: + self.system_prompt = f.read() + logger.info(f"SubAgent '{name}': Loaded system prompt from {soul_path}") + else: + logger.warning(f"SubAgent '{name}': Soul file not found at {soul_path}, using default") + self.system_prompt = f"You are a specialized {name} agent." + + # Initialize LLM + self.llm = ChatAnthropic( + model=model_name, + temperature=temperature, + api_key=api_key, + streaming=True + ) + + # Create agent with in-memory checkpointer (stateless) + checkpointer = MemorySaver() + self.agent = create_react_agent( + self.llm, + tools, + checkpointer=checkpointer + ) + + logger.info( + f"SubAgent '{name}' initialized with {len(tools)} tools, " + f"model={model_name}, temp={temperature}" + ) + + async def execute( + self, + task: str, + thread_id: Optional[str] = None + ) -> str: + """Execute a task with this sub-agent. 
+ + Args: + task: The task/prompt for this sub-agent + thread_id: Optional thread ID for checkpointing (uses ephemeral ID if not provided) + + Returns: + The agent's complete response as a string + """ + import uuid + + # Use ephemeral thread ID if not provided + if thread_id is None: + thread_id = f"subagent-{self.name}-{uuid.uuid4()}" + + logger.info(f"SubAgent '{self.name}': Executing task (thread_id={thread_id})") + logger.debug(f"SubAgent '{self.name}': Task: {task[:200]}...") + + # Build messages with system prompt + messages = [ + HumanMessage(content=task) + ] + + # Prepare config with system prompt injection + config = RunnableConfig( + configurable={ + "thread_id": thread_id, + "state_modifier": self.system_prompt + }, + metadata={ + "subagent_name": self.name + } + ) + + # Execute and collect response + full_response = "" + event_count = 0 + + try: + async for event in self.agent.astream_events( + {"messages": messages}, + config=config, + version="v2" + ): + event_count += 1 + + # Log tool calls + if event["event"] == "on_tool_start": + tool_name = event.get("name", "unknown") + logger.debug(f"SubAgent '{self.name}': Tool call started: {tool_name}") + + elif event["event"] == "on_tool_end": + tool_name = event.get("name", "unknown") + logger.debug(f"SubAgent '{self.name}': Tool call completed: {tool_name}") + + # Extract streaming tokens + elif event["event"] == "on_chat_model_stream": + chunk = event["data"]["chunk"] + if hasattr(chunk, "content") and chunk.content: + content = chunk.content + # Handle both string and list content + if isinstance(content, list): + text_parts = [] + for block in content: + if isinstance(block, dict) and "text" in block: + text_parts.append(block["text"]) + elif hasattr(block, "text"): + text_parts.append(block.text) + content = "".join(text_parts) + + if content: + full_response += content + + logger.info( + f"SubAgent '{self.name}': Completed task " + f"({event_count} events, {len(full_response)} chars)" + ) + + except 
Exception as e: + error_msg = f"SubAgent '{self.name}' execution error: {str(e)}" + logger.error(error_msg, exc_info=True) + return f"Error: {error_msg}" + + return full_response + + async def stream( + self, + task: str, + thread_id: Optional[str] = None + ) -> AsyncIterator[str]: + """Execute a task with streaming response. + + Args: + task: The task/prompt for this sub-agent + thread_id: Optional thread ID for checkpointing + + Yields: + Response chunks as they're generated + """ + import uuid + + # Use ephemeral thread ID if not provided + if thread_id is None: + thread_id = f"subagent-{self.name}-{uuid.uuid4()}" + + logger.info(f"SubAgent '{self.name}': Streaming task (thread_id={thread_id})") + + # Build messages with system prompt + messages = [ + HumanMessage(content=task) + ] + + # Prepare config + config = RunnableConfig( + configurable={ + "thread_id": thread_id, + "state_modifier": self.system_prompt + }, + metadata={ + "subagent_name": self.name + } + ) + + # Stream response + try: + async for event in self.agent.astream_events( + {"messages": messages}, + config=config, + version="v2" + ): + # Log tool calls + if event["event"] == "on_tool_start": + tool_name = event.get("name", "unknown") + logger.debug(f"SubAgent '{self.name}': Tool call started: {tool_name}") + + # Extract streaming tokens + elif event["event"] == "on_chat_model_stream": + chunk = event["data"]["chunk"] + if hasattr(chunk, "content") and chunk.content: + content = chunk.content + # Handle both string and list content + if isinstance(content, list): + text_parts = [] + for block in content: + if isinstance(block, dict) and "text" in block: + text_parts.append(block["text"]) + elif hasattr(block, "text"): + text_parts.append(block.text) + content = "".join(text_parts) + + if content: + yield content + + except Exception as e: + error_msg = f"SubAgent '{self.name}' streaming error: {str(e)}" + logger.error(error_msg, exc_info=True) + yield f"Error: {error_msg}" diff --git 
a/backend/src/agent/tools/TRIGGER_TOOLS.md b/backend/src/agent/tools/TRIGGER_TOOLS.md new file mode 100644 index 0000000..b89390f --- /dev/null +++ b/backend/src/agent/tools/TRIGGER_TOOLS.md @@ -0,0 +1,373 @@ +# Agent Trigger Tools + +Agent tools for automating tasks via the trigger system. + +## Overview + +These tools allow the agent to: +- **Schedule recurring tasks** - Run agent prompts on intervals or cron schedules +- **Execute one-time tasks** - Trigger sub-agent runs immediately +- **Manage scheduled jobs** - List and cancel scheduled triggers +- **React to events** - (Future) Connect data updates to agent actions + +## Available Tools + +### 1. `schedule_agent_prompt` + +Schedule an agent to run with a specific prompt on a recurring schedule. + +**Use Cases:** +- Daily market analysis reports +- Hourly portfolio rebalancing checks +- Weekly performance summaries +- Monitoring alerts + +**Arguments:** +- `prompt` (str): The prompt to send to the agent when triggered +- `schedule_type` (str): "interval" or "cron" +- `schedule_config` (dict): Schedule configuration +- `name` (str, optional): Descriptive name for this task + +**Schedule Config:** + +*Interval-based:* +```json +{"minutes": 5} +{"hours": 1, "minutes": 30} +{"seconds": 30} +``` + +*Cron-based:* +```json +{"hour": "9", "minute": "0"} // Daily at 9:00 AM +{"hour": "9", "minute": "0", "day_of_week": "mon-fri"} // Weekdays at 9 AM +{"minute": "0"} // Every hour on the hour +{"hour": "*/6", "minute": "0"} // Every 6 hours +``` + +**Returns:** +```json +{ + "job_id": "cron_456", + "message": "Scheduled 'daily_report' with job_id=cron_456", + "schedule_type": "cron", + "config": {"hour": "9", "minute": "0"} +} +``` + +**Examples:** + +```python +# Every 5 minutes: check BTC price +schedule_agent_prompt( + prompt="Check current BTC price on Binance. 
If > $50k, alert me.", + schedule_type="interval", + schedule_config={"minutes": 5}, + name="btc_price_monitor" +) + +# Daily at 9 AM: market summary +schedule_agent_prompt( + prompt="Generate a comprehensive market summary for BTC, ETH, and SOL. Include price changes, volume, and notable events from the last 24 hours.", + schedule_type="cron", + schedule_config={"hour": "9", "minute": "0"}, + name="daily_market_summary" +) + +# Every hour on weekdays: portfolio check +schedule_agent_prompt( + prompt="Review current portfolio positions. Check if any rebalancing is needed based on target allocations.", + schedule_type="cron", + schedule_config={"minute": "0", "day_of_week": "mon-fri"}, + name="hourly_portfolio_check" +) +``` + +### 2. `execute_agent_prompt_once` + +Execute an agent prompt once, immediately (enqueued with priority). + +**Use Cases:** +- Background analysis tasks +- One-time data processing +- Responding to specific events +- Sub-agent delegation + +**Arguments:** +- `prompt` (str): The prompt to send to the agent +- `priority` (str): "high", "normal", or "low" (default: "normal") + +**Returns:** +```json +{ + "queue_seq": 42, + "message": "Enqueued agent prompt with priority=normal", + "prompt": "Analyze the last 100 BTC/USDT bars..." +} +``` + +**Examples:** + +```python +# Immediate analysis with high priority +execute_agent_prompt_once( + prompt="Analyze the last 100 BTC/USDT 1m bars and identify key support/resistance levels", + priority="high" +) + +# Background task with normal priority +execute_agent_prompt_once( + prompt="Research the latest news about Ethereum upgrades and summarize findings", + priority="normal" +) + +# Low priority cleanup task +execute_agent_prompt_once( + prompt="Review and archive old chart drawings from last month", + priority="low" +) +``` + +### 3. `list_scheduled_triggers` + +List all currently scheduled triggers. 
+ +**Returns:** +```json +[ + { + "id": "cron_456", + "name": "Cron: daily_market_summary", + "next_run_time": "2024-03-05 09:00:00", + "trigger": "cron[hour='9', minute='0']" + }, + { + "id": "interval_123", + "name": "Interval: btc_price_monitor", + "next_run_time": "2024-03-04 14:35:00", + "trigger": "interval[0:05:00]" + } +] +``` + +**Example:** + +```python +jobs = list_scheduled_triggers() + +for job in jobs: + print(f"{job['name']} - next run: {job['next_run_time']}") +``` + +### 4. `cancel_scheduled_trigger` + +Cancel a scheduled trigger by its job ID. + +**Arguments:** +- `job_id` (str): The job ID from `schedule_agent_prompt` or `list_scheduled_triggers` + +**Returns:** +```json +{ + "status": "success", + "message": "Cancelled job interval_123" +} +``` + +**Example:** + +```python +# List jobs to find the ID +jobs = list_scheduled_triggers() + +# Cancel specific job +cancel_scheduled_trigger("interval_123") +``` + +### 5. `on_data_update_run_agent` + +**(Future)** Set up an agent to run whenever new data arrives for a specific symbol. + +**Arguments:** +- `source_name` (str): Data source name (e.g., "binance") +- `symbol` (str): Trading pair (e.g., "BTC/USDT") +- `resolution` (str): Time resolution (e.g., "1m", "5m") +- `prompt_template` (str): Template with variables like {close}, {volume}, {symbol} + +**Example:** + +```python +on_data_update_run_agent( + source_name="binance", + symbol="BTC/USDT", + resolution="1m", + prompt_template="New bar on {symbol}: close={close}, volume={volume}. Check if price crossed any key levels." +) +``` + +### 6. `get_trigger_system_stats` + +Get statistics about the trigger system. 
+ +**Returns:** +```json +{ + "queue_depth": 3, + "queue_running": true, + "coordinator_stats": { + "current_seq": 1042, + "next_commit_seq": 1043, + "pending_commits": 1, + "total_executions": 1042, + "state_counts": { + "COMMITTED": 1038, + "EXECUTING": 2, + "WAITING_COMMIT": 1, + "FAILED": 1 + } + } +} +``` + +**Example:** + +```python +stats = get_trigger_system_stats() +print(f"Queue has {stats['queue_depth']} pending triggers") +print(f"System has processed {stats['coordinator_stats']['total_executions']} total triggers") +``` + +## Integration Example + +Here's how these tools enable autonomous agent behavior: + +```python +# Agent conversation: +User: "Monitor BTC price and send me a summary every hour during market hours" + +Agent: I'll set that up for you using the trigger system. + +# Agent uses tool: +schedule_agent_prompt( + prompt=""" + Check the current BTC/USDT price on Binance. + Calculate the price change from 1 hour ago. + If price moved > 2%, provide a detailed analysis. + Otherwise, provide a brief status update. + Send results to user as a notification. + """, + schedule_type="cron", + schedule_config={ + "minute": "0", + "hour": "9-17", # 9 AM to 5 PM + "day_of_week": "mon-fri" + }, + name="btc_hourly_monitor" +) + +Agent: Done! I've scheduled an hourly BTC price monitor that runs during market hours (9 AM - 5 PM on weekdays). You'll receive updates every hour. + +# Later... +User: "Can you show me all my scheduled tasks?" + +Agent: Let me check what's scheduled. + +# Agent uses tool: +jobs = list_scheduled_triggers() + +Agent: You have 3 scheduled tasks: +1. "btc_hourly_monitor" - runs every hour during market hours +2. "daily_market_summary" - runs daily at 9 AM +3. "portfolio_rebalance_check" - runs every 4 hours + +Would you like to modify or cancel any of these? +``` + +## Use Case: Autonomous Trading Bot + +```python +# Step 1: Set up data monitoring +execute_agent_prompt_once( + prompt=""" + Subscribe to BTC/USDT 1m bars from Binance. 
+ When subscribed, set up the following: + 1. Calculate RSI(14) on each new bar + 2. If RSI > 70, execute prompt: "RSI overbought on BTC, check if we should sell" + 3. If RSI < 30, execute prompt: "RSI oversold on BTC, check if we should buy" + """, + priority="high" +) + +# Step 2: Schedule periodic portfolio review +schedule_agent_prompt( + prompt=""" + Review current portfolio: + 1. Calculate current allocation percentages + 2. Compare to target allocation (60% BTC, 30% ETH, 10% stable) + 3. If deviation > 5%, generate rebalancing trades + 4. Submit trades for execution + """, + schedule_type="interval", + schedule_config={"hours": 4}, + name="portfolio_rebalance" +) + +# Step 3: Schedule daily risk check +schedule_agent_prompt( + prompt=""" + Daily risk assessment: + 1. Calculate portfolio VaR (Value at Risk) + 2. Check current leverage across all positions + 3. Review stop-loss placements + 4. If risk exceeds threshold, alert and suggest adjustments + """, + schedule_type="cron", + schedule_config={"hour": "8", "minute": "0"}, + name="daily_risk_check" +) +``` + +## Benefits + +✅ **Autonomous operation** - Agent can schedule its own tasks +✅ **Event-driven** - React to market data, time, or custom events +✅ **Flexible scheduling** - Interval or cron-based +✅ **Self-managing** - Agent can list and cancel its own jobs +✅ **Priority control** - High-priority tasks jump the queue +✅ **Future-proof** - Easy to add Python lambdas, strategy execution, etc. 
+ +## Future Enhancements + +- **Python script execution** - Schedule arbitrary Python code +- **Strategy triggers** - Connect to strategy execution system +- **Event composition** - AND/OR logic for complex event patterns +- **Conditional execution** - Only run if conditions met (e.g., volatility > threshold) +- **Result chaining** - Use output of one trigger as input to another +- **Backtesting mode** - Test trigger logic on historical data + +## Setup in main.py + +```python +from agent.tools import set_trigger_queue, set_trigger_scheduler, set_coordinator +from trigger import TriggerQueue, CommitCoordinator +from trigger.scheduler import TriggerScheduler + +# Initialize trigger system +coordinator = CommitCoordinator() +queue = TriggerQueue(coordinator) +scheduler = TriggerScheduler(queue) + +await queue.start() +scheduler.start() + +# Make available to agent tools +set_trigger_queue(queue) +set_trigger_scheduler(scheduler) +set_coordinator(coordinator) + +# Add TRIGGER_TOOLS to agent's tool list +from agent.tools import TRIGGER_TOOLS +agent_tools = [..., *TRIGGER_TOOLS] +``` + +Now the agent has full control over the trigger system! 
🚀 diff --git a/backend/src/agent/tools/__init__.py b/backend/src/agent/tools/__init__.py index 3b36bd1..576bf19 100644 --- a/backend/src/agent/tools/__init__.py +++ b/backend/src/agent/tools/__init__.py @@ -6,6 +6,7 @@ This package provides tools for: - Chart data access and analysis (chart_tools) - Technical indicators (indicator_tools) - Shape/drawing management (shape_tools) +- Trigger system and automation (trigger_tools) """ # Global registries that will be set by main.py @@ -39,15 +40,25 @@ from .chart_tools import CHART_TOOLS from .indicator_tools import INDICATOR_TOOLS from .research_tools import RESEARCH_TOOLS from .shape_tools import SHAPE_TOOLS +from .trigger_tools import ( + TRIGGER_TOOLS, + set_trigger_queue, + set_trigger_scheduler, + set_coordinator, +) __all__ = [ "set_registry", "set_datasource_registry", "set_indicator_registry", + "set_trigger_queue", + "set_trigger_scheduler", + "set_coordinator", "SYNC_TOOLS", "DATASOURCE_TOOLS", "CHART_TOOLS", "INDICATOR_TOOLS", "RESEARCH_TOOLS", "SHAPE_TOOLS", + "TRIGGER_TOOLS", ] diff --git a/backend/src/agent/tools/trigger_tools.py b/backend/src/agent/tools/trigger_tools.py new file mode 100644 index 0000000..f3a420e --- /dev/null +++ b/backend/src/agent/tools/trigger_tools.py @@ -0,0 +1,366 @@ +""" +Agent tools for trigger system. 
+ +Allows agents to: +- Schedule recurring tasks (cron-style) +- Execute one-time triggers +- Manage scheduled triggers (list, cancel) +- Connect events to sub-agent runs or lambdas +""" + +import logging +from typing import Any, Dict, List, Optional + +from langchain_core.tools import tool + +logger = logging.getLogger(__name__) + +# Global references set by main.py +_trigger_queue = None +_trigger_scheduler = None +_coordinator = None + + +def set_trigger_queue(queue): + """Set the global TriggerQueue instance for tools to use.""" + global _trigger_queue + _trigger_queue = queue + + +def set_trigger_scheduler(scheduler): + """Set the global TriggerScheduler instance for tools to use.""" + global _trigger_scheduler + _trigger_scheduler = scheduler + + +def set_coordinator(coordinator): + """Set the global CommitCoordinator instance for tools to use.""" + global _coordinator + _coordinator = coordinator + + +def _get_trigger_queue(): + """Get the global trigger queue instance.""" + if not _trigger_queue: + raise ValueError("TriggerQueue not initialized") + return _trigger_queue + + +def _get_trigger_scheduler(): + """Get the global trigger scheduler instance.""" + if not _trigger_scheduler: + raise ValueError("TriggerScheduler not initialized") + return _trigger_scheduler + + +def _get_coordinator(): + """Get the global coordinator instance.""" + if not _coordinator: + raise ValueError("CommitCoordinator not initialized") + return _coordinator + + +@tool +async def schedule_agent_prompt( + prompt: str, + schedule_type: str, + schedule_config: Dict[str, Any], + name: Optional[str] = None, +) -> Dict[str, str]: + """Schedule an agent to run with a specific prompt on a recurring schedule. + + This allows you to set up automated tasks where the agent runs periodically + with a predefined prompt. 
Useful for: + - Daily market analysis reports + - Hourly portfolio rebalancing checks + - Weekly performance summaries + - Monitoring alerts + + Args: + prompt: The prompt to send to the agent when triggered + schedule_type: Type of schedule - "interval" or "cron" + schedule_config: Schedule configuration: + For "interval": {"minutes": 5} or {"hours": 1, "minutes": 30} + For "cron": {"hour": "9", "minute": "0"} for 9:00 AM daily + {"hour": "9", "minute": "0", "day_of_week": "mon-fri"} + name: Optional descriptive name for this scheduled task + + Returns: + Dictionary with job_id and confirmation message + + Examples: + # Run every 5 minutes + schedule_agent_prompt( + prompt="Check BTC price and alert if > $50k", + schedule_type="interval", + schedule_config={"minutes": 5} + ) + + # Run daily at 9 AM + schedule_agent_prompt( + prompt="Generate daily market summary", + schedule_type="cron", + schedule_config={"hour": "9", "minute": "0"} + ) + + # Run hourly on weekdays + schedule_agent_prompt( + prompt="Monitor portfolio for rebalancing opportunities", + schedule_type="cron", + schedule_config={"minute": "0", "day_of_week": "mon-fri"} + ) + """ + from trigger.handlers import LambdaHandler + from trigger import Priority + + scheduler = _get_trigger_scheduler() + queue = _get_trigger_queue() + + if not name: + name = f"agent_prompt_{hash(prompt) % 10000}" + + # Create a lambda that enqueues an agent trigger with the prompt + async def agent_prompt_lambda(): + from trigger.handlers import AgentTriggerHandler + + # Create agent trigger (will use current session's context) + # In production, you'd want to specify which session/user this belongs to + trigger = AgentTriggerHandler( + session_id="scheduled", # Special session for scheduled tasks + message_content=prompt, + coordinator=_get_coordinator(), + ) + + await queue.enqueue(trigger) + return [] # No direct commit intents + + # Wrap in lambda handler + lambda_trigger = LambdaHandler( + name=f"scheduled_{name}", + 
func=agent_prompt_lambda, + priority=Priority.TIMER, + ) + + # Schedule based on type + if schedule_type == "interval": + job_id = scheduler.schedule_interval( + lambda_trigger, + seconds=schedule_config.get("seconds"), + minutes=schedule_config.get("minutes"), + hours=schedule_config.get("hours"), + priority=Priority.TIMER, + ) + elif schedule_type == "cron": + job_id = scheduler.schedule_cron( + lambda_trigger, + minute=schedule_config.get("minute"), + hour=schedule_config.get("hour"), + day=schedule_config.get("day"), + month=schedule_config.get("month"), + day_of_week=schedule_config.get("day_of_week"), + priority=Priority.TIMER, + ) + else: + raise ValueError(f"Invalid schedule_type: {schedule_type}. Use 'interval' or 'cron'") + + return { + "job_id": job_id, + "message": f"Scheduled '{name}' with job_id={job_id}", + "schedule_type": schedule_type, + "config": schedule_config, + } + + +@tool +async def execute_agent_prompt_once( + prompt: str, + priority: str = "normal", +) -> Dict[str, str]: + """Execute an agent prompt once, immediately (enqueued with priority). + + Use this to trigger a sub-agent with a specific task without waiting for + a user message. 
Useful for: + - Background analysis tasks + - One-time data processing + - Responding to specific events + + Args: + prompt: The prompt to send to the agent + priority: Priority level - "high", "normal", or "low" + + Returns: + Confirmation that the prompt was enqueued + + Example: + execute_agent_prompt_once( + prompt="Analyze the last 100 BTC/USDT bars and identify support levels", + priority="high" + ) + """ + from trigger.handlers import AgentTriggerHandler + from trigger import Priority + + queue = _get_trigger_queue() + + # Map string priority to enum + priority_map = { + "high": Priority.USER_AGENT, # Same priority as user messages + "normal": Priority.SYSTEM, + "low": Priority.LOW, + } + priority_enum = priority_map.get(priority.lower(), Priority.SYSTEM) + + # Create agent trigger + trigger = AgentTriggerHandler( + session_id="oneshot", + message_content=prompt, + coordinator=_get_coordinator(), + ) + + # Enqueue with priority override + queue_seq = await queue.enqueue(trigger, priority_enum) + + return { + "queue_seq": queue_seq, + "message": f"Enqueued agent prompt with priority={priority}", + "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt, + } + + +@tool +def list_scheduled_triggers() -> List[Dict[str, Any]]: + """List all currently scheduled triggers. + + Returns: + List of dictionaries with job information (id, name, next_run_time) + + Example: + jobs = list_scheduled_triggers() + for job in jobs: + print(f"{job['id']}: {job['name']} - next run at {job['next_run_time']}") + """ + scheduler = _get_trigger_scheduler() + jobs = scheduler.get_jobs() + + result = [] + for job in jobs: + result.append({ + "id": job.id, + "name": job.name, + "next_run_time": str(job.next_run_time) if job.next_run_time else None, + "trigger": str(job.trigger), + }) + + return result + + +@tool +def cancel_scheduled_trigger(job_id: str) -> Dict[str, str]: + """Cancel a scheduled trigger by its job ID. 
+ + Args: + job_id: The job ID returned from schedule_agent_prompt or list_scheduled_triggers + + Returns: + Confirmation message + + Example: + cancel_scheduled_trigger("interval_123") + """ + scheduler = _get_trigger_scheduler() + success = scheduler.remove_job(job_id) + + if success: + return { + "status": "success", + "message": f"Cancelled job {job_id}", + } + else: + return { + "status": "error", + "message": f"Job {job_id} not found", + } + + +@tool +async def on_data_update_run_agent( + source_name: str, + symbol: str, + resolution: str, + prompt_template: str, +) -> Dict[str, str]: + """Set up an agent to run whenever new data arrives for a specific symbol. + + The prompt_template can include {variables} that will be filled with bar data: + - {time}: Bar timestamp + - {open}, {high}, {low}, {close}, {volume}: OHLCV values + - {symbol}: Trading pair symbol + - {source}: Data source name + + Args: + source_name: Name of data source (e.g., "binance") + symbol: Trading pair (e.g., "BTC/USDT") + resolution: Time resolution (e.g., "1m", "5m", "1h") + prompt_template: Template string for agent prompt + + Returns: + Confirmation with subscription details + + Example: + on_data_update_run_agent( + source_name="binance", + symbol="BTC/USDT", + resolution="1m", + prompt_template="New bar on {symbol}: close={close}. Check if we should trade." + ) + + Note: + This is a simplified version. Full implementation would wire into + DataSource subscription system to trigger on every bar update. + """ + # TODO: Implement proper DataSource subscription integration + # For now, return placeholder + + return { + "status": "not_implemented", + "message": "Data-driven agent triggers coming soon", + "config": { + "source": source_name, + "symbol": symbol, + "resolution": resolution, + "prompt_template": prompt_template, + }, + } + + +@tool +def get_trigger_system_stats() -> Dict[str, Any]: + """Get statistics about the trigger system. 
+ + Returns: + Dictionary with queue depth, execution stats, etc. + + Example: + stats = get_trigger_system_stats() + print(f"Queue depth: {stats['queue_depth']}") + print(f"Current seq: {stats['coordinator_stats']['current_seq']}") + """ + queue = _get_trigger_queue() + coordinator = _get_coordinator() + + return { + "queue_depth": queue.get_queue_size(), + "queue_running": queue.is_running(), + "coordinator_stats": coordinator.get_stats(), + } + + +# Export tools list +TRIGGER_TOOLS = [ + schedule_agent_prompt, + execute_agent_prompt_once, + list_scheduled_triggers, + cancel_scheduled_trigger, + on_data_update_run_agent, + get_trigger_system_stats, +] diff --git a/backend/src/main.py b/backend/src/main.py index 4294e76..685ed09 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -21,6 +21,7 @@ from gateway.channels.websocket import WebSocketChannel from gateway.protocol import WebSocketAgentUserMessage from agent.core import create_agent from agent.tools import set_registry, set_datasource_registry, set_indicator_registry +from agent.tools import set_trigger_queue, set_trigger_scheduler, set_coordinator from schema.order_spec import SwapOrder from schema.chart_state import ChartState from schema.shape import ShapeCollection @@ -30,6 +31,8 @@ from datasource.subscription_manager import SubscriptionManager from datasource.websocket_handler import DatafeedWebSocketHandler from secrets_manager import SecretsStore, InvalidMasterPassword from indicator import IndicatorRegistry, register_all_talib_indicators, register_custom_indicators +from trigger import CommitCoordinator, TriggerQueue +from trigger.scheduler import TriggerScheduler # Configure logging logging.basicConfig( @@ -59,6 +62,11 @@ subscription_manager = SubscriptionManager() # Indicator infrastructure indicator_registry = IndicatorRegistry() +# Trigger system infrastructure +trigger_coordinator = None +trigger_queue = None +trigger_scheduler = None + # Global secrets store secrets_store = SecretsStore() @@ -66,7 +74,7 
@@ secrets_store = SecretsStore() @asynccontextmanager async def lifespan(app: FastAPI): """Initialize agent system and data sources on startup.""" - global agent_executor + global agent_executor, trigger_coordinator, trigger_queue, trigger_scheduler # Initialize CCXT data sources try: @@ -115,6 +123,22 @@ async def lifespan(app: FastAPI): if anthropic_api_key: logger.info("Loaded API key from environment") + # Initialize trigger system + logger.info("Initializing trigger system...") + trigger_coordinator = CommitCoordinator() + trigger_queue = TriggerQueue(trigger_coordinator) + trigger_scheduler = TriggerScheduler(trigger_queue) + + # Start trigger queue and scheduler + await trigger_queue.start() + trigger_scheduler.start() + logger.info("Trigger system initialized and started") + + # Set trigger system for agent tools + set_coordinator(trigger_coordinator) + set_trigger_queue(trigger_queue) + set_trigger_scheduler(trigger_scheduler) + if not anthropic_api_key: logger.error("ANTHROPIC_API_KEY not found in environment!") logger.info("Agent system will not be available") @@ -133,7 +157,7 @@ async def lifespan(app: FastAPI): chroma_db_path=config["memory"]["chroma_db"], embedding_model=config["memory"]["embedding_model"], context_docs_dir=config["agent"]["context_docs_dir"], - base_dir="." # backend/src is the working directory, so . goes to backend, where memory/ lives + base_dir="." # backend/src is the working directory, so . 
goes to backend, where memory/ and soul/ live ) await agent_executor.initialize() @@ -146,9 +170,22 @@ async def lifespan(app: FastAPI): yield # Cleanup + logger.info("Shutting down systems...") + + # Shutdown trigger system + if trigger_scheduler: + trigger_scheduler.shutdown(wait=True) + logger.info("Trigger scheduler shut down") + + if trigger_queue: + await trigger_queue.stop() + logger.info("Trigger queue stopped") + + # Shutdown agent system if agent_executor and agent_executor.memory_manager: await agent_executor.memory_manager.close() - logger.info("Agent system shut down") + + logger.info("All systems shut down") app = FastAPI(lifespan=lifespan) diff --git a/backend/src/trigger/PRIORITIES.md b/backend/src/trigger/PRIORITIES.md new file mode 100644 index 0000000..6357d4a --- /dev/null +++ b/backend/src/trigger/PRIORITIES.md @@ -0,0 +1,216 @@ +# Priority System + +Simple tuple-based priorities for deterministic execution ordering. + +## Basic Concept + +Priorities are just **Python tuples**. Python compares tuples element-by-element, left-to-right: + +```python +(0, 1000, 5) < (0, 1001, 3) # True: 0==0, but 1000 < 1001 +(0, 1000, 5) < (1, 500, 2) # True: 0 < 1 +(0, 1000) < (0, 1000, 5) # True: shorter wins if equal so far +``` + +**Lower values = higher priority** (processed first). 
+ +## Priority Categories + +```python +class Priority(IntEnum): + DATA_SOURCE = 0 # Market data, real-time feeds + TIMER = 1 # Scheduled tasks, cron jobs + USER_AGENT = 2 # User-agent interactions (chat) + USER_DATA_REQUEST = 3 # User data requests (charts) + SYSTEM = 4 # Background tasks, cleanup + LOW = 5 # Retries after conflicts +``` + +## Usage Examples + +### Simple Priority + +```python +# Just use the Priority enum +trigger = MyTrigger("task", priority=Priority.SYSTEM) +await queue.enqueue(trigger) + +# Results in tuple: (4, queue_seq) +``` + +### Compound Priority (Tuple) + +```python +# DataSource: sort by event time (older bars first) +trigger = DataUpdateTrigger( + source_name="binance", + symbol="BTC/USDT", + resolution="1m", + bar_data={"time": 1678896000, "open": 50000, ...} +) +await queue.enqueue(trigger) + +# Results in tuple: (0, 1678896000, queue_seq) +# ^ ^ ^ +# | | Queue insertion order (FIFO) +# | Event time (candle end time) +# DATA_SOURCE priority +``` + +### Manual Override + +```python +# Override at enqueue time +await queue.enqueue( + trigger, + priority_override=(Priority.DATA_SOURCE, custom_time, custom_sort) +) + +# Queue appends queue_seq: (0, custom_time, custom_sort, queue_seq) +``` + +## Common Patterns + +### Market Data (Process Chronologically) + +```python +# Bar from 10:00 → (0, 10:00_timestamp, queue_seq) +# Bar from 10:05 → (0, 10:05_timestamp, queue_seq) +# +# 10:00 bar processes first (earlier event_time) + +DataUpdateTrigger( + ..., + bar_data={"time": event_timestamp, ...} +) +``` + +### User Messages (FIFO Order) + +```python +# Message #1 → (2, msg1_timestamp, queue_seq) +# Message #2 → (2, msg2_timestamp, queue_seq) +# +# Message #1 processes first (earlier timestamp) + +AgentTriggerHandler( + session_id="user1", + message_content="...", + message_timestamp=unix_timestamp # Optional, defaults to now +) +``` + +### Scheduled Tasks (By Schedule Time) + +```python +# Job scheduled for 9 AM → (1, 9am_timestamp, 
queue_seq) +# Job scheduled for 2 PM → (1, 2pm_timestamp, queue_seq) +# +# 9 AM job processes first + +CronTrigger( + name="morning_sync", + inner_trigger=..., + scheduled_time=scheduled_timestamp +) +``` + +## Execution Order Example + +``` +Queue contains: +1. DataSource (BTC @ 10:00) → (0, 10:00, 1) +2. DataSource (BTC @ 10:05) → (0, 10:05, 2) +3. Timer (scheduled 9 AM) → (1, 09:00, 3) +4. User message #1 → (2, 14:30, 4) +5. User message #2 → (2, 14:35, 5) + +Dequeue order: +1. DataSource (BTC @ 10:00) ← 0 < all others +2. DataSource (BTC @ 10:05) ← 0 < all others, 10:05 > 10:00 +3. Timer (scheduled 9 AM) ← 1 < remaining +4. User message #1 ← 2 < remaining, 14:30 < 14:35 +5. User message #2 ← last +``` + +## Short Tuple Wins + +If tuples are equal up to the length of the shorter one, **shorter tuple has higher priority**: + +```python +(0, 1000) < (0, 1000, 5) # True: shorter wins +(0,) < (0, 1000) # True: shorter wins +(Priority.DATA_SOURCE,) < (Priority.DATA_SOURCE, 1000) # True +``` + +This is Python's default tuple comparison behavior. In practice, we always append `queue_seq`, so this rarely matters (all tuples end up same length). + +## Integration with Triggers + +### Trigger Sets Its Own Priority + +```python +class MyTrigger(Trigger): + def __init__(self, event_time): + super().__init__( + name="my_trigger", + priority=Priority.DATA_SOURCE, + priority_tuple=(Priority.DATA_SOURCE.value, event_time) + ) +``` + +Queue appends `queue_seq` automatically: +```python +# Trigger's tuple: (0, event_time) +# After enqueue: (0, event_time, queue_seq) +``` + +### Override at Enqueue + +```python +# Ignore trigger's priority, use override +await queue.enqueue( + trigger, + priority_override=(Priority.TIMER, scheduled_time) +) +``` + +## Why Tuples? 
+ +✅ **Simple**: No custom classes, just native Python tuples +✅ **Flexible**: Add as many sort keys as needed +✅ **Efficient**: Python's tuple comparison is highly optimized +✅ **Readable**: `(0, 1000, 5)` is obvious what it means +✅ **Debuggable**: Can print and inspect easily + +Example: +```python +# Old: CompoundPriority(primary=0, secondary=1000, tertiary=5) +# New: (0, 1000, 5) + +# Same semantics, much simpler! +``` + +## Advanced: Custom Sorting + +Want to sort by multiple factors? Just add more elements: + +```python +# Sort by: priority → symbol → event_time → queue_seq +priority_tuple = ( + Priority.DATA_SOURCE.value, + symbol_id, # e.g., hash("BTC/USDT") + event_time, + # queue_seq appended by queue +) +``` + +## Summary + +- **Priorities are tuples**: `(primary, secondary, ..., queue_seq)` +- **Lower = higher priority**: Processed first +- **Element-by-element comparison**: Left-to-right +- **Shorter tuple wins**: If equal up to shorter length +- **Queue appends queue_seq**: Always last element (FIFO within same priority) + +That's it! No complex classes, just tuples. 🎯 diff --git a/backend/src/trigger/README.md b/backend/src/trigger/README.md new file mode 100644 index 0000000..e486b5c --- /dev/null +++ b/backend/src/trigger/README.md @@ -0,0 +1,386 @@ +# Trigger System + +Lock-free, sequence-based execution system for deterministic event processing. + +## Overview + +All operations (WebSocket messages, cron tasks, data updates) flow through a **priority queue**, execute in **parallel**, but commit in **strict sequential order** with **optimistic conflict detection**. 
+ +### Key Features + +- **Lock-free reads**: Snapshots are deep copies, no blocking +- **Sequential commits**: Total ordering via sequence numbers +- **Optimistic concurrency**: Conflicts detected, retry with same seq +- **Priority preservation**: High-priority work never blocked by low-priority +- **Long-running agents**: Execute in parallel, commit sequentially +- **Deterministic replay**: Can reproduce exact system state at any seq + +## Architecture + +``` +┌─────────────┐ +│ WebSocket │───┐ +│ Messages │ │ +└─────────────┘ │ + ├──→ ┌─────────────────┐ +┌─────────────┐ │ │ TriggerQueue │ +│ Cron │───┤ │ (Priority Queue)│ +│ Scheduled │ │ └────────┬────────┘ +└─────────────┘ │ │ Assign seq + │ ↓ +┌─────────────┐ │ ┌─────────────────┐ +│ DataSource │───┘ │ Execute Trigger│ +│ Updates │ │ (Parallel OK) │ +└─────────────┘ └────────┬────────┘ + │ CommitIntents + ↓ + ┌─────────────────┐ + │ CommitCoordinator│ + │ (Sequential) │ + └────────┬────────┘ + │ Commit in seq order + ↓ + ┌─────────────────┐ + │ VersionedStores │ + │ (w/ Backends) │ + └─────────────────┘ +``` + +## Core Components + +### 1. ExecutionContext (`context.py`) + +Tracks execution seq and store snapshots via `contextvars` (auto-propagates through async calls). + +```python +from trigger import get_execution_context + +ctx = get_execution_context() +print(f"Running at seq {ctx.seq}") +``` + +### 2. Trigger Types (`types.py`) + +```python +from trigger import Trigger, Priority, CommitIntent + +class MyTrigger(Trigger): + async def execute(self) -> list[CommitIntent]: + # Read snapshot + seq, data = some_store.read_snapshot() + + # Modify + new_data = modify(data) + + # Prepare commit + intent = some_store.prepare_commit(seq, new_data) + return [intent] +``` + +### 3. 
VersionedStore (`store.py`) + +Stores with pluggable backends and optimistic concurrency: + +```python +from trigger import VersionedStore, PydanticStoreBackend + +# Wrap existing Pydantic model +backend = PydanticStoreBackend(order_store) +versioned_store = VersionedStore("OrderStore", backend) + +# Lock-free snapshot read +seq, snapshot = versioned_store.read_snapshot() + +# Prepare commit (does not modify yet) +intent = versioned_store.prepare_commit(seq, modified_snapshot) +``` + +**Pluggable Backends**: +- `PydanticStoreBackend`: For existing Pydantic models (OrderStore, ChartStore, etc.) +- `FileStoreBackend`: Future - version files (Python scripts, configs) +- `DatabaseStoreBackend`: Future - version database rows + +### 4. CommitCoordinator (`coordinator.py`) + +Manages sequential commits with conflict detection: + +- Waits for seq N to commit before N+1 +- Detects conflicts (expected_seq vs committed_seq) +- Re-executes (not re-enqueues) on conflict **with same seq** +- Tracks execution state for debugging + +### 5. TriggerQueue (`queue.py`) + +Priority queue with seq assignment: + +```python +from trigger import TriggerQueue + +queue = TriggerQueue(coordinator) +await queue.start() + +# Enqueue trigger +await queue.enqueue(my_trigger, Priority.USER_AGENT) +``` + +### 6. 
TriggerScheduler (`scheduler.py`) + +APScheduler integration for cron triggers: + +```python +from trigger.scheduler import TriggerScheduler + +scheduler = TriggerScheduler(queue) +scheduler.start() + +# Every 5 minutes +scheduler.schedule_interval( + IndicatorUpdateTrigger("rsi_14"), + minutes=5 +) + +# Daily at 9 AM +scheduler.schedule_cron( + SyncExchangeStateTrigger(), + hour="9", + minute="0" +) +``` + +## Integration Example + +### Basic Setup in `main.py` + +```python +from trigger import ( + CommitCoordinator, + TriggerQueue, + VersionedStore, + PydanticStoreBackend, +) +from trigger.scheduler import TriggerScheduler + +# Create coordinator +coordinator = CommitCoordinator() + +# Wrap existing stores +order_store_versioned = VersionedStore( + "OrderStore", + PydanticStoreBackend(order_store) +) +coordinator.register_store(order_store_versioned) + +chart_store_versioned = VersionedStore( + "ChartStore", + PydanticStoreBackend(chart_store) +) +coordinator.register_store(chart_store_versioned) + +# Create queue and scheduler +trigger_queue = TriggerQueue(coordinator) +await trigger_queue.start() + +scheduler = TriggerScheduler(trigger_queue) +scheduler.start() +``` + +### WebSocket Message Handler + +```python +from trigger.handlers import AgentTriggerHandler + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + + while True: + data = await websocket.receive_json() + + if data["type"] == "agent_user_message": + # Enqueue agent trigger instead of direct Gateway call + trigger = AgentTriggerHandler( + session_id=data["session_id"], + message_content=data["content"], + gateway_handler=gateway.route_user_message, + coordinator=coordinator, + ) + await trigger_queue.enqueue(trigger) +``` + +### DataSource Updates + +```python +from trigger.handlers import DataUpdateTrigger + +# In subscription_manager._on_source_update() +def _on_source_update(self, source_key: tuple, bar: dict): + # Enqueue data update trigger + 
trigger = DataUpdateTrigger( + source_name=source_key[0], + symbol=source_key[1], + resolution=source_key[2], + bar_data=bar, + coordinator=coordinator, + ) + asyncio.create_task(trigger_queue.enqueue(trigger)) +``` + +### Custom Trigger + +```python +from trigger import Trigger, CommitIntent, Priority + +class RecalculatePortfolioTrigger(Trigger): + def __init__(self, coordinator): + super().__init__("recalc_portfolio", Priority.SYSTEM) + self.coordinator = coordinator + + async def execute(self) -> list[CommitIntent]: + # Read snapshots from multiple stores + order_seq, orders = self.coordinator.get_store("OrderStore").read_snapshot() + chart_seq, chart = self.coordinator.get_store("ChartStore").read_snapshot() + + # Calculate portfolio value + portfolio_value = calculate_portfolio(orders, chart) + + # Update chart state with portfolio value + chart.portfolio_value = portfolio_value + + # Prepare commit + intent = self.coordinator.get_store("ChartStore").prepare_commit( + chart_seq, + chart + ) + + return [intent] + +# Schedule it +scheduler.schedule_interval( + RecalculatePortfolioTrigger(coordinator), + minutes=1 +) +``` + +## Execution Flow + +### Normal Flow (No Conflicts) + +``` +seq=100: WebSocket message arrives → enqueue → dequeue → assign seq=100 → execute +seq=101: Cron trigger fires → enqueue → dequeue → assign seq=101 → execute + +seq=101 finishes first → waits in commit queue +seq=100 finishes → commits immediately (next in order) +seq=101 commits next +``` + +### Conflict Flow + +``` +seq=100: reads OrderStore at seq=99 → executes for 30 seconds +seq=101: reads OrderStore at seq=99 → executes for 5 seconds + +seq=101 finishes first → tries to commit based on seq=99 +seq=100 finishes → commits OrderStore at seq=100 + +Coordinator detects conflict: + expected_seq=99, committed_seq=100 + +seq=101 evicted → RE-EXECUTES with same seq=101 (not re-enqueued) + reads OrderStore at seq=100 → executes again + finishes → commits successfully at seq=101 +``` + 
+## Benefits + +### For Agent System + +- **Long-running agents work naturally**: Agent starts at seq=100, runs for 60 seconds while market data updates at seq=101-110, commits only if no conflicts +- **No deadlocks**: No locks = no deadlock possibility +- **Deterministic**: Can replay from any seq for debugging + +### For Strategy Execution + +- **High-frequency data doesn't block strategies**: Data updates enqueued, executed in parallel, commit sequentially +- **Priority preservation**: Critical order execution never blocked by indicator calculations +- **Conflict detection**: If market moved during strategy calculation, automatically retry with fresh data + +### For Scaling + +- **Single-node first**: Runs on single asyncio event loop, no complex distributed coordination +- **Future-proof**: Can swap queue for Redis/PostgreSQL-backed distributed queue later +- **Event sourcing ready**: All commits have seq numbers, can build event log + +## Debugging + +### Check Current State + +```python +# Coordinator stats +stats = coordinator.get_stats() +print(f"Current seq: {stats['current_seq']}") +print(f"Pending commits: {stats['pending_commits']}") +print(f"Executions by state: {stats['state_counts']}") + +# Store state +store = coordinator.get_store("OrderStore") +print(f"Store: {store}") # Shows committed_seq and version + +# Execution record +record = coordinator.get_execution_record(100) +print(f"Seq 100: {record}") # Shows state, retry_count, error +``` + +### Common Issues + +**Symptoms: High conflict rate** +- **Cause**: Multiple triggers modifying same store frequently +- **Solution**: Batch updates, use debouncing, or redesign to reduce contention + +**Symptoms: Commits stuck (next_commit_seq not advancing)** +- **Cause**: Execution at that seq failed or is taking too long +- **Solution**: Check execution_records for that seq, look for errors in logs + +**Symptoms: Queue depth growing** +- **Cause**: Executions slower than enqueue rate +- **Solution**: 
Profile trigger execution, optimize slow paths, add rate limiting + +## Testing + +### Unit Test: Conflict Detection + +```python +import pytest +from trigger import VersionedStore, PydanticStoreBackend, CommitCoordinator + +@pytest.mark.asyncio +async def test_conflict_detection(): + coordinator = CommitCoordinator() + + store = VersionedStore("TestStore", PydanticStoreBackend(TestModel())) + coordinator.register_store(store) + + # Seq 1: read at 0, modify, commit + seq1, data1 = store.read_snapshot() + data1.value = "seq1" + intent1 = store.prepare_commit(seq1, data1) + + # Seq 2: read at 0 (same snapshot), modify + seq2, data2 = store.read_snapshot() + data2.value = "seq2" + intent2 = store.prepare_commit(seq2, data2) + + # Commit seq 1 (should succeed) + # ... coordinator logic ... + + # Commit seq 2 (should conflict and retry) + # ... verify conflict detected ... +``` + +## Future Enhancements + +- **Distributed queue**: Redis-backed queue for multi-worker deployment +- **Event log persistence**: Store all commits for event sourcing/audit +- **Metrics dashboard**: Real-time view of queue depth, conflict rate, latency +- **Transaction snapshots**: Full system state at any seq for replay/debugging +- **Automatic batching**: Coalesce rapid updates to same store diff --git a/backend/src/trigger/__init__.py b/backend/src/trigger/__init__.py new file mode 100644 index 0000000..7209817 --- /dev/null +++ b/backend/src/trigger/__init__.py @@ -0,0 +1,35 @@ +""" +Sequential execution trigger system with optimistic concurrency control. + +All operations (websocket, cron, data events) flow through a priority queue, +execute in parallel, but commit in strict sequential order with conflict detection. 
+""" + +from .context import ExecutionContext, get_execution_context +from .types import Priority, PriorityTuple, Trigger, CommitIntent, ExecutionState +from .store import VersionedStore, StoreBackend, PydanticStoreBackend +from .coordinator import CommitCoordinator +from .queue import TriggerQueue +from .handlers import AgentTriggerHandler, LambdaHandler + +__all__ = [ + # Context + "ExecutionContext", + "get_execution_context", + # Types + "Priority", + "PriorityTuple", + "Trigger", + "CommitIntent", + "ExecutionState", + # Store + "VersionedStore", + "StoreBackend", + "PydanticStoreBackend", + # Coordination + "CommitCoordinator", + "TriggerQueue", + # Handlers + "AgentTriggerHandler", + "LambdaHandler", +] diff --git a/backend/src/trigger/context.py b/backend/src/trigger/context.py new file mode 100644 index 0000000..e72c8f3 --- /dev/null +++ b/backend/src/trigger/context.py @@ -0,0 +1,61 @@ +""" +Execution context tracking using Python's contextvars. + +Each execution gets a unique seq number that propagates through all async calls, +allowing us to track which execution made which changes for conflict detection. +""" + +import logging +from contextvars import ContextVar +from dataclasses import dataclass, field +from typing import Optional + +logger = logging.getLogger(__name__) + +# Context variables - automatically propagate through async call chains +_execution_context: ContextVar[Optional["ExecutionContext"]] = ContextVar( + "execution_context", default=None +) + + +@dataclass +class ExecutionContext: + """ + Execution context for a single trigger execution. + + Automatically propagates through async calls via contextvars. + Tracks the seq number and which store snapshots were read. 
+ """ + + seq: int + """Sequential execution number - determines commit order""" + + trigger_name: str + """Name/type of trigger being executed""" + + snapshot_seqs: dict[str, int] = field(default_factory=dict) + """Store name -> seq number of snapshot that was read""" + + def record_snapshot(self, store_name: str, snapshot_seq: int) -> None: + """Record that we read a snapshot from a store at a specific seq""" + self.snapshot_seqs[store_name] = snapshot_seq + logger.debug(f"Seq {self.seq}: Read {store_name} at seq {snapshot_seq}") + + def __str__(self) -> str: + return f"ExecutionContext(seq={self.seq}, trigger={self.trigger_name})" + + +def get_execution_context() -> Optional[ExecutionContext]: + """Get the current execution context, or None if not in an execution""" + return _execution_context.get() + + +def set_execution_context(ctx: ExecutionContext) -> None: + """Set the execution context for the current async task""" + _execution_context.set(ctx) + logger.debug(f"Set execution context: {ctx}") + + +def clear_execution_context() -> None: + """Clear the execution context""" + _execution_context.set(None) diff --git a/backend/src/trigger/coordinator.py b/backend/src/trigger/coordinator.py new file mode 100644 index 0000000..3f5ab58 --- /dev/null +++ b/backend/src/trigger/coordinator.py @@ -0,0 +1,302 @@ +""" +Commit coordinator - manages sequential commits with conflict detection. + +Ensures that commits happen in strict sequence order, even when executions +complete out of order. Detects conflicts and triggers re-execution with the +same seq number (not re-enqueue, just re-execute). +""" + +import asyncio +import logging +from typing import Optional + +from .context import ExecutionContext +from .store import VersionedStore +from .types import CommitIntent, ExecutionRecord, ExecutionState, Trigger + +logger = logging.getLogger(__name__) + + +class CommitCoordinator: + """ + Manages sequential commits with optimistic concurrency control. 
+ + Key responsibilities: + - Maintain strict sequential commit order (seq N+1 commits after seq N) + - Detect conflicts between execution snapshot and committed state + - Trigger re-execution (not re-enqueue) on conflicts with same seq + - Track in-flight executions for debugging and monitoring + """ + + def __init__(self): + self._stores: dict[str, VersionedStore] = {} + self._current_seq = 0 # Highest committed seq across all operations + self._next_commit_seq = 1 # Next seq we're waiting to commit + self._pending_commits: dict[int, tuple[ExecutionRecord, list[CommitIntent]]] = {} + self._execution_records: dict[int, ExecutionRecord] = {} + self._lock = asyncio.Lock() # Only for coordinator internal state, not stores + + def register_store(self, store: VersionedStore) -> None: + """Register a versioned store with the coordinator""" + self._stores[store.name] = store + logger.info(f"Registered store: {store.name}") + + def get_store(self, name: str) -> Optional[VersionedStore]: + """Get a registered store by name""" + return self._stores.get(name) + + async def start_execution(self, seq: int, trigger: Trigger) -> ExecutionRecord: + """ + Record that an execution is starting. + + Args: + seq: Sequence number assigned to this execution + trigger: The trigger being executed + + Returns: + ExecutionRecord for tracking + """ + async with self._lock: + record = ExecutionRecord( + seq=seq, + trigger=trigger, + state=ExecutionState.EXECUTING, + ) + self._execution_records[seq] = record + logger.info(f"Started execution: seq={seq}, trigger={trigger.name}") + return record + + async def submit_for_commit( + self, + seq: int, + commit_intents: list[CommitIntent], + ) -> None: + """ + Submit commit intents for sequential commit. + + The commit will only happen when: + 1. All prior seq numbers have committed + 2. 
No conflicts detected with committed state + + Args: + seq: Sequence number of this execution + commit_intents: List of changes to commit (empty if no changes) + """ + async with self._lock: + record = self._execution_records.get(seq) + if not record: + logger.error(f"No execution record found for seq={seq}") + return + + record.state = ExecutionState.WAITING_COMMIT + record.commit_intents = commit_intents + self._pending_commits[seq] = (record, commit_intents) + + logger.info( + f"Seq {seq} submitted for commit with {len(commit_intents)} intents" + ) + + # Try to process commits (this will handle sequential ordering) + await self._process_commits() + + async def _process_commits(self) -> None: + """ + Process pending commits in strict sequential order. + + Only commits seq N if seq N-1 has already committed. + Detects conflicts and triggers re-execution with same seq. + """ + while True: + async with self._lock: + # Check if next expected seq is ready to commit + if self._next_commit_seq not in self._pending_commits: + # Waiting for this seq to complete execution + break + + seq = self._next_commit_seq + record, intents = self._pending_commits[seq] + + logger.info( + f"Processing commit for seq={seq} (current_seq={self._current_seq})" + ) + + # Check for conflicts + conflicts = self._check_conflicts(intents) + + if conflicts: + # Conflict detected - re-execute with same seq + logger.warning( + f"Seq {seq} has conflicts in stores: {conflicts}. Re-executing..." 
+ ) + + # Remove from pending (will be re-added when execution completes) + del self._pending_commits[seq] + + # Mark as evicted + record.state = ExecutionState.EVICTED + record.retry_count += 1 + + # Advance to next seq (this seq will be retried in background) + self._next_commit_seq += 1 + self._current_seq += 1 + + # Trigger re-execution (outside lock) + asyncio.create_task(self._retry_execution(record)) + + continue + + # No conflicts - commit all intents atomically + for intent in intents: + store = self._stores.get(intent.store_name) + if not store: + logger.error( + f"Seq {seq}: Store '{intent.store_name}' not found" + ) + continue + + store.commit(intent.new_data, seq) + + # Mark as committed + record.state = ExecutionState.COMMITTED + del self._pending_commits[seq] + + # Advance seq counters + self._current_seq = seq + self._next_commit_seq = seq + 1 + + logger.info( + f"Committed seq={seq}, current_seq now {self._current_seq}" + ) + + def _check_conflicts(self, intents: list[CommitIntent]) -> list[str]: + """ + Check if any commit intents conflict with current committed state. + + Args: + intents: List of commit intents to check + + Returns: + List of store names that have conflicts (empty if no conflicts) + """ + conflicts = [] + + for intent in intents: + store = self._stores.get(intent.store_name) + if not store: + logger.error(f"Store '{intent.store_name}' not found during conflict check") + continue + + if store.check_conflict(intent.expected_seq): + conflicts.append(intent.store_name) + + return conflicts + + async def _retry_execution(self, record: ExecutionRecord) -> None: + """ + Re-execute a trigger that had conflicts. + + Executes with the SAME seq number (not re-enqueued, just re-executed). + This ensures the execution order remains deterministic. 
+
+        Args:
+            record: Execution record to retry
+        """
+        # NOTE(review): ExecutionContext is already imported at module top;
+        # this local import re-binds it and pulls in the set/clear helpers.
+        from .context import ExecutionContext, set_execution_context, clear_execution_context
+
+        # NOTE(review): no retry cap is enforced anywhere in this module -
+        # under sustained store contention a trigger could re-execute
+        # indefinitely; consider bounding record.retry_count.
+        logger.info(
+            f"Retrying execution: seq={record.seq}, trigger={record.trigger.name}, "
+            f"retry_count={record.retry_count}"
+        )
+
+        # Fresh context for the retry: same seq, empty snapshot_seqs, so the
+        # re-execution records the store versions it reads this time around.
+        ctx = ExecutionContext(
+            seq=record.seq,
+            trigger_name=record.trigger.name,
+        )
+        set_execution_context(ctx)
+
+        try:
+            # Re-execute trigger with the SAME seq (not re-enqueued)
+            record.state = ExecutionState.EXECUTING
+            commit_intents = await record.trigger.execute()
+
+            # Submit for commit again (with same seq)
+            await self.submit_for_commit(record.seq, commit_intents)
+
+        except Exception as e:
+            logger.error(
+                f"Retry execution failed for seq={record.seq}: {e}", exc_info=True
+            )
+            record.state = ExecutionState.FAILED
+            record.error = str(e)
+
+            # Still need to advance past this seq
+            # NOTE(review): the relative "+= 1" bumps assume
+            # _current_seq == seq - 1 at this point - confirm; absolute
+            # assignment (seq / seq + 1) would be harder to get wrong.
+            async with self._lock:
+                if record.seq == self._next_commit_seq:
+                    self._next_commit_seq += 1
+                    self._current_seq += 1
+
+            # Try to process any pending commits
+            await self._process_commits()
+
+        finally:
+            clear_execution_context()
+
+    async def execution_failed(self, seq: int, error: Exception) -> None:
+        """
+        Mark an execution as failed. 
+
+        Args:
+            seq: Sequence number that failed
+            error: The exception that caused the failure
+        """
+        async with self._lock:
+            record = self._execution_records.get(seq)
+            if record:
+                record.state = ExecutionState.FAILED
+                record.error = str(error)
+
+            # Remove from pending if present
+            self._pending_commits.pop(seq, None)
+
+            # If this is the next seq to commit, advance past it
+            # NOTE(review): if seq is NOT yet the next commit seq, it is only
+            # dropped from pending - when _next_commit_seq later reaches it,
+            # nothing will ever resubmit it and the commit pipeline stalls
+            # (the "commits stuck" symptom described in the docs). Consider
+            # remembering failed seqs so _process_commits can skip over them.
+            if seq == self._next_commit_seq:
+                self._next_commit_seq += 1
+                self._current_seq += 1
+
+                logger.info(
+                    f"Seq {seq} failed, advancing current_seq to {self._current_seq}"
+                )
+
+        # Try to process any pending commits
+        await self._process_commits()
+
+    def get_current_seq(self) -> int:
+        """Get the current committed sequence number"""
+        return self._current_seq
+
+    def get_execution_record(self, seq: int) -> Optional[ExecutionRecord]:
+        """Get the ExecutionRecord for a specific seq, or None if unknown"""
+        return self._execution_records.get(seq)
+
+    def get_stats(self) -> dict:
+        """Get statistics about the coordinator state"""
+        # Histogram of execution states (EXECUTING/WAITING_COMMIT/...).
+        # NOTE(review): _execution_records is never pruned, so these totals
+        # (and memory) grow without bound over the process lifetime.
+        state_counts = {}
+        for record in self._execution_records.values():
+            state_name = record.state.name
+            state_counts[state_name] = state_counts.get(state_name, 0) + 1
+
+        return {
+            "current_seq": self._current_seq,
+            "next_commit_seq": self._next_commit_seq,
+            "pending_commits": len(self._pending_commits),
+            "total_executions": len(self._execution_records),
+            "state_counts": state_counts,
+            "stores": {name: str(store) for name, store in self._stores.items()},
+        }
+
+    def __repr__(self) -> str:
+        return (
+            f"CommitCoordinator(current_seq={self._current_seq}, "
+            f"pending={len(self._pending_commits)}, stores={len(self._stores)})"
+        )
diff --git a/backend/src/trigger/handlers.py b/backend/src/trigger/handlers.py
new file mode 100644
index 0000000..22d5a11
--- /dev/null
+++ b/backend/src/trigger/handlers.py
@@ -0,0 +1,304 @@
+"""
+Trigger handlers - concrete implementations for common trigger types. 
+ +Provides ready-to-use trigger handlers for: +- Agent execution (WebSocket user messages) +- Lambda/callable execution +- Data update triggers +- Indicator updates +""" + +import logging +import time +from typing import Any, Awaitable, Callable, Optional + +from .coordinator import CommitCoordinator +from .types import CommitIntent, Priority, Trigger + +logger = logging.getLogger(__name__) + + +class AgentTriggerHandler(Trigger): + """ + Trigger for agent execution from WebSocket user messages. + + Wraps the Gateway's agent execution flow and captures any + store modifications as commit intents. + + Priority tuple: (USER_AGENT, message_timestamp, queue_seq) + """ + + def __init__( + self, + session_id: str, + message_content: str, + message_timestamp: Optional[int] = None, + attachments: Optional[list] = None, + gateway_handler: Optional[Callable] = None, + coordinator: Optional[CommitCoordinator] = None, + ): + """ + Initialize agent trigger. + + Args: + session_id: User session ID + message_content: User message content + message_timestamp: When user sent message (unix timestamp, defaults to now) + attachments: Optional message attachments + gateway_handler: Callable to route to Gateway (set during integration) + coordinator: CommitCoordinator for accessing stores + """ + if message_timestamp is None: + message_timestamp = int(time.time()) + + # Priority tuple: sort by USER_AGENT priority, then message timestamp + super().__init__( + name=f"agent_{session_id}", + priority=Priority.USER_AGENT, + priority_tuple=(Priority.USER_AGENT.value, message_timestamp) + ) + self.session_id = session_id + self.message_content = message_content + self.message_timestamp = message_timestamp + self.attachments = attachments or [] + self.gateway_handler = gateway_handler + self.coordinator = coordinator + + async def execute(self) -> list[CommitIntent]: + """ + Execute agent interaction. + + This will call into the Gateway, which will run the agent. 
+ The agent may read from stores and generate responses. + Any store modifications are captured as commit intents. + + Returns: + List of commit intents (typically empty for now, as agent + modifies stores via tools which will be integrated later) + """ + if not self.gateway_handler: + logger.error("No gateway_handler configured for AgentTriggerHandler") + return [] + + logger.info( + f"Agent trigger executing: session={self.session_id}, " + f"content='{self.message_content[:50]}...'" + ) + + try: + # Call Gateway to handle message + # In future, Gateway/agent tools will use coordinator stores + await self.gateway_handler( + self.session_id, + self.message_content, + self.attachments, + ) + + # For now, agent doesn't directly modify stores + # Future: agent tools will return commit intents + return [] + + except Exception as e: + logger.error(f"Agent execution error: {e}", exc_info=True) + raise + + +class LambdaHandler(Trigger): + """ + Generic trigger that executes an arbitrary async callable. + + Useful for custom triggers, one-off tasks, or testing. + """ + + def __init__( + self, + name: str, + func: Callable[[], Awaitable[list[CommitIntent]]], + priority: Priority = Priority.SYSTEM, + ): + """ + Initialize lambda handler. + + Args: + name: Descriptive name for this trigger + func: Async callable that returns commit intents + priority: Execution priority + """ + super().__init__(name, priority) + self.func = func + + async def execute(self) -> list[CommitIntent]: + """Execute the callable""" + logger.info(f"Lambda trigger executing: {self.name}") + return await self.func() + + +class DataUpdateTrigger(Trigger): + """ + Trigger for DataSource bar updates. + + Fired when new market data arrives. Can update indicators, + trigger strategy logic, or notify the agent of market events. + + Priority tuple: (DATA_SOURCE, event_time, queue_seq) + Ensures older bars process before newer ones. 
+ """ + + def __init__( + self, + source_name: str, + symbol: str, + resolution: str, + bar_data: dict, + coordinator: Optional[CommitCoordinator] = None, + ): + """ + Initialize data update trigger. + + Args: + source_name: Name of data source (e.g., "binance") + symbol: Trading pair symbol + resolution: Time resolution + bar_data: Bar data dict (time, open, high, low, close, volume) + coordinator: CommitCoordinator for accessing stores + """ + event_time = bar_data.get('time', int(time.time())) + + # Priority tuple: sort by DATA_SOURCE priority, then event time + super().__init__( + name=f"data_{source_name}_{symbol}_{resolution}", + priority=Priority.DATA_SOURCE, + priority_tuple=(Priority.DATA_SOURCE.value, event_time) + ) + self.source_name = source_name + self.symbol = symbol + self.resolution = resolution + self.bar_data = bar_data + self.coordinator = coordinator + + async def execute(self) -> list[CommitIntent]: + """ + Process bar update. + + Future implementations will: + - Update indicator values + - Check strategy conditions + - Trigger alerts/notifications + + Returns: + Commit intents for any store updates + """ + logger.info( + f"Data update trigger: {self.source_name}:{self.symbol}@{self.resolution}, " + f"time={self.bar_data.get('time')}" + ) + + # TODO: Update indicators + # TODO: Check strategy conditions + # TODO: Notify agent of significant events + + # For now, just log + return [] + + +class IndicatorUpdateTrigger(Trigger): + """ + Trigger for updating indicator values. + + Can be fired by cron (periodic recalculation) or by data updates. + """ + + def __init__( + self, + indicator_id: str, + force_full_recalc: bool = False, + coordinator: Optional[CommitCoordinator] = None, + priority: Priority = Priority.SYSTEM, + ): + """ + Initialize indicator update trigger. 
+ + Args: + indicator_id: ID of indicator to update + force_full_recalc: If True, recalculate entire history + coordinator: CommitCoordinator for accessing stores + priority: Execution priority + """ + super().__init__(f"indicator_{indicator_id}", priority) + self.indicator_id = indicator_id + self.force_full_recalc = force_full_recalc + self.coordinator = coordinator + + async def execute(self) -> list[CommitIntent]: + """ + Update indicator value. + + Reads from IndicatorStore, recalculates, prepares commit. + + Returns: + Commit intents for updated indicator data + """ + if not self.coordinator: + logger.error("No coordinator configured") + return [] + + # Get indicator store + indicator_store = self.coordinator.get_store("IndicatorStore") + if not indicator_store: + logger.error("IndicatorStore not registered") + return [] + + # Read snapshot + snapshot_seq, indicator_data = indicator_store.read_snapshot() + + logger.info( + f"Indicator update trigger: {self.indicator_id}, " + f"snapshot_seq={snapshot_seq}, force_full={self.force_full_recalc}" + ) + + # TODO: Implement indicator recalculation logic + # For now, just return empty (no changes) + + return [] + + +class CronTrigger(Trigger): + """ + Trigger fired by APScheduler on a schedule. + + Wraps another trigger or callable to execute periodically. + + Priority tuple: (TIMER, scheduled_time, queue_seq) + Ensures jobs scheduled for earlier times run first. + """ + + def __init__( + self, + name: str, + inner_trigger: Trigger, + scheduled_time: Optional[int] = None, + ): + """ + Initialize cron trigger. 
+ + Args: + name: Descriptive name (e.g., "hourly_sync") + inner_trigger: Trigger to execute on schedule + scheduled_time: When this was scheduled to run (defaults to now) + """ + if scheduled_time is None: + scheduled_time = int(time.time()) + + # Priority tuple: sort by TIMER priority, then scheduled time + super().__init__( + name=f"cron_{name}", + priority=Priority.TIMER, + priority_tuple=(Priority.TIMER.value, scheduled_time) + ) + self.inner_trigger = inner_trigger + self.scheduled_time = scheduled_time + + async def execute(self) -> list[CommitIntent]: + """Execute the wrapped trigger""" + logger.info(f"Cron trigger firing: {self.name}") + return await self.inner_trigger.execute() diff --git a/backend/src/trigger/queue.py b/backend/src/trigger/queue.py new file mode 100644 index 0000000..dcd0608 --- /dev/null +++ b/backend/src/trigger/queue.py @@ -0,0 +1,224 @@ +""" +Trigger queue - priority queue with sequence number assignment. + +All operations flow through this queue: +- WebSocket messages from users +- Cron scheduled tasks +- DataSource bar updates +- Manual triggers + +Queue assigns seq numbers on dequeue, executes triggers, and submits to coordinator. +""" + +import asyncio +import logging +from typing import Optional + +from .context import ExecutionContext, clear_execution_context, set_execution_context +from .coordinator import CommitCoordinator +from .types import Priority, PriorityTuple, Trigger + +logger = logging.getLogger(__name__) + + +class TriggerQueue: + """ + Priority queue for trigger execution. + + Key responsibilities: + - Maintain priority queue (high priority dequeued first) + - Assign sequence numbers on dequeue (determines commit order) + - Execute triggers with context set + - Submit results to CommitCoordinator + - Handle execution errors gracefully + """ + + def __init__(self, coordinator: CommitCoordinator): + """ + Initialize trigger queue. 
+ + Args: + coordinator: CommitCoordinator for handling commits + """ + self._coordinator = coordinator + self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue() + self._seq_counter = 0 + self._seq_lock = asyncio.Lock() + self._processor_task: Optional[asyncio.Task] = None + self._running = False + + async def start(self) -> None: + """Start the queue processor""" + if self._running: + logger.warning("TriggerQueue already running") + return + + self._running = True + self._processor_task = asyncio.create_task(self._process_loop()) + logger.info("TriggerQueue started") + + async def stop(self) -> None: + """Stop the queue processor gracefully""" + if not self._running: + return + + self._running = False + + if self._processor_task: + self._processor_task.cancel() + try: + await self._processor_task + except asyncio.CancelledError: + pass + + logger.info("TriggerQueue stopped") + + async def enqueue( + self, + trigger: Trigger, + priority_override: Optional[Priority | PriorityTuple] = None + ) -> int: + """ + Add a trigger to the queue. 
+ + Args: + trigger: Trigger to execute + priority_override: Override priority (simple Priority or tuple) + If None, uses trigger's priority/priority_tuple + If Priority enum, creates single-element tuple + If tuple, uses as-is + + Returns: + Queue sequence number (appended to priority tuple) + + Examples: + # Simple priority + await queue.enqueue(trigger, Priority.USER_AGENT) + # Results in: (Priority.USER_AGENT, queue_seq) + + # Tuple priority with event time + await queue.enqueue( + trigger, + (Priority.DATA_SOURCE, bar_data['time']) + ) + # Results in: (Priority.DATA_SOURCE, bar_time, queue_seq) + + # Let trigger decide + await queue.enqueue(trigger) + """ + # Get monotonic seq for queue ordering (appended to tuple) + async with self._seq_lock: + queue_seq = self._seq_counter + self._seq_counter += 1 + + # Determine priority tuple + if priority_override is not None: + if isinstance(priority_override, Priority): + # Convert simple priority to tuple + priority_tuple = (priority_override.value, queue_seq) + else: + # Use provided tuple, append queue_seq + priority_tuple = priority_override + (queue_seq,) + else: + # Let trigger determine its own priority tuple + priority_tuple = trigger.get_priority_tuple(queue_seq) + + # Priority queue: (priority_tuple, trigger) + # Python's PriorityQueue compares tuples element-by-element + await self._queue.put((priority_tuple, trigger)) + + logger.debug( + f"Enqueued: {trigger.name} with priority_tuple={priority_tuple}" + ) + + return queue_seq + + async def _process_loop(self) -> None: + """ + Main processing loop. + + Dequeues triggers, assigns execution seq, executes, and submits to coordinator. 
+ """ + execution_seq = 0 # Separate counter for execution sequence + + while self._running: + try: + # Wait for next trigger (with timeout to check _running flag) + try: + priority_tuple, trigger = await asyncio.wait_for( + self._queue.get(), timeout=1.0 + ) + except asyncio.TimeoutError: + continue + + # Assign execution sequence number + execution_seq += 1 + + logger.info( + f"Dequeued: seq={execution_seq}, trigger={trigger.name}, " + f"priority_tuple={priority_tuple}" + ) + + # Execute in background (don't block queue) + asyncio.create_task( + self._execute_trigger(execution_seq, trigger) + ) + + except Exception as e: + logger.error(f"Error in process loop: {e}", exc_info=True) + + async def _execute_trigger(self, seq: int, trigger: Trigger) -> None: + """ + Execute a trigger with proper context and error handling. + + Args: + seq: Execution sequence number + trigger: Trigger to execute + """ + # Set up execution context + ctx = ExecutionContext( + seq=seq, + trigger_name=trigger.name, + ) + set_execution_context(ctx) + + # Record execution start with coordinator + await self._coordinator.start_execution(seq, trigger) + + try: + logger.info(f"Executing: seq={seq}, trigger={trigger.name}") + + # Execute trigger (can be long-running) + commit_intents = await trigger.execute() + + logger.info( + f"Execution complete: seq={seq}, {len(commit_intents)} commit intents" + ) + + # Submit for sequential commit + await self._coordinator.submit_for_commit(seq, commit_intents) + + except Exception as e: + logger.error( + f"Execution failed: seq={seq}, trigger={trigger.name}, error={e}", + exc_info=True, + ) + + # Notify coordinator of failure + await self._coordinator.execution_failed(seq, e) + + finally: + clear_execution_context() + + def get_queue_size(self) -> int: + """Get current queue size (approximate)""" + return self._queue.qsize() + + def is_running(self) -> bool: + """Check if queue processor is running""" + return self._running + + def __repr__(self) -> str: + 
return ( + f"TriggerQueue(running={self._running}, queue_size={self.get_queue_size()})" + ) diff --git a/backend/src/trigger/scheduler.py b/backend/src/trigger/scheduler.py new file mode 100644 index 0000000..afc795e --- /dev/null +++ b/backend/src/trigger/scheduler.py @@ -0,0 +1,187 @@ +""" +APScheduler integration for cron-style triggers. + +Provides scheduling of periodic triggers (e.g., sync exchange state hourly, +recompute indicators every 5 minutes, daily portfolio reports). +""" + +import logging +from typing import Optional + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger as APSCronTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from .queue import TriggerQueue +from .types import Priority, Trigger + +logger = logging.getLogger(__name__) + + +class TriggerScheduler: + """ + Scheduler for periodic trigger execution. + + Wraps APScheduler to enqueue triggers at scheduled times. + """ + + def __init__(self, trigger_queue: TriggerQueue): + """ + Initialize scheduler. + + Args: + trigger_queue: TriggerQueue to enqueue triggers into + """ + self.trigger_queue = trigger_queue + self.scheduler = AsyncIOScheduler() + self._job_counter = 0 + + def start(self) -> None: + """Start the scheduler""" + self.scheduler.start() + logger.info("TriggerScheduler started") + + def shutdown(self, wait: bool = True) -> None: + """ + Shut down the scheduler. + + Args: + wait: If True, wait for running jobs to complete + """ + self.scheduler.shutdown(wait=wait) + logger.info("TriggerScheduler shut down") + + def schedule_interval( + self, + trigger: Trigger, + seconds: Optional[int] = None, + minutes: Optional[int] = None, + hours: Optional[int] = None, + priority: Optional[Priority] = None, + ) -> str: + """ + Schedule a trigger to run at regular intervals. 
+ + Args: + trigger: Trigger to execute + seconds: Interval in seconds + minutes: Interval in minutes + hours: Interval in hours + priority: Priority override for execution + + Returns: + Job ID (can be used to remove job later) + + Example: + # Run every 5 minutes + scheduler.schedule_interval( + IndicatorUpdateTrigger("rsi_14"), + minutes=5 + ) + """ + job_id = f"interval_{self._job_counter}" + self._job_counter += 1 + + async def job_func(): + await self.trigger_queue.enqueue(trigger, priority) + + self.scheduler.add_job( + job_func, + trigger=IntervalTrigger(seconds=seconds, minutes=minutes, hours=hours), + id=job_id, + name=f"Interval: {trigger.name}", + ) + + logger.info( + f"Scheduled interval job: {job_id}, trigger={trigger.name}, " + f"interval=(s={seconds}, m={minutes}, h={hours})" + ) + + return job_id + + def schedule_cron( + self, + trigger: Trigger, + minute: Optional[str] = None, + hour: Optional[str] = None, + day: Optional[str] = None, + month: Optional[str] = None, + day_of_week: Optional[str] = None, + priority: Optional[Priority] = None, + ) -> str: + """ + Schedule a trigger to run on a cron schedule. + + Args: + trigger: Trigger to execute + minute: Minute expression (0-59, *, */5, etc.) + hour: Hour expression (0-23, *, etc.) + day: Day of month expression (1-31, *, etc.) + month: Month expression (1-12, *, etc.) + day_of_week: Day of week expression (0-6, mon-sun, *, etc.) 
+ priority: Priority override for execution + + Returns: + Job ID (can be used to remove job later) + + Example: + # Run at 9:00 AM every weekday + scheduler.schedule_cron( + SyncExchangeStateTrigger(), + hour="9", + minute="0", + day_of_week="mon-fri" + ) + """ + job_id = f"cron_{self._job_counter}" + self._job_counter += 1 + + async def job_func(): + await self.trigger_queue.enqueue(trigger, priority) + + self.scheduler.add_job( + job_func, + trigger=APSCronTrigger( + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + ), + id=job_id, + name=f"Cron: {trigger.name}", + ) + + logger.info( + f"Scheduled cron job: {job_id}, trigger={trigger.name}, " + f"schedule=(m={minute}, h={hour}, d={day}, dow={day_of_week})" + ) + + return job_id + + def remove_job(self, job_id: str) -> bool: + """ + Remove a scheduled job. + + Args: + job_id: Job ID returned from schedule_* methods + + Returns: + True if job was removed, False if not found + """ + try: + self.scheduler.remove_job(job_id) + logger.info(f"Removed scheduled job: {job_id}") + return True + except Exception as e: + logger.warning(f"Could not remove job {job_id}: {e}") + return False + + def get_jobs(self) -> list: + """Get list of all scheduled jobs""" + return self.scheduler.get_jobs() + + def __repr__(self) -> str: + job_count = len(self.scheduler.get_jobs()) + running = self.scheduler.running + return f"TriggerScheduler(running={running}, jobs={job_count})" diff --git a/backend/src/trigger/store.py b/backend/src/trigger/store.py new file mode 100644 index 0000000..7dfa4f3 --- /dev/null +++ b/backend/src/trigger/store.py @@ -0,0 +1,301 @@ +""" +Versioned store with pluggable backends. + +Provides optimistic concurrency control via sequence numbers with support +for different storage backends (Pydantic models, files, databases, etc.). 
+""" + +import logging +from abc import ABC, abstractmethod +from copy import deepcopy +from typing import Any, Generic, TypeVar + +from .context import get_execution_context +from .types import CommitIntent + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class StoreBackend(ABC, Generic[T]): + """ + Abstract backend for versioned stores. + + Allows different storage mechanisms (Pydantic models, files, databases) + to be used with the same versioned store infrastructure. + """ + + @abstractmethod + def read(self) -> T: + """ + Read the current data. + + Returns: + Current data in backend-specific format + """ + pass + + @abstractmethod + def write(self, data: T) -> None: + """ + Write new data (replaces existing). + + Args: + data: New data to write + """ + pass + + @abstractmethod + def snapshot(self) -> T: + """ + Create an immutable snapshot of current data. + + Must return a deep copy or immutable version to prevent + modifications from affecting the committed state. + + Returns: + Immutable snapshot of data + """ + pass + + @abstractmethod + def validate(self, data: T) -> bool: + """ + Validate that data is in correct format for this backend. + + Args: + data: Data to validate + + Returns: + True if valid + + Raises: + ValueError: If invalid with explanation + """ + pass + + +class PydanticStoreBackend(StoreBackend[T]): + """ + Backend for Pydantic BaseModel stores. + + Supports the existing OrderStore, ChartStore, etc. pattern. + """ + + def __init__(self, model_instance: T): + """ + Initialize with a Pydantic model instance. 
+ + Args: + model_instance: Instance of a Pydantic BaseModel + """ + self._model = model_instance + + def read(self) -> T: + return self._model + + def write(self, data: T) -> None: + # Replace the internal model + self._model = data + + def snapshot(self) -> T: + # Use Pydantic's model_copy for deep copy + if hasattr(self._model, "model_copy"): + return self._model.model_copy(deep=True) + # Fallback for older Pydantic or non-model types + return deepcopy(self._model) + + def validate(self, data: T) -> bool: + # Pydantic models validate themselves on construction + # If we got here with a model instance, it's valid + return True + + +class FileStoreBackend(StoreBackend[str]): + """ + Backend for file-based storage. + + Future implementation for versioning files (e.g., Python scripts, configs). + """ + + def __init__(self, file_path: str): + self.file_path = file_path + raise NotImplementedError("FileStoreBackend not yet implemented") + + def read(self) -> str: + raise NotImplementedError() + + def write(self, data: str) -> None: + raise NotImplementedError() + + def snapshot(self) -> str: + raise NotImplementedError() + + def validate(self, data: str) -> bool: + raise NotImplementedError() + + +class DatabaseStoreBackend(StoreBackend[dict]): + """ + Backend for database table storage. + + Future implementation for versioning database interactions. + """ + + def __init__(self, table_name: str, connection): + self.table_name = table_name + self.connection = connection + raise NotImplementedError("DatabaseStoreBackend not yet implemented") + + def read(self) -> dict: + raise NotImplementedError() + + def write(self, data: dict) -> None: + raise NotImplementedError() + + def snapshot(self) -> dict: + raise NotImplementedError() + + def validate(self, data: dict) -> bool: + raise NotImplementedError() + + +class VersionedStore(Generic[T]): + """ + Store with optimistic concurrency control via sequence numbers. 
+ + Wraps any StoreBackend and provides: + - Lock-free snapshot reads + - Conflict detection on commit + - Version tracking for debugging + """ + + def __init__(self, name: str, backend: StoreBackend[T]): + """ + Initialize versioned store. + + Args: + name: Unique name for this store (e.g., "OrderStore") + backend: Backend implementation for storage + """ + self.name = name + self._backend = backend + self._committed_seq = 0 # Highest committed seq + self._version = 0 # Increments on each commit (for debugging) + + @property + def committed_seq(self) -> int: + """Get the current committed sequence number""" + return self._committed_seq + + @property + def version(self) -> int: + """Get the current version (increments on each commit)""" + return self._version + + def read_snapshot(self) -> tuple[int, T]: + """ + Read an immutable snapshot of the store. + + This is lock-free and can be called concurrently. The snapshot + captures the current committed seq and a deep copy of the data. + + Automatically records the snapshot seq in the execution context + for conflict detection during commit. + + Returns: + Tuple of (seq, snapshot_data) + """ + snapshot_seq = self._committed_seq + snapshot_data = self._backend.snapshot() + + # Record in execution context for conflict detection + ctx = get_execution_context() + if ctx: + ctx.record_snapshot(self.name, snapshot_seq) + + logger.debug( + f"Store '{self.name}': read_snapshot() -> seq={snapshot_seq}, version={self._version}" + ) + + return (snapshot_seq, snapshot_data) + + def read_current(self) -> T: + """ + Read the current data without snapshot tracking. + + Use this for read-only operations that don't need conflict detection. + + Returns: + Current data (not a snapshot, modifications visible) + """ + return self._backend.read() + + def prepare_commit(self, expected_seq: int, new_data: T) -> CommitIntent: + """ + Create a commit intent for later sequential commit. 
+ + Does NOT modify the store - that happens during the commit phase. + + Args: + expected_seq: The seq of the snapshot that was read + new_data: The new data to commit + + Returns: + CommitIntent to be submitted to CommitCoordinator + """ + # Validate data before creating intent + self._backend.validate(new_data) + + intent = CommitIntent( + store_name=self.name, + expected_seq=expected_seq, + new_data=new_data, + ) + + logger.debug( + f"Store '{self.name}': prepare_commit(expected_seq={expected_seq}, current_seq={self._committed_seq})" + ) + + return intent + + def commit(self, new_data: T, commit_seq: int) -> None: + """ + Commit new data at a specific seq. + + Called by CommitCoordinator during sequential commit phase. + NOT for direct use by triggers. + + Args: + new_data: Data to commit + commit_seq: Seq number of this commit + """ + self._backend.write(new_data) + self._committed_seq = commit_seq + self._version += 1 + + logger.info( + f"Store '{self.name}': committed seq={commit_seq}, version={self._version}" + ) + + def check_conflict(self, expected_seq: int) -> bool: + """ + Check if committing at expected_seq would conflict. + + Args: + expected_seq: The seq that was expected during execution + + Returns: + True if conflict (committed_seq has advanced beyond expected_seq) + """ + has_conflict = self._committed_seq != expected_seq + if has_conflict: + logger.warning( + f"Store '{self.name}': conflict detected - " + f"expected_seq={expected_seq}, committed_seq={self._committed_seq}" + ) + return has_conflict + + def __repr__(self) -> str: + return f"VersionedStore(name='{self.name}', committed_seq={self._committed_seq}, version={self._version})" diff --git a/backend/src/trigger/types.py b/backend/src/trigger/types.py new file mode 100644 index 0000000..ca5f0d3 --- /dev/null +++ b/backend/src/trigger/types.py @@ -0,0 +1,175 @@ +""" +Core types for the trigger system. 
+""" + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +class Priority(IntEnum): + """ + Primary execution priority for triggers. + + Lower numeric value = higher priority (dequeued first). + + Priority hierarchy (highest to lowest): + - DATA_SOURCE: Market data, real-time feeds (most time-sensitive) + - TIMER: Scheduled tasks, cron jobs + - USER_AGENT: User-agent interactions (WebSocket chat) + - USER_DATA_REQUEST: User data requests (chart loads, symbol search) + - SYSTEM: Background tasks, cleanup + - LOW: Retries after conflicts, non-critical tasks + """ + + DATA_SOURCE = 0 # Market data updates, real-time feeds + TIMER = 1 # Scheduled tasks, cron jobs + USER_AGENT = 2 # User-agent interactions (WebSocket chat) + USER_DATA_REQUEST = 3 # User data requests (chart loads, etc.) + SYSTEM = 4 # Background tasks, cleanup, etc. + LOW = 5 # Retries after conflicts, non-critical tasks + + +# Type alias for priority tuples +# Examples: +# (Priority.DATA_SOURCE,) - Simple priority +# (Priority.DATA_SOURCE, event_time) - Priority + event time +# (Priority.DATA_SOURCE, event_time, queue_seq) - Full ordering +# +# Python compares tuples element-by-element, left-to-right. +# Shorter tuple wins if all shared elements are equal. +PriorityTuple = tuple[int, ...] + + +class ExecutionState(IntEnum): + """State of an execution in the system""" + + QUEUED = 0 # In queue, waiting to be dequeued + EXECUTING = 1 # Currently executing + WAITING_COMMIT = 2 # Finished executing, waiting for sequential commit + COMMITTED = 3 # Successfully committed + EVICTED = 4 # Evicted due to conflict, will retry + FAILED = 5 # Failed with error + + +@dataclass +class CommitIntent: + """ + Intent to commit changes to a store. + + Created during execution, validated and applied during sequential commit phase. 
+ """ + + store_name: str + """Name of the store to commit to""" + + expected_seq: int + """The seq number of the snapshot that was read (for conflict detection)""" + + new_data: Any + """The new data to commit (format depends on store backend)""" + + def __repr__(self) -> str: + data_preview = str(self.new_data)[:50] + return f"CommitIntent(store={self.store_name}, expected_seq={self.expected_seq}, data={data_preview}...)" + + +class Trigger(ABC): + """ + Abstract base class for all triggers. + + A trigger represents a unit of work that: + 1. Gets assigned a seq number when dequeued + 2. Executes (potentially long-running, async) + 3. Returns CommitIntents for any state changes + 4. Waits for sequential commit + """ + + def __init__( + self, + name: str, + priority: Priority = Priority.SYSTEM, + priority_tuple: Optional[PriorityTuple] = None + ): + """ + Initialize trigger. + + Args: + name: Descriptive name for logging + priority: Simple priority (used if priority_tuple not provided) + priority_tuple: Optional tuple for compound sorting + Examples: + (Priority.DATA_SOURCE, event_time) + (Priority.USER_AGENT, message_timestamp) + (Priority.TIMER, scheduled_time) + """ + self.name = name + self.priority = priority + self._priority_tuple = priority_tuple + + def get_priority_tuple(self, queue_seq: int) -> PriorityTuple: + """ + Get the priority tuple for queue ordering. + + If a priority tuple was provided at construction, append queue_seq. + Otherwise, create tuple from simple priority. 
+ + Args: + queue_seq: Queue insertion order (final sort key) + + Returns: + Priority tuple for queue ordering + + Examples: + (Priority.DATA_SOURCE,) + (queue_seq,) = (0, queue_seq) + (Priority.DATA_SOURCE, 1000) + (queue_seq,) = (0, 1000, queue_seq) + """ + if self._priority_tuple is not None: + return self._priority_tuple + (queue_seq,) + else: + return (self.priority.value, queue_seq) + + @abstractmethod + async def execute(self) -> list[CommitIntent]: + """ + Execute the trigger logic. + + Can be long-running and async. Should read from stores via + VersionedStore.read_snapshot() and return CommitIntents for any changes. + + Returns: + List of CommitIntents (empty if no state changes) + + Raises: + Exception: On execution failure (will be logged, no commit) + """ + pass + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(name='{self.name}', priority={self.priority.name})" + + +@dataclass +class ExecutionRecord: + """ + Record of an execution for tracking and debugging. + + Maintained by the CommitCoordinator to track in-flight executions. + """ + + seq: int + trigger: Trigger + state: ExecutionState + commit_intents: Optional[list[CommitIntent]] = None + error: Optional[str] = None + retry_count: int = 0 + + def __repr__(self) -> str: + return ( + f"ExecutionRecord(seq={self.seq}, trigger={self.trigger.name}, " + f"state={self.state.name}, retry={self.retry_count})" + ) diff --git a/deploy/Dockerfile-backend b/deploy/Dockerfile-backend index e86be2f..7f1f720 100644 --- a/deploy/Dockerfile-backend +++ b/deploy/Dockerfile-backend @@ -18,7 +18,7 @@ RUN apt-get update \ && make install \ && cd .. 
\ && rm -rf ta-lib ta-lib-0.4.0-src.tar.gz \ - && apt-get purge -y --auto-remove gcc g++ make wget ca-certificates \ + && apt-get purge -y --auto-remove gcc g++ make wget \ && rm -rf /var/lib/apt/lists/* # Install Python build dependencies early for better layer caching diff --git a/deploy/backend.yaml b/deploy/backend.yaml index bf63942..ebe1d9e 100644 --- a/deploy/backend.yaml +++ b/deploy/backend.yaml @@ -35,6 +35,11 @@ spec: env: - name: CONFIG value: "dev" + - name: ANTHROPIC_API_KEY + valueFrom: + secretKeyRef: + name: ai-secrets + key: anthropic-api-key volumeMounts: - name: ai-backend-data mountPath: /app/data diff --git a/web/index.html b/web/index.html index a58f36b..fa3719c 100644 --- a/web/index.html +++ b/web/index.html @@ -4,7 +4,7 @@ -