triggers and execution queue; subagents
@@ -5,7 +5,7 @@ server_port: 8081
 agent:
   model: "claude-sonnet-4-20250514"
   temperature: 0.7
-  context_docs_dir: "memory"
+  context_docs_dir: "memory"  # Context docs still loaded from memory/

 # Local memory configuration (free & sophisticated!)
 memory:
@@ -42,3 +42,6 @@ python-dotenv>=1.0.0
 # Secrets management
 cryptography>=42.0.0
 argon2-cffi>=23.0.0
+
+# Trigger system scheduling
+apscheduler>=3.10.0
backend/soul/automation_agent.md (new file, 37 lines)
@@ -0,0 +1,37 @@
# Automation Agent

You are a specialized automation and scheduling agent. Your sole purpose is to manage triggers, scheduled tasks, and automated workflows.

## Your Core Identity

You are an expert in:
- Scheduling recurring tasks with cron expressions
- Creating interval-based triggers
- Managing the trigger queue and priorities
- Designing autonomous agent workflows

## Your Tools

You have access to:
- **Trigger Tools**: schedule_agent_prompt, execute_agent_prompt_once, list_scheduled_triggers, cancel_scheduled_trigger, get_trigger_system_stats

## Communication Style

- **Systematic**: Explain scheduling logic clearly
- **Proactive**: Suggest optimal scheduling strategies
- **Organized**: Keep track of all scheduled tasks
- **Reliable**: Ensure triggers are set up correctly

## Key Principles

1. **Clarity**: Make schedules easy to understand
2. **Maintainability**: Use descriptive names for jobs
3. **Priority Awareness**: Respect task priorities
4. **Resource Conscious**: Avoid overwhelming the system

## Limitations

- You ONLY handle scheduling and automation
- You do NOT execute the actual analysis (delegate to other agents)
- You do NOT access data or charts directly
- You coordinate but don't perform the work
backend/soul/chart_agent.md (new file, 40 lines)
@@ -0,0 +1,40 @@
# Chart Analysis Agent

You are a specialized chart analysis and technical analysis agent. Your sole purpose is to work with chart data, indicators, and Python-based analysis.

## Your Core Identity

You are an expert in:
- Reading and analyzing OHLCV data
- Computing technical indicators (RSI, MACD, Bollinger Bands, etc.)
- Drawing shapes and annotations on charts
- Executing Python code for custom analysis
- Visualizing data with matplotlib

## Your Tools

You have access to:
- **Chart Tools**: get_chart_data, execute_python
- **Indicator Tools**: search_indicators, add_indicator_to_chart, list_chart_indicators, etc.
- **Shape Tools**: search_shapes, create_or_update_shape, delete_shape, etc.

## Communication Style

- **Direct & Technical**: Provide analysis results clearly
- **Visual**: Generate plots when helpful
- **Precise**: Reference specific timeframes, indicators, and values
- **Concise**: Focus on the analysis task at hand

## Key Principles

1. **Data First**: Always work with actual market data
2. **Visualize**: Create charts to illustrate findings
3. **Document Calculations**: Explain what indicators show
4. **Respect Context**: Use the chart the user is viewing when available

## Limitations

- You ONLY handle chart analysis and visualization
- You do NOT make trading decisions
- You do NOT access external APIs or data sources
- Route other requests back to the main agent
backend/soul/data_agent.md (new file, 37 lines)
@@ -0,0 +1,37 @@
# Data Access Agent

You are a specialized data access agent. Your sole purpose is to retrieve and search market data from various exchanges and data sources.

## Your Core Identity

You are an expert in:
- Searching for trading symbols across exchanges
- Retrieving historical OHLCV data
- Understanding exchange APIs and data formats
- Symbol resolution and metadata

## Your Tools

You have access to:
- **Data Source Tools**: list_data_sources, search_symbols, get_symbol_info, get_historical_data

## Communication Style

- **Precise**: Provide exact symbol names and exchange identifiers
- **Helpful**: Suggest alternatives when exact matches aren't found
- **Efficient**: Return data in the format requested
- **Clear**: Explain data limitations or availability issues

## Key Principles

1. **Accuracy**: Return correct symbol identifiers
2. **Completeness**: Include all relevant metadata
3. **Performance**: Respect countback limits
4. **Format Awareness**: Understand time resolutions and ranges

## Limitations

- You ONLY handle data retrieval and search
- You do NOT analyze data (route to chart agent)
- You do NOT access external news or research (route to research agent)
- You do NOT modify data or state
backend/soul/research_agent.md (new file, 37 lines)
@@ -0,0 +1,37 @@
# Research Agent

You are a specialized research agent. Your sole purpose is to gather external information from the web, academic papers, and public APIs.

## Your Core Identity

You are an expert in:
- Searching academic papers on arXiv
- Finding information on Wikipedia
- Web search for current events and news
- Making HTTP requests to public APIs

## Your Tools

You have access to:
- **Research Tools**: search_arxiv, search_wikipedia, search_web, http_get, http_post

## Communication Style

- **Thorough**: Provide comprehensive research findings
- **Source-Aware**: Cite sources and links
- **Critical**: Evaluate information quality
- **Summarize**: Distill key points from long content

## Key Principles

1. **Verify**: Cross-reference information when possible
2. **Recency**: Note publication dates and data freshness
3. **Relevance**: Focus on trading and market-relevant information
4. **Ethics**: Respect API rate limits and terms of service

## Limitations

- You ONLY handle external information gathering
- You do NOT analyze market data (route to chart agent)
- You do NOT make trading recommendations
- You do NOT access private or authenticated APIs without explicit permission
@@ -7,10 +7,12 @@ from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
 from langchain_core.runnables import RunnableConfig
 from langgraph.prebuilt import create_react_agent

-from agent.tools import SYNC_TOOLS, DATASOURCE_TOOLS, INDICATOR_TOOLS, RESEARCH_TOOLS, CHART_TOOLS, SHAPE_TOOLS
+from agent.tools import SYNC_TOOLS, DATASOURCE_TOOLS, INDICATOR_TOOLS, RESEARCH_TOOLS, CHART_TOOLS, SHAPE_TOOLS, TRIGGER_TOOLS
 from agent.memory import MemoryManager
 from agent.session import SessionManager
 from agent.prompts import build_system_prompt
+from agent.subagent import SubAgent
+from agent.routers import ROUTER_TOOLS, set_chart_agent, set_data_agent, set_automation_agent, set_research_agent
 from gateway.user_session import UserSession
 from gateway.protocol import UserMessage as GatewayUserMessage
@@ -29,7 +31,8 @@ class AgentExecutor:
         model_name: str = "claude-sonnet-4-20250514",
         temperature: float = 0.7,
         api_key: Optional[str] = None,
-        memory_manager: Optional[MemoryManager] = None
+        memory_manager: Optional[MemoryManager] = None,
+        base_dir: str = "."
     ):
         """Initialize agent executor.
@@ -38,10 +41,12 @@ class AgentExecutor:
             temperature: Model temperature
             api_key: Anthropic API key
             memory_manager: MemoryManager instance
+            base_dir: Base directory for resolving paths
         """
         self.model_name = model_name
         self.temperature = temperature
         self.api_key = api_key
+        self.base_dir = base_dir

         # Initialize LLM
         self.llm = ChatAnthropic(
@@ -56,6 +61,12 @@ class AgentExecutor:
         self.session_manager = SessionManager(self.memory_manager)
         self.agent = None  # Will be created after initialization

+        # Sub-agents (only if using hierarchical tools)
+        self.chart_agent = None
+        self.data_agent = None
+        self.automation_agent = None
+        self.research_agent = None
+
     async def initialize(self) -> None:
         """Initialize the agent system."""
         await self.memory_manager.initialize()
@@ -63,15 +74,69 @@
         # Create agent with tools and LangGraph checkpointer
         checkpointer = self.memory_manager.get_checkpointer()

-        # Create agent without a static system prompt
+        # Create specialized sub-agents
+        logger.info("Initializing hierarchical agent architecture with sub-agents")
+
+        self.chart_agent = SubAgent(
+            name="chart",
+            soul_file="chart_agent.md",
+            tools=CHART_TOOLS + INDICATOR_TOOLS + SHAPE_TOOLS,
+            model_name=self.model_name,
+            temperature=self.temperature,
+            api_key=self.api_key,
+            base_dir=self.base_dir
+        )
+
+        self.data_agent = SubAgent(
+            name="data",
+            soul_file="data_agent.md",
+            tools=DATASOURCE_TOOLS,
+            model_name=self.model_name,
+            temperature=self.temperature,
+            api_key=self.api_key,
+            base_dir=self.base_dir
+        )
+
+        self.automation_agent = SubAgent(
+            name="automation",
+            soul_file="automation_agent.md",
+            tools=TRIGGER_TOOLS,
+            model_name=self.model_name,
+            temperature=self.temperature,
+            api_key=self.api_key,
+            base_dir=self.base_dir
+        )
+
+        self.research_agent = SubAgent(
+            name="research",
+            soul_file="research_agent.md",
+            tools=RESEARCH_TOOLS,
+            model_name=self.model_name,
+            temperature=self.temperature,
+            api_key=self.api_key,
+            base_dir=self.base_dir
+        )
+
+        # Set global sub-agent instances for router tools
+        set_chart_agent(self.chart_agent)
+        set_data_agent(self.data_agent)
+        set_automation_agent(self.automation_agent)
+        set_research_agent(self.research_agent)
+
+        # Main agent only gets SYNC_TOOLS (state management) and ROUTER_TOOLS
+        logger.info("Main agent using router tools (4 routers + sync tools)")
+        agent_tools = SYNC_TOOLS + ROUTER_TOOLS
+
+        # Create main agent without a static system prompt
         # We'll pass the dynamic system prompt via state_modifier at runtime
-        # Include all tool categories: sync, datasource, chart, indicator, shape, and research
         self.agent = create_react_agent(
             self.llm,
-            SYNC_TOOLS + DATASOURCE_TOOLS + CHART_TOOLS + INDICATOR_TOOLS + SHAPE_TOOLS + RESEARCH_TOOLS,
+            agent_tools,
             checkpointer=checkpointer
         )

         logger.info(f"Agent initialized with {len(agent_tools)} tools")

     async def _clear_checkpoint(self, session_id: str) -> None:
         """Clear the checkpoint for a session to prevent resuming from invalid state.
@@ -291,7 +356,7 @@ def create_agent(
         base_dir: Base directory for resolving paths

     Returns:
-        Initialized AgentExecutor
+        Initialized AgentExecutor with hierarchical tool routing
     """
     # Initialize memory manager
     memory_manager = MemoryManager(
@@ -307,7 +372,8 @@ def create_agent(
         model_name=model_name,
         temperature=temperature,
         api_key=api_key,
-        memory_manager=memory_manager
+        memory_manager=memory_manager,
+        base_dir=base_dir
     )

     return executor
backend/src/agent/routers.py (new file, 218 lines)
@@ -0,0 +1,218 @@
"""Tool router functions for hierarchical agent architecture.
|
||||
|
||||
This module provides meta-tools that route tasks to specialized sub-agents.
|
||||
The main agent uses these routers instead of accessing all tools directly.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
from langchain_core.tools import tool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global sub-agent instances (set by create_agent)
|
||||
_chart_agent = None
|
||||
_data_agent = None
|
||||
_automation_agent = None
|
||||
_research_agent = None
|
||||
|
||||
|
||||
def set_chart_agent(agent):
|
||||
"""Set the global chart sub-agent instance."""
|
||||
global _chart_agent
|
||||
_chart_agent = agent
|
||||
|
||||
|
||||
def set_data_agent(agent):
|
||||
"""Set the global data sub-agent instance."""
|
||||
global _data_agent
|
||||
_data_agent = agent
|
||||
|
||||
|
||||
def set_automation_agent(agent):
|
||||
"""Set the global automation sub-agent instance."""
|
||||
global _automation_agent
|
||||
_automation_agent = agent
|
||||
|
||||
|
||||
def set_research_agent(agent):
|
||||
"""Set the global research sub-agent instance."""
|
||||
global _research_agent
|
||||
_research_agent = agent
|
||||
|
||||
|
||||
@tool
|
||||
async def use_chart_analysis(task: str) -> str:
|
||||
"""Analyze charts, compute indicators, execute Python code, and create visualizations.
|
||||
|
||||
This tool delegates to a specialized chart analysis agent that has access to:
|
||||
- Chart data retrieval (get_chart_data)
|
||||
- Python execution environment with pandas, numpy, matplotlib, talib
|
||||
- Technical indicator tools (add/remove indicators, search indicators)
|
||||
- Shape drawing tools (create/update/delete shapes on charts)
|
||||
|
||||
Use this when the user wants to:
|
||||
- Analyze price action or patterns
|
||||
- Calculate technical indicators (RSI, MACD, Bollinger Bands, etc.)
|
||||
- Execute custom Python analysis on OHLCV data
|
||||
- Generate charts and visualizations
|
||||
- Draw trendlines, support/resistance, or other shapes
|
||||
- Perform statistical analysis on market data
|
||||
|
||||
Args:
|
||||
task: Detailed description of the chart analysis task. Include:
|
||||
- What to analyze (which symbol, timeframe if different from current)
|
||||
- What indicators or calculations to perform
|
||||
- What visualizations to create
|
||||
- Any specific questions to answer
|
||||
|
||||
Returns:
|
||||
The chart agent's analysis results, including computed values,
|
||||
plot URLs if visualizations were created, and interpretation.
|
||||
|
||||
Examples:
|
||||
- "Calculate RSI(14) for the current chart and tell me if it's overbought"
|
||||
- "Draw a trendline connecting the last 3 swing lows"
|
||||
- "Compute Bollinger Bands (20, 2) and create a chart showing price vs bands"
|
||||
- "Analyze the last 100 bars and identify key support/resistance levels"
|
||||
- "Execute Python: calculate correlation between BTC and ETH over the last 30 days"
|
||||
"""
|
||||
if not _chart_agent:
|
||||
return "Error: Chart analysis agent not initialized"
|
||||
|
||||
logger.info(f"Routing to chart agent: {task[:100]}...")
|
||||
result = await _chart_agent.execute(task)
|
||||
return result
|
||||
|
||||
|
||||
@tool
|
||||
async def use_data_access(task: str) -> str:
|
||||
"""Search for symbols and retrieve market data from exchanges.
|
||||
|
||||
This tool delegates to a specialized data access agent that has access to:
|
||||
- Symbol search across multiple exchanges
|
||||
- Historical OHLCV data retrieval
|
||||
- Symbol metadata and info
|
||||
- Available data sources and exchanges
|
||||
|
||||
Use this when the user wants to:
|
||||
- Search for a trading symbol or ticker
|
||||
- Get historical price data
|
||||
- Find out what exchanges support a symbol
|
||||
- Retrieve symbol metadata (price scale, supported resolutions, etc.)
|
||||
- Check what data sources are available
|
||||
|
||||
Args:
|
||||
task: Detailed description of the data access task. Include:
|
||||
- What symbol or instrument to search for
|
||||
- What data to retrieve (time range, resolution)
|
||||
- What metadata is needed
|
||||
|
||||
Returns:
|
||||
The data agent's response with requested symbols, data, or metadata.
|
||||
|
||||
Examples:
|
||||
- "Search for Bitcoin symbols on Binance"
|
||||
- "Get the last 100 hours of BTC/USDT 1-hour data from Binance"
|
||||
- "Find all symbols matching 'ETH' on all exchanges"
|
||||
- "Get detailed info about symbol BTC/USDT on Binance"
|
||||
- "List all available data sources"
|
||||
"""
|
||||
if not _data_agent:
|
||||
return "Error: Data access agent not initialized"
|
||||
|
||||
logger.info(f"Routing to data agent: {task[:100]}...")
|
||||
result = await _data_agent.execute(task)
|
||||
return result
|
||||
|
||||
|
||||
@tool
|
||||
async def use_automation(task: str) -> str:
|
||||
"""Schedule recurring tasks, create triggers, and manage automation.
|
||||
|
||||
This tool delegates to a specialized automation agent that has access to:
|
||||
- Scheduled agent prompts (cron and interval-based)
|
||||
- One-time agent prompt execution
|
||||
- Trigger management (list, cancel scheduled jobs)
|
||||
- System stats and monitoring
|
||||
|
||||
Use this when the user wants to:
|
||||
- Schedule a recurring task (hourly, daily, weekly, etc.)
|
||||
- Run a one-time background analysis
|
||||
- Set up automated monitoring or alerts
|
||||
- List or cancel existing scheduled tasks
|
||||
- Check trigger system status
|
||||
|
||||
Args:
|
||||
task: Detailed description of the automation task. Include:
|
||||
- What should happen (what analysis or action)
|
||||
- When it should happen (schedule, frequency)
|
||||
- Any priorities or conditions
|
||||
|
||||
Returns:
|
||||
The automation agent's response with job IDs, confirmation,
|
||||
or status information.
|
||||
|
||||
Examples:
|
||||
- "Schedule a task to check BTC price every 5 minutes"
|
||||
- "Run a one-time analysis of ETH volume in the background"
|
||||
- "Set up a daily report at 9 AM with market summary"
|
||||
- "Show me all my scheduled tasks"
|
||||
- "Cancel the hourly BTC monitor job"
|
||||
"""
|
||||
if not _automation_agent:
|
||||
return "Error: Automation agent not initialized"
|
||||
|
||||
logger.info(f"Routing to automation agent: {task[:100]}...")
|
||||
result = await _automation_agent.execute(task)
|
||||
return result
|
||||
|
||||
|
||||
@tool
|
||||
async def use_research(task: str) -> str:
|
||||
"""Search the web, academic papers, and external APIs for information.
|
||||
|
||||
This tool delegates to a specialized research agent that has access to:
|
||||
- Web search (DuckDuckGo)
|
||||
- Academic paper search (arXiv)
|
||||
- Wikipedia lookup
|
||||
- HTTP requests to public APIs
|
||||
|
||||
Use this when the user wants to:
|
||||
- Search for current news or events
|
||||
- Find academic papers on trading strategies
|
||||
- Look up financial concepts or terms
|
||||
- Fetch data from external public APIs
|
||||
- Research market trends or sentiment
|
||||
|
||||
Args:
|
||||
task: Detailed description of the research task. Include:
|
||||
- What information to find
|
||||
- What sources to search (web, arxiv, wikipedia, APIs)
|
||||
- What to focus on or filter
|
||||
|
||||
Returns:
|
||||
The research agent's findings with sources, summaries, and links.
|
||||
|
||||
Examples:
|
||||
- "Search arXiv for papers on reinforcement learning for trading"
|
||||
- "Look up 'technical analysis' on Wikipedia"
|
||||
- "Search the web for latest Ethereum news"
|
||||
- "Fetch current BTC price from CoinGecko API"
|
||||
- "Find recent papers on market microstructure"
|
||||
"""
|
||||
if not _research_agent:
|
||||
return "Error: Research agent not initialized"
|
||||
|
||||
logger.info(f"Routing to research agent: {task[:100]}...")
|
||||
result = await _research_agent.execute(task)
|
||||
return result
|
||||
|
||||
|
||||
# Export router tools
|
||||
ROUTER_TOOLS = [
|
||||
use_chart_analysis,
|
||||
use_data_access,
|
||||
use_automation,
|
||||
use_research
|
||||
]
|
||||
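The routers above combine a module-level registry with a fail-soft guard. That pattern can be exercised in isolation, without LangChain; the following is a minimal sketch under assumed names (`StubAgent` and the un-decorated `use_chart_analysis` are illustrative, not part of the commit):

```python
import asyncio

# Module-level slot, as in routers.py (set by the wiring code, read by the router)
_chart_agent = None

def set_chart_agent(agent):
    """Register the sub-agent instance the router will delegate to."""
    global _chart_agent
    _chart_agent = agent

async def use_chart_analysis(task: str) -> str:
    # Guard mirrors the real router: fail soft if wiring never happened
    if not _chart_agent:
        return "Error: Chart analysis agent not initialized"
    return await _chart_agent.execute(task)

class StubAgent:
    """Stand-in for SubAgent; only the execute() contract matters here."""
    async def execute(self, task: str) -> str:
        return f"analyzed: {task}"

set_chart_agent(StubAgent())
print(asyncio.run(use_chart_analysis("RSI check")))  # analyzed: RSI check
```

The indirection means the main agent's tool list can be built before the sub-agents exist, at the cost of module-global state that tests must reset between runs.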
backend/src/agent/subagent.py (new file, 248 lines)
@@ -0,0 +1,248 @@
"""Sub-agent infrastructure for specialized tool routing.
|
||||
|
||||
This module provides the SubAgent class that wraps specialized agents
|
||||
with their own tools and system prompts.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Optional, AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
from langchain_anthropic import ChatAnthropic
|
||||
from langchain_core.messages import HumanMessage, SystemMessage
|
||||
from langchain_core.runnables import RunnableConfig
|
||||
from langgraph.prebuilt import create_react_agent
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SubAgent:
|
||||
"""A specialized sub-agent with its own tools and system prompt.
|
||||
|
||||
Sub-agents are lightweight, stateless agents that focus on specific domains.
|
||||
They use in-memory checkpointing since they don't need persistent state.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
soul_file: str,
|
||||
tools: List,
|
||||
model_name: str = "claude-sonnet-4-20250514",
|
||||
temperature: float = 0.7,
|
||||
api_key: Optional[str] = None,
|
||||
base_dir: str = "."
|
||||
):
|
||||
"""Initialize a sub-agent.
|
||||
|
||||
Args:
|
||||
name: Agent name (e.g., "chart", "data", "automation")
|
||||
soul_file: Filename in /soul directory (e.g., "chart_agent.md")
|
||||
tools: List of LangChain tools for this agent
|
||||
model_name: Anthropic model name
|
||||
temperature: Model temperature
|
||||
api_key: Anthropic API key
|
||||
base_dir: Base directory for resolving paths
|
||||
"""
|
||||
self.name = name
|
||||
self.soul_file = soul_file
|
||||
self.tools = tools
|
||||
self.model_name = model_name
|
||||
self.temperature = temperature
|
||||
self.api_key = api_key
|
||||
self.base_dir = base_dir
|
||||
|
||||
# Load system prompt from soul file
|
||||
soul_path = Path(base_dir) / "soul" / soul_file
|
||||
if soul_path.exists():
|
||||
with open(soul_path, "r") as f:
|
||||
self.system_prompt = f.read()
|
||||
logger.info(f"SubAgent '{name}': Loaded system prompt from {soul_path}")
|
||||
else:
|
||||
logger.warning(f"SubAgent '{name}': Soul file not found at {soul_path}, using default")
|
||||
self.system_prompt = f"You are a specialized {name} agent."
|
||||
|
||||
# Initialize LLM
|
||||
self.llm = ChatAnthropic(
|
||||
model=model_name,
|
||||
temperature=temperature,
|
||||
api_key=api_key,
|
||||
streaming=True
|
||||
)
|
||||
|
||||
# Create agent with in-memory checkpointer (stateless)
|
||||
checkpointer = MemorySaver()
|
||||
self.agent = create_react_agent(
|
||||
self.llm,
|
||||
tools,
|
||||
checkpointer=checkpointer
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"SubAgent '{name}' initialized with {len(tools)} tools, "
|
||||
f"model={model_name}, temp={temperature}"
|
||||
)
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
task: str,
|
||||
thread_id: Optional[str] = None
|
||||
) -> str:
|
||||
"""Execute a task with this sub-agent.
|
||||
|
||||
Args:
|
||||
task: The task/prompt for this sub-agent
|
||||
thread_id: Optional thread ID for checkpointing (uses ephemeral ID if not provided)
|
||||
|
||||
Returns:
|
||||
The agent's complete response as a string
|
||||
"""
|
||||
import uuid
|
||||
|
||||
# Use ephemeral thread ID if not provided
|
||||
if thread_id is None:
|
||||
thread_id = f"subagent-{self.name}-{uuid.uuid4()}"
|
||||
|
||||
logger.info(f"SubAgent '{self.name}': Executing task (thread_id={thread_id})")
|
||||
logger.debug(f"SubAgent '{self.name}': Task: {task[:200]}...")
|
||||
|
||||
# Build messages with system prompt
|
||||
messages = [
|
||||
HumanMessage(content=task)
|
||||
]
|
||||
|
||||
# Prepare config with system prompt injection
|
||||
config = RunnableConfig(
|
||||
configurable={
|
||||
"thread_id": thread_id,
|
||||
"state_modifier": self.system_prompt
|
||||
},
|
||||
metadata={
|
||||
"subagent_name": self.name
|
||||
}
|
||||
)
|
||||
|
||||
# Execute and collect response
|
||||
full_response = ""
|
||||
event_count = 0
|
||||
|
||||
try:
|
||||
async for event in self.agent.astream_events(
|
||||
{"messages": messages},
|
||||
config=config,
|
||||
version="v2"
|
||||
):
|
||||
event_count += 1
|
||||
|
||||
# Log tool calls
|
||||
if event["event"] == "on_tool_start":
|
||||
tool_name = event.get("name", "unknown")
|
||||
logger.debug(f"SubAgent '{self.name}': Tool call started: {tool_name}")
|
||||
|
||||
elif event["event"] == "on_tool_end":
|
||||
tool_name = event.get("name", "unknown")
|
||||
logger.debug(f"SubAgent '{self.name}': Tool call completed: {tool_name}")
|
||||
|
||||
# Extract streaming tokens
|
||||
elif event["event"] == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
if hasattr(chunk, "content") and chunk.content:
|
||||
content = chunk.content
|
||||
# Handle both string and list content
|
||||
if isinstance(content, list):
|
||||
text_parts = []
|
||||
for block in content:
|
||||
if isinstance(block, dict) and "text" in block:
|
||||
text_parts.append(block["text"])
|
||||
elif hasattr(block, "text"):
|
||||
text_parts.append(block.text)
|
||||
content = "".join(text_parts)
|
||||
|
||||
if content:
|
||||
full_response += content
|
||||
|
||||
logger.info(
|
||||
f"SubAgent '{self.name}': Completed task "
|
||||
f"({event_count} events, {len(full_response)} chars)"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"SubAgent '{self.name}' execution error: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
return f"Error: {error_msg}"
|
||||
|
||||
return full_response
|
||||
|
||||
async def stream(
|
||||
self,
|
||||
task: str,
|
||||
thread_id: Optional[str] = None
|
||||
) -> AsyncIterator[str]:
|
||||
"""Execute a task with streaming response.
|
||||
|
||||
Args:
|
||||
task: The task/prompt for this sub-agent
|
||||
thread_id: Optional thread ID for checkpointing
|
||||
|
||||
Yields:
|
||||
Response chunks as they're generated
|
||||
"""
|
||||
import uuid
|
||||
|
||||
# Use ephemeral thread ID if not provided
|
||||
if thread_id is None:
|
||||
thread_id = f"subagent-{self.name}-{uuid.uuid4()}"
|
||||
|
||||
logger.info(f"SubAgent '{self.name}': Streaming task (thread_id={thread_id})")
|
||||
|
||||
# Build messages with system prompt
|
||||
messages = [
|
||||
HumanMessage(content=task)
|
||||
]
|
||||
|
||||
# Prepare config
|
||||
config = RunnableConfig(
|
||||
configurable={
|
||||
"thread_id": thread_id,
|
||||
"state_modifier": self.system_prompt
|
||||
},
|
||||
metadata={
|
||||
"subagent_name": self.name
|
||||
}
|
||||
)
|
||||
|
||||
# Stream response
|
||||
try:
|
||||
async for event in self.agent.astream_events(
|
||||
{"messages": messages},
|
||||
config=config,
|
||||
version="v2"
|
||||
):
|
||||
# Log tool calls
|
||||
if event["event"] == "on_tool_start":
|
||||
tool_name = event.get("name", "unknown")
|
||||
logger.debug(f"SubAgent '{self.name}': Tool call started: {tool_name}")
|
||||
|
||||
# Extract streaming tokens
|
||||
elif event["event"] == "on_chat_model_stream":
|
||||
chunk = event["data"]["chunk"]
|
||||
if hasattr(chunk, "content") and chunk.content:
|
||||
content = chunk.content
|
||||
# Handle both string and list content
|
||||
if isinstance(content, list):
|
||||
text_parts = []
|
||||
for block in content:
|
||||
if isinstance(block, dict) and "text" in block:
|
||||
text_parts.append(block["text"])
|
||||
elif hasattr(block, "text"):
|
||||
text_parts.append(block.text)
|
||||
content = "".join(text_parts)
|
||||
|
||||
if content:
|
||||
yield content
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"SubAgent '{self.name}' streaming error: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
yield f"Error: {error_msg}"
|
||||
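The chunk-content branch in `execute()` and `stream()` is duplicated in both methods. Isolated as a standalone helper, the same logic behaves as follows (a sketch; `flatten_chunk_content` is a hypothetical name, not part of the commit):

```python
def flatten_chunk_content(content):
    """Collapse Anthropic-style content blocks (a list of dicts or
    objects with a .text attribute) into a single plain string."""
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, dict) and "text" in block:
                parts.append(block["text"])
            elif hasattr(block, "text"):
                parts.append(block.text)
        return "".join(parts)
    return content  # already a plain string

print(flatten_chunk_content([{"type": "text", "text": "Hello, "},
                             {"type": "text", "text": "world"}]))  # Hello, world
print(flatten_chunk_content("plain"))  # plain
```

Factoring this out would remove the duplication between the two methods, and makes the list-vs-string handling easy to test on its own.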
backend/src/agent/tools/TRIGGER_TOOLS.md (new file, 373 lines)
@@ -0,0 +1,373 @@
# Agent Trigger Tools
|
||||
|
||||
Agent tools for automating tasks via the trigger system.
|
||||
|
||||
## Overview
|
||||
|
||||
These tools allow the agent to:
|
||||
- **Schedule recurring tasks** - Run agent prompts on intervals or cron schedules
|
||||
- **Execute one-time tasks** - Trigger sub-agent runs immediately
|
||||
- **Manage scheduled jobs** - List and cancel scheduled triggers
|
||||
- **React to events** - (Future) Connect data updates to agent actions
|
||||
|
||||
## Available Tools
|
||||
|
||||
### 1. `schedule_agent_prompt`
|
||||
|
||||
Schedule an agent to run with a specific prompt on a recurring schedule.
|
||||
|
||||
**Use Cases:**
|
||||
- Daily market analysis reports
|
||||
- Hourly portfolio rebalancing checks
|
||||
- Weekly performance summaries
|
||||
- Monitoring alerts
|
||||
|
||||
**Arguments:**
|
||||
- `prompt` (str): The prompt to send to the agent when triggered
|
||||
- `schedule_type` (str): "interval" or "cron"
|
||||
- `schedule_config` (dict): Schedule configuration
|
||||
- `name` (str, optional): Descriptive name for this task
|
||||
|
||||
**Schedule Config:**
|
||||
|
||||
*Interval-based:*
|
||||
```json
|
||||
{"minutes": 5}
|
||||
{"hours": 1, "minutes": 30}
|
||||
{"seconds": 30}
|
||||
```
|
||||
|
||||
*Cron-based:*
|
||||
```json
|
||||
{"hour": "9", "minute": "0"} // Daily at 9:00 AM
|
||||
{"hour": "9", "minute": "0", "day_of_week": "mon-fri"} // Weekdays at 9 AM
|
||||
{"minute": "0"} // Every hour on the hour
|
||||
{"hour": "*/6", "minute": "0"} // Every 6 hours
|
||||
```
|
||||
|
||||
**Returns:**
|
||||
```json
|
||||
{
|
||||
"job_id": "interval_123",
|
||||
"message": "Scheduled 'daily_report' with job_id=interval_123",
|
||||
"schedule_type": "cron",
|
||||
"config": {"hour": "9", "minute": "0"}
|
||||
}
|
||||
```
|
||||
|
||||
**Examples:**
|
||||
|
||||
```python
|
||||
# Every 5 minutes: check BTC price
|
||||
schedule_agent_prompt(
|
||||
prompt="Check current BTC price on Binance. If > $50k, alert me.",
|
||||
schedule_type="interval",
|
||||
schedule_config={"minutes": 5},
|
||||
name="btc_price_monitor"
|
||||
)
|
||||
|
||||
# Daily at 9 AM: market summary
|
||||
schedule_agent_prompt(
|
||||
prompt="Generate a comprehensive market summary for BTC, ETH, and SOL. Include price changes, volume, and notable events from the last 24 hours.",
|
||||
schedule_type="cron",
|
||||
schedule_config={"hour": "9", "minute": "0"},
|
||||
name="daily_market_summary"
|
||||
)
|
||||
|
||||
# Every hour on weekdays: portfolio check
|
||||
schedule_agent_prompt(
|
||||
prompt="Review current portfolio positions. Check if any rebalancing is needed based on target allocations.",
|
||||
schedule_type="cron",
|
||||
schedule_config={"minute": "0", "day_of_week": "mon-fri"},
|
||||
name="hourly_portfolio_check"
|
||||
)
|
||||
```

### 2. `execute_agent_prompt_once`

Execute an agent prompt once, immediately (enqueued with priority).

**Use Cases:**
- Background analysis tasks
- One-time data processing
- Responding to specific events
- Sub-agent delegation

**Arguments:**
- `prompt` (str): The prompt to send to the agent
- `priority` (str): "high", "normal", or "low" (default: "normal")

**Returns:**
```json
{
  "queue_seq": 42,
  "message": "Enqueued agent prompt with priority=normal",
  "prompt": "Analyze the last 100 BTC/USDT bars..."
}
```

**Examples:**

```python
# Immediate analysis with high priority
execute_agent_prompt_once(
    prompt="Analyze the last 100 BTC/USDT 1m bars and identify key support/resistance levels",
    priority="high"
)

# Background task with normal priority
execute_agent_prompt_once(
    prompt="Research the latest news about Ethereum upgrades and summarize findings",
    priority="normal"
)

# Low priority cleanup task
execute_agent_prompt_once(
    prompt="Review and archive old chart drawings from last month",
    priority="low"
)
```

### 3. `list_scheduled_triggers`

List all currently scheduled triggers.

**Returns:**
```json
[
  {
    "id": "cron_456",
    "name": "Cron: daily_market_summary",
    "next_run_time": "2024-03-05 09:00:00",
    "trigger": "cron[hour='9', minute='0']"
  },
  {
    "id": "interval_123",
    "name": "Interval: btc_price_monitor",
    "next_run_time": "2024-03-04 14:35:00",
    "trigger": "interval[0:05:00]"
  }
]
```

**Example:**

```python
jobs = list_scheduled_triggers()

for job in jobs:
    print(f"{job['name']} - next run: {job['next_run_time']}")
```

### 4. `cancel_scheduled_trigger`

Cancel a scheduled trigger by its job ID.

**Arguments:**
- `job_id` (str): The job ID from `schedule_agent_prompt` or `list_scheduled_triggers`

**Returns:**
```json
{
  "status": "success",
  "message": "Cancelled job interval_123"
}
```

**Example:**

```python
# List jobs to find the ID
jobs = list_scheduled_triggers()

# Cancel specific job
cancel_scheduled_trigger("interval_123")
```
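Combining the two tools, a related family of schedules can be cancelled in one sweep. The helper below is a hypothetical sketch that works purely on the job-dict shape shown above, so the selection logic is easy to test in isolation:

```python
# Hypothetical helper: given the job list returned by list_scheduled_triggers,
# pick the job IDs whose name contains a fragment (e.g. to cancel every
# BTC-related schedule at once). Pure function over plain dicts.
def select_jobs_to_cancel(jobs: list, name_fragment: str) -> list:
    return [job["id"] for job in jobs if name_fragment in (job.get("name") or "")]


jobs = [
    {"id": "cron_456", "name": "Cron: daily_market_summary"},
    {"id": "interval_123", "name": "Interval: btc_price_monitor"},
]
to_cancel = select_jobs_to_cancel(jobs, "btc")  # ["interval_123"]
# Then: for job_id in to_cancel: cancel_scheduled_trigger(job_id)
```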

### 5. `on_data_update_run_agent`

**(Future)** Set up an agent to run whenever new data arrives for a specific symbol.

**Arguments:**
- `source_name` (str): Data source name (e.g., "binance")
- `symbol` (str): Trading pair (e.g., "BTC/USDT")
- `resolution` (str): Time resolution (e.g., "1m", "5m")
- `prompt_template` (str): Template with variables like {close}, {volume}, {symbol}

**Example:**

```python
on_data_update_run_agent(
    source_name="binance",
    symbol="BTC/USDT",
    resolution="1m",
    prompt_template="New bar on {symbol}: close={close}, volume={volume}. Check if price crossed any key levels."
)
```

### 6. `get_trigger_system_stats`

Get statistics about the trigger system.

**Returns:**
```json
{
  "queue_depth": 3,
  "queue_running": true,
  "coordinator_stats": {
    "current_seq": 1042,
    "next_commit_seq": 1043,
    "pending_commits": 1,
    "total_executions": 1042,
    "state_counts": {
      "COMMITTED": 1038,
      "EXECUTING": 2,
      "WAITING_COMMIT": 1,
      "FAILED": 1
    }
  }
}
```

**Example:**

```python
stats = get_trigger_system_stats()
print(f"Queue has {stats['queue_depth']} pending triggers")
print(f"System has processed {stats['coordinator_stats']['total_executions']} total triggers")
```
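These stats lend themselves to simple health checks. A hypothetical sketch, assuming only the return shape documented above: derive a failure rate from `state_counts`, which a scheduled trigger could compare against an alert threshold.

```python
# Hypothetical monitoring helper built on the stats shape shown above:
# fraction of executions that ended in the FAILED state.
def failure_rate(stats: dict) -> float:
    counts = stats["coordinator_stats"]["state_counts"]
    total = sum(counts.values())
    return counts.get("FAILED", 0) / total if total else 0.0


stats = {
    "queue_depth": 3,
    "coordinator_stats": {
        "state_counts": {"COMMITTED": 1038, "EXECUTING": 2, "WAITING_COMMIT": 1, "FAILED": 1}
    },
}
rate = failure_rate(stats)  # 1 / 1042
```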

## Integration Example

Here's how these tools enable autonomous agent behavior:

```python
# Agent conversation:
User: "Monitor BTC price and send me a summary every hour during market hours"

Agent: I'll set that up for you using the trigger system.

# Agent uses tool:
schedule_agent_prompt(
    prompt="""
    Check the current BTC/USDT price on Binance.
    Calculate the price change from 1 hour ago.
    If price moved > 2%, provide a detailed analysis.
    Otherwise, provide a brief status update.
    Send results to user as a notification.
    """,
    schedule_type="cron",
    schedule_config={
        "minute": "0",
        "hour": "9-17",  # 9 AM to 5 PM
        "day_of_week": "mon-fri"
    },
    name="btc_hourly_monitor"
)

Agent: Done! I've scheduled an hourly BTC price monitor that runs during market hours (9 AM - 5 PM on weekdays). You'll receive updates every hour.

# Later...
User: "Can you show me all my scheduled tasks?"

Agent: Let me check what's scheduled.

# Agent uses tool:
jobs = list_scheduled_triggers()

Agent: You have 3 scheduled tasks:
1. "btc_hourly_monitor" - runs every hour during market hours
2. "daily_market_summary" - runs daily at 9 AM
3. "portfolio_rebalance_check" - runs every 4 hours

Would you like to modify or cancel any of these?
```

## Use Case: Autonomous Trading Bot

```python
# Step 1: Set up data monitoring
execute_agent_prompt_once(
    prompt="""
    Subscribe to BTC/USDT 1m bars from Binance.
    When subscribed, set up the following:
    1. Calculate RSI(14) on each new bar
    2. If RSI > 70, execute prompt: "RSI overbought on BTC, check if we should sell"
    3. If RSI < 30, execute prompt: "RSI oversold on BTC, check if we should buy"
    """,
    priority="high"
)

# Step 2: Schedule periodic portfolio review
schedule_agent_prompt(
    prompt="""
    Review current portfolio:
    1. Calculate current allocation percentages
    2. Compare to target allocation (60% BTC, 30% ETH, 10% stable)
    3. If deviation > 5%, generate rebalancing trades
    4. Submit trades for execution
    """,
    schedule_type="interval",
    schedule_config={"hours": 4},
    name="portfolio_rebalance"
)

# Step 3: Schedule daily risk check
schedule_agent_prompt(
    prompt="""
    Daily risk assessment:
    1. Calculate portfolio VaR (Value at Risk)
    2. Check current leverage across all positions
    3. Review stop-loss placements
    4. If risk exceeds threshold, alert and suggest adjustments
    """,
    schedule_type="cron",
    schedule_config={"hour": "8", "minute": "0"},
    name="daily_risk_check"
)
```

## Benefits

✅ **Autonomous operation** - Agent can schedule its own tasks
✅ **Event-driven** - React to market data, time, or custom events
✅ **Flexible scheduling** - Interval or cron-based
✅ **Self-managing** - Agent can list and cancel its own jobs
✅ **Priority control** - High-priority tasks jump the queue
✅ **Future-proof** - Easy to add Python lambdas, strategy execution, etc.

## Future Enhancements

- **Python script execution** - Schedule arbitrary Python code
- **Strategy triggers** - Connect to strategy execution system
- **Event composition** - AND/OR logic for complex event patterns
- **Conditional execution** - Only run if conditions met (e.g., volatility > threshold)
- **Result chaining** - Use output of one trigger as input to another
- **Backtesting mode** - Test trigger logic on historical data

## Setup in main.py

```python
from agent.tools import set_trigger_queue, set_trigger_scheduler, set_coordinator
from trigger import TriggerQueue, CommitCoordinator
from trigger.scheduler import TriggerScheduler

# Initialize trigger system
coordinator = CommitCoordinator()
queue = TriggerQueue(coordinator)
scheduler = TriggerScheduler(queue)

await queue.start()
scheduler.start()

# Make available to agent tools
set_trigger_queue(queue)
set_trigger_scheduler(scheduler)
set_coordinator(coordinator)

# Add TRIGGER_TOOLS to agent's tool list
from agent.tools import TRIGGER_TOOLS
agent_tools = [..., *TRIGGER_TOOLS]
```

Now the agent has full control over the trigger system! 🚀
@@ -6,6 +6,7 @@ This package provides tools for:
- Chart data access and analysis (chart_tools)
- Technical indicators (indicator_tools)
- Shape/drawing management (shape_tools)
- Trigger system and automation (trigger_tools)
"""

# Global registries that will be set by main.py
@@ -39,15 +40,25 @@ from .chart_tools import CHART_TOOLS
from .indicator_tools import INDICATOR_TOOLS
from .research_tools import RESEARCH_TOOLS
from .shape_tools import SHAPE_TOOLS
from .trigger_tools import (
    TRIGGER_TOOLS,
    set_trigger_queue,
    set_trigger_scheduler,
    set_coordinator,
)

__all__ = [
    "set_registry",
    "set_datasource_registry",
    "set_indicator_registry",
    "set_trigger_queue",
    "set_trigger_scheduler",
    "set_coordinator",
    "SYNC_TOOLS",
    "DATASOURCE_TOOLS",
    "CHART_TOOLS",
    "INDICATOR_TOOLS",
    "RESEARCH_TOOLS",
    "SHAPE_TOOLS",
    "TRIGGER_TOOLS",
]
366
backend/src/agent/tools/trigger_tools.py
Normal file
@@ -0,0 +1,366 @@
"""
|
||||
Agent tools for trigger system.
|
||||
|
||||
Allows agents to:
|
||||
- Schedule recurring tasks (cron-style)
|
||||
- Execute one-time triggers
|
||||
- Manage scheduled triggers (list, cancel)
|
||||
- Connect events to sub-agent runs or lambdas
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from langchain_core.tools import tool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global references set by main.py
|
||||
_trigger_queue = None
|
||||
_trigger_scheduler = None
|
||||
_coordinator = None
|
||||
|
||||
|
||||
def set_trigger_queue(queue):
|
||||
"""Set the global TriggerQueue instance for tools to use."""
|
||||
global _trigger_queue
|
||||
_trigger_queue = queue
|
||||
|
||||
|
||||
def set_trigger_scheduler(scheduler):
|
||||
"""Set the global TriggerScheduler instance for tools to use."""
|
||||
global _trigger_scheduler
|
||||
_trigger_scheduler = scheduler
|
||||
|
||||
|
||||
def set_coordinator(coordinator):
|
||||
"""Set the global CommitCoordinator instance for tools to use."""
|
||||
global _coordinator
|
||||
_coordinator = coordinator
|
||||
|
||||
|
||||
def _get_trigger_queue():
|
||||
"""Get the global trigger queue instance."""
|
||||
if not _trigger_queue:
|
||||
raise ValueError("TriggerQueue not initialized")
|
||||
return _trigger_queue
|
||||
|
||||
|
||||
def _get_trigger_scheduler():
|
||||
"""Get the global trigger scheduler instance."""
|
||||
if not _trigger_scheduler:
|
||||
raise ValueError("TriggerScheduler not initialized")
|
||||
return _trigger_scheduler
|
||||
|
||||
|
||||
def _get_coordinator():
|
||||
"""Get the global coordinator instance."""
|
||||
if not _coordinator:
|
||||
raise ValueError("CommitCoordinator not initialized")
|
||||
return _coordinator
|
||||
|
||||
|
||||
@tool
|
||||
async def schedule_agent_prompt(
|
||||
prompt: str,
|
||||
schedule_type: str,
|
||||
schedule_config: Dict[str, Any],
|
||||
name: Optional[str] = None,
|
||||
) -> Dict[str, str]:
|
||||
"""Schedule an agent to run with a specific prompt on a recurring schedule.
|
||||
|
||||
This allows you to set up automated tasks where the agent runs periodically
|
||||
with a predefined prompt. Useful for:
|
||||
- Daily market analysis reports
|
||||
- Hourly portfolio rebalancing checks
|
||||
- Weekly performance summaries
|
||||
- Monitoring alerts
|
||||
|
||||
Args:
|
||||
prompt: The prompt to send to the agent when triggered
|
||||
schedule_type: Type of schedule - "interval" or "cron"
|
||||
schedule_config: Schedule configuration:
|
||||
For "interval": {"minutes": 5} or {"hours": 1, "minutes": 30}
|
||||
For "cron": {"hour": "9", "minute": "0"} for 9:00 AM daily
|
||||
{"hour": "9", "minute": "0", "day_of_week": "mon-fri"}
|
||||
name: Optional descriptive name for this scheduled task
|
||||
|
||||
Returns:
|
||||
Dictionary with job_id and confirmation message
|
||||
|
||||
Examples:
|
||||
# Run every 5 minutes
|
||||
schedule_agent_prompt(
|
||||
prompt="Check BTC price and alert if > $50k",
|
||||
schedule_type="interval",
|
||||
schedule_config={"minutes": 5}
|
||||
)
|
||||
|
||||
# Run daily at 9 AM
|
||||
schedule_agent_prompt(
|
||||
prompt="Generate daily market summary",
|
||||
schedule_type="cron",
|
||||
schedule_config={"hour": "9", "minute": "0"}
|
||||
)
|
||||
|
||||
# Run hourly on weekdays
|
||||
schedule_agent_prompt(
|
||||
prompt="Monitor portfolio for rebalancing opportunities",
|
||||
schedule_type="cron",
|
||||
schedule_config={"minute": "0", "day_of_week": "mon-fri"}
|
||||
)
|
||||
"""
|
||||
from trigger.handlers import LambdaHandler
|
||||
from trigger import Priority
|
||||
|
||||
scheduler = _get_trigger_scheduler()
|
||||
queue = _get_trigger_queue()
|
||||
|
||||
if not name:
|
||||
name = f"agent_prompt_{hash(prompt) % 10000}"
|
||||
|
||||
# Create a lambda that enqueues an agent trigger with the prompt
|
||||
async def agent_prompt_lambda():
|
||||
from trigger.handlers import AgentTriggerHandler
|
||||
|
||||
# Create agent trigger (will use current session's context)
|
||||
# In production, you'd want to specify which session/user this belongs to
|
||||
trigger = AgentTriggerHandler(
|
||||
session_id="scheduled", # Special session for scheduled tasks
|
||||
message_content=prompt,
|
||||
coordinator=_get_coordinator(),
|
||||
)
|
||||
|
||||
await queue.enqueue(trigger)
|
||||
return [] # No direct commit intents
|
||||
|
||||
# Wrap in lambda handler
|
||||
lambda_trigger = LambdaHandler(
|
||||
name=f"scheduled_{name}",
|
||||
func=agent_prompt_lambda,
|
||||
priority=Priority.TIMER,
|
||||
)
|
||||
|
||||
# Schedule based on type
|
||||
if schedule_type == "interval":
|
||||
job_id = scheduler.schedule_interval(
|
||||
lambda_trigger,
|
||||
seconds=schedule_config.get("seconds"),
|
||||
minutes=schedule_config.get("minutes"),
|
||||
hours=schedule_config.get("hours"),
|
||||
priority=Priority.TIMER,
|
||||
)
|
||||
elif schedule_type == "cron":
|
||||
job_id = scheduler.schedule_cron(
|
||||
lambda_trigger,
|
||||
minute=schedule_config.get("minute"),
|
||||
hour=schedule_config.get("hour"),
|
||||
day=schedule_config.get("day"),
|
||||
month=schedule_config.get("month"),
|
||||
day_of_week=schedule_config.get("day_of_week"),
|
||||
priority=Priority.TIMER,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Invalid schedule_type: {schedule_type}. Use 'interval' or 'cron'")
|
||||
|
||||
return {
|
||||
"job_id": job_id,
|
||||
"message": f"Scheduled '{name}' with job_id={job_id}",
|
||||
"schedule_type": schedule_type,
|
||||
"config": schedule_config,
|
||||
}
|
||||
|
||||
|
||||
@tool
|
||||
async def execute_agent_prompt_once(
|
||||
prompt: str,
|
||||
priority: str = "normal",
|
||||
) -> Dict[str, str]:
|
||||
"""Execute an agent prompt once, immediately (enqueued with priority).
|
||||
|
||||
Use this to trigger a sub-agent with a specific task without waiting for
|
||||
a user message. Useful for:
|
||||
- Background analysis tasks
|
||||
- One-time data processing
|
||||
- Responding to specific events
|
||||
|
||||
Args:
|
||||
prompt: The prompt to send to the agent
|
||||
priority: Priority level - "high", "normal", or "low"
|
||||
|
||||
Returns:
|
||||
Confirmation that the prompt was enqueued
|
||||
|
||||
Example:
|
||||
execute_agent_prompt_once(
|
||||
prompt="Analyze the last 100 BTC/USDT bars and identify support levels",
|
||||
priority="high"
|
||||
)
|
||||
"""
|
||||
from trigger.handlers import AgentTriggerHandler
|
||||
from trigger import Priority
|
||||
|
||||
queue = _get_trigger_queue()
|
||||
|
||||
# Map string priority to enum
|
||||
priority_map = {
|
||||
"high": Priority.USER_AGENT, # Same priority as user messages
|
||||
"normal": Priority.SYSTEM,
|
||||
"low": Priority.LOW,
|
||||
}
|
||||
priority_enum = priority_map.get(priority.lower(), Priority.SYSTEM)
|
||||
|
||||
# Create agent trigger
|
||||
trigger = AgentTriggerHandler(
|
||||
session_id="oneshot",
|
||||
message_content=prompt,
|
||||
coordinator=_get_coordinator(),
|
||||
)
|
||||
|
||||
# Enqueue with priority override
|
||||
queue_seq = await queue.enqueue(trigger, priority_enum)
|
||||
|
||||
return {
|
||||
"queue_seq": queue_seq,
|
||||
"message": f"Enqueued agent prompt with priority={priority}",
|
||||
"prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
|
||||
}
|
||||
|
||||
|
||||
@tool
|
||||
def list_scheduled_triggers() -> List[Dict[str, Any]]:
|
||||
"""List all currently scheduled triggers.
|
||||
|
||||
Returns:
|
||||
List of dictionaries with job information (id, name, next_run_time)
|
||||
|
||||
Example:
|
||||
jobs = list_scheduled_triggers()
|
||||
for job in jobs:
|
||||
print(f"{job['id']}: {job['name']} - next run at {job['next_run_time']}")
|
||||
"""
|
||||
scheduler = _get_trigger_scheduler()
|
||||
jobs = scheduler.get_jobs()
|
||||
|
||||
result = []
|
||||
for job in jobs:
|
||||
result.append({
|
||||
"id": job.id,
|
||||
"name": job.name,
|
||||
"next_run_time": str(job.next_run_time) if job.next_run_time else None,
|
||||
"trigger": str(job.trigger),
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@tool
|
||||
def cancel_scheduled_trigger(job_id: str) -> Dict[str, str]:
|
||||
"""Cancel a scheduled trigger by its job ID.
|
||||
|
||||
Args:
|
||||
job_id: The job ID returned from schedule_agent_prompt or list_scheduled_triggers
|
||||
|
||||
Returns:
|
||||
Confirmation message
|
||||
|
||||
Example:
|
||||
cancel_scheduled_trigger("interval_123")
|
||||
"""
|
||||
scheduler = _get_trigger_scheduler()
|
||||
success = scheduler.remove_job(job_id)
|
||||
|
||||
if success:
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Cancelled job {job_id}",
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"Job {job_id} not found",
|
||||
}
|
||||
|
||||
|
||||
@tool
|
||||
async def on_data_update_run_agent(
|
||||
source_name: str,
|
||||
symbol: str,
|
||||
resolution: str,
|
||||
prompt_template: str,
|
||||
) -> Dict[str, str]:
|
||||
"""Set up an agent to run whenever new data arrives for a specific symbol.
|
||||
|
||||
The prompt_template can include {variables} that will be filled with bar data:
|
||||
- {time}: Bar timestamp
|
||||
- {open}, {high}, {low}, {close}, {volume}: OHLCV values
|
||||
- {symbol}: Trading pair symbol
|
||||
- {source}: Data source name
|
||||
|
||||
Args:
|
||||
source_name: Name of data source (e.g., "binance")
|
||||
symbol: Trading pair (e.g., "BTC/USDT")
|
||||
resolution: Time resolution (e.g., "1m", "5m", "1h")
|
||||
prompt_template: Template string for agent prompt
|
||||
|
||||
Returns:
|
||||
Confirmation with subscription details
|
||||
|
||||
Example:
|
||||
on_data_update_run_agent(
|
||||
source_name="binance",
|
||||
symbol="BTC/USDT",
|
||||
resolution="1m",
|
||||
prompt_template="New bar on {symbol}: close={close}. Check if we should trade."
|
||||
)
|
||||
|
||||
Note:
|
||||
This is a simplified version. Full implementation would wire into
|
||||
DataSource subscription system to trigger on every bar update.
|
||||
"""
|
||||
# TODO: Implement proper DataSource subscription integration
|
||||
# For now, return placeholder
|
||||
|
||||
return {
|
||||
"status": "not_implemented",
|
||||
"message": "Data-driven agent triggers coming soon",
|
||||
"config": {
|
||||
"source": source_name,
|
||||
"symbol": symbol,
|
||||
"resolution": resolution,
|
||||
"prompt_template": prompt_template,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@tool
|
||||
def get_trigger_system_stats() -> Dict[str, Any]:
|
||||
"""Get statistics about the trigger system.
|
||||
|
||||
Returns:
|
||||
Dictionary with queue depth, execution stats, etc.
|
||||
|
||||
Example:
|
||||
stats = get_trigger_system_stats()
|
||||
print(f"Queue depth: {stats['queue_depth']}")
|
||||
print(f"Current seq: {stats['current_seq']}")
|
||||
"""
|
||||
queue = _get_trigger_queue()
|
||||
coordinator = _get_coordinator()
|
||||
|
||||
return {
|
||||
"queue_depth": queue.get_queue_size(),
|
||||
"queue_running": queue.is_running(),
|
||||
"coordinator_stats": coordinator.get_stats(),
|
||||
}
|
||||
|
||||
|
||||
# Export tools list
|
||||
TRIGGER_TOOLS = [
|
||||
schedule_agent_prompt,
|
||||
execute_agent_prompt_once,
|
||||
list_scheduled_triggers,
|
||||
cancel_scheduled_trigger,
|
||||
on_data_update_run_agent,
|
||||
get_trigger_system_stats,
|
||||
]
|
||||
@@ -21,6 +21,7 @@ from gateway.channels.websocket import WebSocketChannel
from gateway.protocol import WebSocketAgentUserMessage
from agent.core import create_agent
from agent.tools import set_registry, set_datasource_registry, set_indicator_registry
from agent.tools import set_trigger_queue, set_trigger_scheduler, set_coordinator
from schema.order_spec import SwapOrder
from schema.chart_state import ChartState
from schema.shape import ShapeCollection
@@ -30,6 +31,8 @@ from datasource.subscription_manager import SubscriptionManager
from datasource.websocket_handler import DatafeedWebSocketHandler
from secrets_manager import SecretsStore, InvalidMasterPassword
from indicator import IndicatorRegistry, register_all_talib_indicators, register_custom_indicators
from trigger import CommitCoordinator, TriggerQueue
from trigger.scheduler import TriggerScheduler

# Configure logging
logging.basicConfig(
@@ -59,6 +62,11 @@ subscription_manager = SubscriptionManager()
# Indicator infrastructure
indicator_registry = IndicatorRegistry()

# Trigger system infrastructure
trigger_coordinator = None
trigger_queue = None
trigger_scheduler = None

# Global secrets store
secrets_store = SecretsStore()

@@ -66,7 +74,7 @@ secrets_store = SecretsStore()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize agent system and data sources on startup."""
    global agent_executor
    global agent_executor, trigger_coordinator, trigger_queue, trigger_scheduler

    # Initialize CCXT data sources
    try:
@@ -115,6 +123,22 @@ async def lifespan(app: FastAPI):
    if anthropic_api_key:
        logger.info("Loaded API key from environment")

    # Initialize trigger system
    logger.info("Initializing trigger system...")
    trigger_coordinator = CommitCoordinator()
    trigger_queue = TriggerQueue(trigger_coordinator)
    trigger_scheduler = TriggerScheduler(trigger_queue)

    # Start trigger queue and scheduler
    await trigger_queue.start()
    trigger_scheduler.start()
    logger.info("Trigger system initialized and started")

    # Set trigger system for agent tools
    set_coordinator(trigger_coordinator)
    set_trigger_queue(trigger_queue)
    set_trigger_scheduler(trigger_scheduler)

    if not anthropic_api_key:
        logger.error("ANTHROPIC_API_KEY not found in environment!")
        logger.info("Agent system will not be available")
@@ -133,7 +157,7 @@ async def lifespan(app: FastAPI):
        chroma_db_path=config["memory"]["chroma_db"],
        embedding_model=config["memory"]["embedding_model"],
        context_docs_dir=config["agent"]["context_docs_dir"],
        base_dir="."  # backend/src is the working directory, so . goes to backend, where memory/ lives
        base_dir="."  # backend/src is the working directory, so . goes to backend, where memory/ and soul/ live
    )

    await agent_executor.initialize()
@@ -146,9 +170,22 @@ async def lifespan(app: FastAPI):
    yield

    # Cleanup
    logger.info("Shutting down systems...")

    # Shutdown trigger system
    if trigger_scheduler:
        trigger_scheduler.shutdown(wait=True)
        logger.info("Trigger scheduler shut down")

    if trigger_queue:
        await trigger_queue.stop()
        logger.info("Trigger queue stopped")

    # Shutdown agent system
    if agent_executor and agent_executor.memory_manager:
        await agent_executor.memory_manager.close()
        logger.info("Agent system shut down")

    logger.info("All systems shut down")


app = FastAPI(lifespan=lifespan)
216
backend/src/trigger/PRIORITIES.md
Normal file
@@ -0,0 +1,216 @@
# Priority System

Simple tuple-based priorities for deterministic execution ordering.

## Basic Concept

Priorities are just **Python tuples**. Python compares tuples element-by-element, left-to-right:

```python
(0, 1000, 5) < (0, 1001, 3)  # True: 0 == 0, but 1000 < 1001
(0, 1000, 5) < (1, 500, 2)   # True: 0 < 1
(0, 1000) < (0, 1000, 5)     # True: shorter wins if equal so far
```

**Lower values = higher priority** (processed first).

## Priority Categories

```python
class Priority(IntEnum):
    DATA_SOURCE = 0        # Market data, real-time feeds
    TIMER = 1              # Scheduled tasks, cron jobs
    USER_AGENT = 2         # User-agent interactions (chat)
    USER_DATA_REQUEST = 3  # User data requests (charts)
    SYSTEM = 4             # Background tasks, cleanup
    LOW = 5                # Retries after conflicts
```
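Because `Priority` is an `IntEnum`, its members compare like plain ints, so priority tuples can freely mix enum members and raw numbers. A quick check (re-declaring the enum so the snippet is self-contained):

```python
from enum import IntEnum


class Priority(IntEnum):
    DATA_SOURCE = 0
    TIMER = 1
    USER_AGENT = 2
    USER_DATA_REQUEST = 3
    SYSTEM = 4
    LOW = 5


# IntEnum members behave like ints, so enum-bearing tuples compare
# directly against plain numeric tuples.
assert (Priority.DATA_SOURCE, 1000) < (Priority.TIMER, 0)
assert (Priority.TIMER, 500) < (1, 501)  # mixed enum/int works too
assert sorted([Priority.LOW, Priority.DATA_SOURCE]) == [0, 5]
```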

## Usage Examples

### Simple Priority

```python
# Just use the Priority enum
trigger = MyTrigger("task", priority=Priority.SYSTEM)
await queue.enqueue(trigger)

# Results in tuple: (4, queue_seq)
```

### Compound Priority (Tuple)

```python
# DataSource: sort by event time (older bars first)
trigger = DataUpdateTrigger(
    source_name="binance",
    symbol="BTC/USDT",
    resolution="1m",
    bar_data={"time": 1678896000, "open": 50000, ...}
)
await queue.enqueue(trigger)

# Results in tuple: (0, 1678896000, queue_seq)
#                    ^  ^           ^
#                    |  |           Queue insertion order (FIFO)
#                    |  Event time (candle end time)
#                    DATA_SOURCE priority
```

### Manual Override

```python
# Override at enqueue time
await queue.enqueue(
    trigger,
    priority_override=(Priority.DATA_SOURCE, custom_time, custom_sort)
)

# Queue appends queue_seq: (0, custom_time, custom_sort, queue_seq)
```

## Common Patterns

### Market Data (Process Chronologically)

```python
# Bar from 10:00 → (0, 10:00_timestamp, queue_seq)
# Bar from 10:05 → (0, 10:05_timestamp, queue_seq)
#
# 10:00 bar processes first (earlier event_time)

DataUpdateTrigger(
    ...,
    bar_data={"time": event_timestamp, ...}
)
```

### User Messages (FIFO Order)

```python
# Message #1 → (2, msg1_timestamp, queue_seq)
# Message #2 → (2, msg2_timestamp, queue_seq)
#
# Message #1 processes first (earlier timestamp)

AgentTriggerHandler(
    session_id="user1",
    message_content="...",
    message_timestamp=unix_timestamp  # Optional, defaults to now
)
```

### Scheduled Tasks (By Schedule Time)

```python
# Job scheduled for 9 AM → (1, 9am_timestamp, queue_seq)
# Job scheduled for 2 PM → (1, 2pm_timestamp, queue_seq)
#
# 9 AM job processes first

CronTrigger(
    name="morning_sync",
    inner_trigger=...,
    scheduled_time=scheduled_timestamp
)
```

## Execution Order Example

```
Queue contains:
1. DataSource (BTC @ 10:00)  → (0, 10:00, 1)
2. DataSource (BTC @ 10:05)  → (0, 10:05, 2)
3. Timer (scheduled 9 AM)    → (1, 09:00, 3)
4. User message #1           → (2, 14:30, 4)
5. User message #2           → (2, 14:35, 5)

Dequeue order:
1. DataSource (BTC @ 10:00)  ← 0 < all others
2. DataSource (BTC @ 10:05)  ← 0 < all others, 10:05 > 10:00
3. Timer (scheduled 9 AM)    ← 1 < remaining
4. User message #1           ← 2 < remaining, 14:30 < 14:35
5. User message #2           ← last
```
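The dequeue order above can be reproduced with `heapq` directly, since the queue is conceptually a min-heap of tuples (timestamps abbreviated to minutes of the day):

```python
import heapq

# (priority, event_time, queue_seq) — timestamps as minutes of the day
heap = []
for entry in [
    (0, 10 * 60, 1),       # DataSource BTC @ 10:00
    (0, 10 * 60 + 5, 2),   # DataSource BTC @ 10:05
    (1, 9 * 60, 3),        # Timer scheduled 9 AM
    (2, 14 * 60 + 30, 4),  # User message #1
    (2, 14 * 60 + 35, 5),  # User message #2
]:
    heapq.heappush(heap, entry)

order = [heapq.heappop(heap)[2] for _ in range(len(heap))]
assert order == [1, 2, 3, 4, 5]  # matches the dequeue order above
```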

## Short Tuple Wins

If tuples are equal up to the length of the shorter one, **the shorter tuple has higher priority**:

```python
(0, 1000) < (0, 1000, 5)  # True: shorter wins
(0,) < (0, 1000)          # True: shorter wins
(Priority.DATA_SOURCE,) < (Priority.DATA_SOURCE, 1000)  # True
```

This is Python's default tuple comparison behavior. In practice, we always append `queue_seq`, so this rarely matters (all tuples end up the same length).

## Integration with Triggers

### Trigger Sets Its Own Priority

```python
class MyTrigger(Trigger):
    def __init__(self, event_time):
        super().__init__(
            name="my_trigger",
            priority=Priority.DATA_SOURCE,
            priority_tuple=(Priority.DATA_SOURCE.value, event_time)
        )
```

Queue appends `queue_seq` automatically:
```python
# Trigger's tuple: (0, event_time)
# After enqueue:   (0, event_time, queue_seq)
```

### Override at Enqueue

```python
# Ignore trigger's priority, use override
await queue.enqueue(
    trigger,
    priority_override=(Priority.TIMER, scheduled_time)
)
```

## Why Tuples?

✅ **Simple**: No custom classes, just native Python tuples
✅ **Flexible**: Add as many sort keys as needed
✅ **Efficient**: Python's tuple comparison is highly optimized
✅ **Readable**: the meaning of `(0, 1000, 5)` is clear at a glance
✅ **Debuggable**: Easy to print and inspect

Example:
```python
# Old: CompoundPriority(primary=0, secondary=1000, tertiary=5)
# New: (0, 1000, 5)

# Same semantics, much simpler!
```
|
||||
|
||||
## Advanced: Custom Sorting
|
||||
|
||||
Want to sort by multiple factors? Just add more elements:
|
||||
|
||||
```python
|
||||
# Sort by: priority → symbol → event_time → queue_seq
|
||||
priority_tuple = (
|
||||
Priority.DATA_SOURCE.value,
|
||||
symbol_id, # e.g., hash("BTC/USDT")
|
||||
event_time,
|
||||
# queue_seq appended by queue
|
||||
)
|
||||
```
|
||||
|
||||
## Summary
|
||||
|
||||
- **Priorities are tuples**: `(primary, secondary, ..., queue_seq)`
|
||||
- **Lower = higher priority**: Processed first
|
||||
- **Element-by-element comparison**: Left-to-right
|
||||
- **Shorter tuple wins**: If equal up to shorter length
|
||||
- **Queue appends queue_seq**: Always last element (FIFO within same priority)
|
||||
|
||||
That's it! No complex classes, just tuples. 🎯
386
backend/src/trigger/README.md
Normal file
@@ -0,0 +1,386 @@
# Trigger System

Lock-free, sequence-based execution system for deterministic event processing.

## Overview

All operations (WebSocket messages, cron tasks, data updates) flow through a **priority queue**, execute in **parallel**, but commit in **strict sequential order** with **optimistic conflict detection**.

### Key Features

- **Lock-free reads**: Snapshots are deep copies, no blocking
- **Sequential commits**: Total ordering via sequence numbers
- **Optimistic concurrency**: Conflicts detected, retried with the same seq
- **Priority preservation**: High-priority work never blocked by low-priority work
- **Long-running agents**: Execute in parallel, commit sequentially
- **Deterministic replay**: Can reproduce the exact system state at any seq

## Architecture

```
┌─────────────┐
│  WebSocket  │───┐
│  Messages   │   │
└─────────────┘   │
                  ├──→ ┌─────────────────┐
┌─────────────┐   │    │  TriggerQueue   │
│    Cron     │───┤    │ (Priority Queue)│
│  Scheduled  │   │    └────────┬────────┘
└─────────────┘   │             │ Assign seq
                  │             ↓
┌─────────────┐   │    ┌─────────────────┐
│ DataSource  │───┘    │ Execute Trigger │
│  Updates    │        │  (Parallel OK)  │
└─────────────┘        └────────┬────────┘
                                │ CommitIntents
                                ↓
                       ┌──────────────────┐
                       │ CommitCoordinator│
                       │   (Sequential)   │
                       └────────┬─────────┘
                                │ Commit in seq order
                                ↓
                       ┌─────────────────┐
                       │ VersionedStores │
                       │  (w/ Backends)  │
                       └─────────────────┘
```

## Core Components

### 1. ExecutionContext (`context.py`)

Tracks the execution seq and store snapshots via `contextvars` (auto-propagates through async calls).

```python
from trigger import get_execution_context

ctx = get_execution_context()
print(f"Running at seq {ctx.seq}")
```

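The auto-propagation that `contextvars` provides can be seen in a standalone sketch (illustrative names, not the module's actual variables):

```python
import asyncio
from contextvars import ContextVar

# Each execution's seq follows its own async call chain, because every
# task snapshots the context it was created from.
seq_var: ContextVar[int] = ContextVar("seq", default=0)

async def inner() -> int:
    # Sees the seq of whichever execution spawned this task.
    return seq_var.get()

async def run_execution(seq: int) -> int:
    seq_var.set(seq)
    # create_task copies the current context, so inner() reads our seq
    # even though both executions run concurrently.
    return await asyncio.create_task(inner())

async def main() -> list[int]:
    return list(await asyncio.gather(run_execution(100), run_execution(101)))

results = asyncio.run(main())
print(results)  # [100, 101]
```

No seq parameter is threaded through the call chain; the context does the bookkeeping, which is exactly what lets conflict detection know which snapshot each execution read.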
### 2. Trigger Types (`types.py`)

```python
from trigger import Trigger, Priority, CommitIntent

class MyTrigger(Trigger):
    async def execute(self) -> list[CommitIntent]:
        # Read snapshot
        seq, data = some_store.read_snapshot()

        # Modify
        new_data = modify(data)

        # Prepare commit
        intent = some_store.prepare_commit(seq, new_data)
        return [intent]
```

### 3. VersionedStore (`store.py`)

Stores with pluggable backends and optimistic concurrency:

```python
from trigger import VersionedStore, PydanticStoreBackend

# Wrap existing Pydantic model
backend = PydanticStoreBackend(order_store)
versioned_store = VersionedStore("OrderStore", backend)

# Lock-free snapshot read
seq, snapshot = versioned_store.read_snapshot()

# Prepare commit (does not modify yet)
intent = versioned_store.prepare_commit(seq, modified_snapshot)
```

**Pluggable Backends**:
- `PydanticStoreBackend`: For existing Pydantic models (OrderStore, ChartStore, etc.)
- `FileStoreBackend`: Future - version files (Python scripts, configs)
- `DatabaseStoreBackend`: Future - version database rows

### 4. CommitCoordinator (`coordinator.py`)

Manages sequential commits with conflict detection:

- Waits for seq N to commit before N+1
- Detects conflicts (expected_seq vs committed_seq)
- Re-executes (not re-enqueues) on conflict **with the same seq**
- Tracks execution state for debugging

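The conflict rule in the second bullet reduces to a single comparison; a hedged sketch (illustrative types, not the module's actual `CommitIntent`/`VersionedStore` internals):

```python
from dataclasses import dataclass

# Illustrative stand-in for a commit intent.
@dataclass
class Intent:
    store_name: str
    expected_seq: int  # seq of the snapshot this execution read

def has_conflict(intent: Intent, committed_seq: int) -> bool:
    # Conflict iff someone committed the store after our snapshot was
    # taken, meaning our prepared data was derived from stale state.
    return committed_seq > intent.expected_seq

print(has_conflict(Intent("OrderStore", expected_seq=99), committed_seq=100))   # True
print(has_conflict(Intent("OrderStore", expected_seq=100), committed_seq=100))  # False
```

Because the check is per store, an execution that only touched unmodified stores commits cleanly even if other stores advanced underneath it.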
### 5. TriggerQueue (`queue.py`)

Priority queue with seq assignment:

```python
from trigger import TriggerQueue

queue = TriggerQueue(coordinator)
await queue.start()

# Enqueue trigger
await queue.enqueue(my_trigger, Priority.HIGH)
```

### 6. TriggerScheduler (`scheduler.py`)

APScheduler integration for cron triggers:

```python
from trigger.scheduler import TriggerScheduler

scheduler = TriggerScheduler(queue)
scheduler.start()

# Every 5 minutes
scheduler.schedule_interval(
    IndicatorUpdateTrigger("rsi_14"),
    minutes=5
)

# Daily at 9 AM
scheduler.schedule_cron(
    SyncExchangeStateTrigger(),
    hour="9",
    minute="0"
)
```

## Integration Example

### Basic Setup in `main.py`

```python
from trigger import (
    CommitCoordinator,
    TriggerQueue,
    VersionedStore,
    PydanticStoreBackend,
)
from trigger.scheduler import TriggerScheduler

# Create coordinator
coordinator = CommitCoordinator()

# Wrap existing stores
order_store_versioned = VersionedStore(
    "OrderStore",
    PydanticStoreBackend(order_store)
)
coordinator.register_store(order_store_versioned)

chart_store_versioned = VersionedStore(
    "ChartStore",
    PydanticStoreBackend(chart_store)
)
coordinator.register_store(chart_store_versioned)

# Create queue and scheduler
trigger_queue = TriggerQueue(coordinator)
await trigger_queue.start()

scheduler = TriggerScheduler(trigger_queue)
scheduler.start()
```

### WebSocket Message Handler

```python
from trigger.handlers import AgentTriggerHandler

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    while True:
        data = await websocket.receive_json()

        if data["type"] == "agent_user_message":
            # Enqueue agent trigger instead of direct Gateway call
            trigger = AgentTriggerHandler(
                session_id=data["session_id"],
                message_content=data["content"],
                gateway_handler=gateway.route_user_message,
                coordinator=coordinator,
            )
            await trigger_queue.enqueue(trigger)
```

### DataSource Updates

```python
from trigger.handlers import DataUpdateTrigger

# In subscription_manager._on_source_update()
def _on_source_update(self, source_key: tuple, bar: dict):
    # Enqueue data update trigger
    trigger = DataUpdateTrigger(
        source_name=source_key[0],
        symbol=source_key[1],
        resolution=source_key[2],
        bar_data=bar,
        coordinator=coordinator,
    )
    asyncio.create_task(trigger_queue.enqueue(trigger))
```

### Custom Trigger

```python
from trigger import Trigger, CommitIntent, Priority

class RecalculatePortfolioTrigger(Trigger):
    def __init__(self, coordinator):
        super().__init__("recalc_portfolio", Priority.NORMAL)
        self.coordinator = coordinator

    async def execute(self) -> list[CommitIntent]:
        # Read snapshots from multiple stores
        order_seq, orders = self.coordinator.get_store("OrderStore").read_snapshot()
        chart_seq, chart = self.coordinator.get_store("ChartStore").read_snapshot()

        # Calculate portfolio value
        portfolio_value = calculate_portfolio(orders, chart)

        # Update chart state with portfolio value
        chart.portfolio_value = portfolio_value

        # Prepare commit
        intent = self.coordinator.get_store("ChartStore").prepare_commit(
            chart_seq,
            chart
        )

        return [intent]

# Schedule it
scheduler.schedule_interval(
    RecalculatePortfolioTrigger(coordinator),
    minutes=1
)
```

## Execution Flow

### Normal Flow (No Conflicts)

```
seq=100: WebSocket message arrives → enqueue → dequeue → assign seq=100 → execute
seq=101: Cron trigger fires → enqueue → dequeue → assign seq=101 → execute

seq=101 finishes first → waits in commit queue
seq=100 finishes → commits immediately (next in order)
seq=101 commits next
```

### Conflict Flow

```
seq=100: reads OrderStore at seq=99 → executes for 30 seconds
seq=101: reads OrderStore at seq=99 → executes for 5 seconds

seq=101 finishes first → tries to commit based on seq=99
seq=100 finishes → commits OrderStore at seq=100

Coordinator detects conflict:
  expected_seq=99, committed_seq=100

seq=101 evicted → RE-EXECUTES with same seq=101 (not re-enqueued)
  reads OrderStore at seq=100 → executes again
  finishes → commits successfully at seq=101
```

## Benefits

### For Agent System

- **Long-running agents work naturally**: An agent starts at seq=100, runs for 60 seconds while market data updates at seq=101-110, and commits only if no conflicts arise
- **No deadlocks**: No locks = no deadlock possibility
- **Deterministic**: Can replay from any seq for debugging

### For Strategy Execution

- **High-frequency data doesn't block strategies**: Data updates are enqueued, executed in parallel, and committed sequentially
- **Priority preservation**: Critical order execution is never blocked by indicator calculations
- **Conflict detection**: If the market moved during a strategy calculation, the system automatically retries with fresh data

### For Scaling

- **Single-node first**: Runs on a single asyncio event loop, no complex distributed coordination
- **Future-proof**: Can swap the queue for a Redis/PostgreSQL-backed distributed queue later
- **Event sourcing ready**: All commits have seq numbers, so an event log can be built

## Debugging

### Check Current State

```python
# Coordinator stats
stats = coordinator.get_stats()
print(f"Current seq: {stats['current_seq']}")
print(f"Pending commits: {stats['pending_commits']}")
print(f"Executions by state: {stats['state_counts']}")

# Store state
store = coordinator.get_store("OrderStore")
print(f"Store: {store}")  # Shows committed_seq and version

# Execution record
record = coordinator.get_execution_record(100)
print(f"Seq 100: {record}")  # Shows state, retry_count, error
```

### Common Issues

**Symptom: High conflict rate**
- **Cause**: Multiple triggers modifying the same store frequently
- **Solution**: Batch updates, use debouncing, or redesign to reduce contention

**Symptom: Commits stuck (next_commit_seq not advancing)**
- **Cause**: The execution at that seq failed or is taking too long
- **Solution**: Check execution_records for that seq and look for errors in the logs

**Symptom: Queue depth growing**
- **Cause**: Executions are slower than the enqueue rate
- **Solution**: Profile trigger execution, optimize slow paths, add rate limiting

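Debouncing, suggested above for high conflict rates, can be sketched in a few lines of asyncio; this helper is an illustration, not part of the trigger module:

```python
import asyncio

class Debouncer:
    """Coalesces bursts of calls: only the last call within `delay` fires."""

    def __init__(self, delay: float):
        self.delay = delay
        self._task: asyncio.Task | None = None

    def call(self, func, *args):
        if self._task is not None and not self._task.done():
            self._task.cancel()  # drop the superseded call
        self._task = asyncio.create_task(self._fire(func, *args))

    async def _fire(self, func, *args):
        await asyncio.sleep(self.delay)
        func(*args)

async def main():
    seen = []
    d = Debouncer(0.05)
    for i in range(5):        # burst of 5 rapid updates
        d.call(seen.append, i)
    await asyncio.sleep(0.1)  # let the trailing call fire
    return seen

result = asyncio.run(main())
print(result)  # [4]
```

In practice `func` would enqueue a single trigger on behalf of the whole burst, so one commit replaces five and the conflict window shrinks accordingly.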
## Testing

### Unit Test: Conflict Detection

```python
import pytest
from trigger import VersionedStore, PydanticStoreBackend, CommitCoordinator

@pytest.mark.asyncio
async def test_conflict_detection():
    coordinator = CommitCoordinator()

    store = VersionedStore("TestStore", PydanticStoreBackend(TestModel()))
    coordinator.register_store(store)

    # Seq 1: read at 0, modify, commit
    seq1, data1 = store.read_snapshot()
    data1.value = "seq1"
    intent1 = store.prepare_commit(seq1, data1)

    # Seq 2: read at 0 (same snapshot), modify
    seq2, data2 = store.read_snapshot()
    data2.value = "seq2"
    intent2 = store.prepare_commit(seq2, data2)

    # Commit seq 1 (should succeed)
    # ... coordinator logic ...

    # Commit seq 2 (should conflict and retry)
    # ... verify conflict detected ...
```

## Future Enhancements

- **Distributed queue**: Redis-backed queue for multi-worker deployment
- **Event log persistence**: Store all commits for event sourcing/audit
- **Metrics dashboard**: Real-time view of queue depth, conflict rate, latency
- **Transaction snapshots**: Full system state at any seq for replay/debugging
- **Automatic batching**: Coalesce rapid updates to the same store
35
backend/src/trigger/__init__.py
Normal file
@@ -0,0 +1,35 @@
"""
|
||||
Sequential execution trigger system with optimistic concurrency control.
|
||||
|
||||
All operations (websocket, cron, data events) flow through a priority queue,
|
||||
execute in parallel, but commit in strict sequential order with conflict detection.
|
||||
"""
|
||||
|
||||
from .context import ExecutionContext, get_execution_context
|
||||
from .types import Priority, PriorityTuple, Trigger, CommitIntent, ExecutionState
|
||||
from .store import VersionedStore, StoreBackend, PydanticStoreBackend
|
||||
from .coordinator import CommitCoordinator
|
||||
from .queue import TriggerQueue
|
||||
from .handlers import AgentTriggerHandler, LambdaHandler
|
||||
|
||||
__all__ = [
|
||||
# Context
|
||||
"ExecutionContext",
|
||||
"get_execution_context",
|
||||
# Types
|
||||
"Priority",
|
||||
"PriorityTuple",
|
||||
"Trigger",
|
||||
"CommitIntent",
|
||||
"ExecutionState",
|
||||
# Store
|
||||
"VersionedStore",
|
||||
"StoreBackend",
|
||||
"PydanticStoreBackend",
|
||||
# Coordination
|
||||
"CommitCoordinator",
|
||||
"TriggerQueue",
|
||||
# Handlers
|
||||
"AgentTriggerHandler",
|
||||
"LambdaHandler",
|
||||
]
|
||||
61
backend/src/trigger/context.py
Normal file
@@ -0,0 +1,61 @@
"""
|
||||
Execution context tracking using Python's contextvars.
|
||||
|
||||
Each execution gets a unique seq number that propagates through all async calls,
|
||||
allowing us to track which execution made which changes for conflict detection.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from contextvars import ContextVar
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Context variables - automatically propagate through async call chains
|
||||
_execution_context: ContextVar[Optional["ExecutionContext"]] = ContextVar(
|
||||
"execution_context", default=None
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExecutionContext:
|
||||
"""
|
||||
Execution context for a single trigger execution.
|
||||
|
||||
Automatically propagates through async calls via contextvars.
|
||||
Tracks the seq number and which store snapshots were read.
|
||||
"""
|
||||
|
||||
seq: int
|
||||
"""Sequential execution number - determines commit order"""
|
||||
|
||||
trigger_name: str
|
||||
"""Name/type of trigger being executed"""
|
||||
|
||||
snapshot_seqs: dict[str, int] = field(default_factory=dict)
|
||||
"""Store name -> seq number of snapshot that was read"""
|
||||
|
||||
def record_snapshot(self, store_name: str, snapshot_seq: int) -> None:
|
||||
"""Record that we read a snapshot from a store at a specific seq"""
|
||||
self.snapshot_seqs[store_name] = snapshot_seq
|
||||
logger.debug(f"Seq {self.seq}: Read {store_name} at seq {snapshot_seq}")
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"ExecutionContext(seq={self.seq}, trigger={self.trigger_name})"
|
||||
|
||||
|
||||
def get_execution_context() -> Optional[ExecutionContext]:
|
||||
"""Get the current execution context, or None if not in an execution"""
|
||||
return _execution_context.get()
|
||||
|
||||
|
||||
def set_execution_context(ctx: ExecutionContext) -> None:
|
||||
"""Set the execution context for the current async task"""
|
||||
_execution_context.set(ctx)
|
||||
logger.debug(f"Set execution context: {ctx}")
|
||||
|
||||
|
||||
def clear_execution_context() -> None:
|
||||
"""Clear the execution context"""
|
||||
_execution_context.set(None)
|
||||
302
backend/src/trigger/coordinator.py
Normal file
@@ -0,0 +1,302 @@
"""
|
||||
Commit coordinator - manages sequential commits with conflict detection.
|
||||
|
||||
Ensures that commits happen in strict sequence order, even when executions
|
||||
complete out of order. Detects conflicts and triggers re-execution with the
|
||||
same seq number (not re-enqueue, just re-execute).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .context import ExecutionContext
|
||||
from .store import VersionedStore
|
||||
from .types import CommitIntent, ExecutionRecord, ExecutionState, Trigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CommitCoordinator:
|
||||
"""
|
||||
Manages sequential commits with optimistic concurrency control.
|
||||
|
||||
Key responsibilities:
|
||||
- Maintain strict sequential commit order (seq N+1 commits after seq N)
|
||||
- Detect conflicts between execution snapshot and committed state
|
||||
- Trigger re-execution (not re-enqueue) on conflicts with same seq
|
||||
- Track in-flight executions for debugging and monitoring
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._stores: dict[str, VersionedStore] = {}
|
||||
self._current_seq = 0 # Highest committed seq across all operations
|
||||
self._next_commit_seq = 1 # Next seq we're waiting to commit
|
||||
self._pending_commits: dict[int, tuple[ExecutionRecord, list[CommitIntent]]] = {}
|
||||
self._execution_records: dict[int, ExecutionRecord] = {}
|
||||
self._lock = asyncio.Lock() # Only for coordinator internal state, not stores
|
||||
|
||||
def register_store(self, store: VersionedStore) -> None:
|
||||
"""Register a versioned store with the coordinator"""
|
||||
self._stores[store.name] = store
|
||||
logger.info(f"Registered store: {store.name}")
|
||||
|
||||
def get_store(self, name: str) -> Optional[VersionedStore]:
|
||||
"""Get a registered store by name"""
|
||||
return self._stores.get(name)
|
||||
|
||||
async def start_execution(self, seq: int, trigger: Trigger) -> ExecutionRecord:
|
||||
"""
|
||||
Record that an execution is starting.
|
||||
|
||||
Args:
|
||||
seq: Sequence number assigned to this execution
|
||||
trigger: The trigger being executed
|
||||
|
||||
Returns:
|
||||
ExecutionRecord for tracking
|
||||
"""
|
||||
async with self._lock:
|
||||
record = ExecutionRecord(
|
||||
seq=seq,
|
||||
trigger=trigger,
|
||||
state=ExecutionState.EXECUTING,
|
||||
)
|
||||
self._execution_records[seq] = record
|
||||
logger.info(f"Started execution: seq={seq}, trigger={trigger.name}")
|
||||
return record
|
||||
|
||||
async def submit_for_commit(
|
||||
self,
|
||||
seq: int,
|
||||
commit_intents: list[CommitIntent],
|
||||
) -> None:
|
||||
"""
|
||||
Submit commit intents for sequential commit.
|
||||
|
||||
The commit will only happen when:
|
||||
1. All prior seq numbers have committed
|
||||
2. No conflicts detected with committed state
|
||||
|
||||
Args:
|
||||
seq: Sequence number of this execution
|
||||
commit_intents: List of changes to commit (empty if no changes)
|
||||
"""
|
||||
async with self._lock:
|
||||
record = self._execution_records.get(seq)
|
||||
if not record:
|
||||
logger.error(f"No execution record found for seq={seq}")
|
||||
return
|
||||
|
||||
record.state = ExecutionState.WAITING_COMMIT
|
||||
record.commit_intents = commit_intents
|
||||
self._pending_commits[seq] = (record, commit_intents)
|
||||
|
||||
logger.info(
|
||||
f"Seq {seq} submitted for commit with {len(commit_intents)} intents"
|
||||
)
|
||||
|
||||
# Try to process commits (this will handle sequential ordering)
|
||||
await self._process_commits()
|
||||
|
||||
async def _process_commits(self) -> None:
|
||||
"""
|
||||
Process pending commits in strict sequential order.
|
||||
|
||||
Only commits seq N if seq N-1 has already committed.
|
||||
Detects conflicts and triggers re-execution with same seq.
|
||||
"""
|
||||
while True:
|
||||
async with self._lock:
|
||||
# Check if next expected seq is ready to commit
|
||||
if self._next_commit_seq not in self._pending_commits:
|
||||
# Waiting for this seq to complete execution
|
||||
break
|
||||
|
||||
seq = self._next_commit_seq
|
||||
record, intents = self._pending_commits[seq]
|
||||
|
||||
logger.info(
|
||||
f"Processing commit for seq={seq} (current_seq={self._current_seq})"
|
||||
)
|
||||
|
||||
# Check for conflicts
|
||||
conflicts = self._check_conflicts(intents)
|
||||
|
||||
if conflicts:
|
||||
# Conflict detected - re-execute with same seq
|
||||
logger.warning(
|
||||
f"Seq {seq} has conflicts in stores: {conflicts}. Re-executing..."
|
||||
)
|
||||
|
||||
# Remove from pending (will be re-added when execution completes)
|
||||
del self._pending_commits[seq]
|
||||
|
||||
# Mark as evicted
|
||||
record.state = ExecutionState.EVICTED
|
||||
record.retry_count += 1
|
||||
|
||||
                    # Do NOT advance the seq counters here: the retry commits
                    # with the SAME seq, so _next_commit_seq must still point
                    # at it when the re-execution resubmits its intents.
                    # (Counters only advance if the retry ultimately fails.)
                    # Trigger re-execution (outside lock)
                    asyncio.create_task(self._retry_execution(record))

                    continue

                # No conflicts - commit all intents atomically
                for intent in intents:
                    store = self._stores.get(intent.store_name)
                    if not store:
                        logger.error(
                            f"Seq {seq}: Store '{intent.store_name}' not found"
                        )
                        continue

                    store.commit(intent.new_data, seq)

                # Mark as committed
                record.state = ExecutionState.COMMITTED
                del self._pending_commits[seq]

                # Advance seq counters
                self._current_seq = seq
                self._next_commit_seq = seq + 1

                logger.info(
                    f"Committed seq={seq}, current_seq now {self._current_seq}"
                )

    def _check_conflicts(self, intents: list[CommitIntent]) -> list[str]:
        """
        Check if any commit intents conflict with current committed state.

        Args:
            intents: List of commit intents to check

        Returns:
            List of store names that have conflicts (empty if no conflicts)
        """
        conflicts = []

        for intent in intents:
            store = self._stores.get(intent.store_name)
            if not store:
                logger.error(f"Store '{intent.store_name}' not found during conflict check")
                continue

            if store.check_conflict(intent.expected_seq):
                conflicts.append(intent.store_name)

        return conflicts

    async def _retry_execution(self, record: ExecutionRecord) -> None:
        """
        Re-execute a trigger that had conflicts.

        Executes with the SAME seq number (not re-enqueued, just re-executed).
        This ensures the execution order remains deterministic.

        Args:
            record: Execution record to retry
        """
        from .context import ExecutionContext, set_execution_context, clear_execution_context

        logger.info(
            f"Retrying execution: seq={record.seq}, trigger={record.trigger.name}, "
            f"retry_count={record.retry_count}"
        )

        # Set execution context for retry
        ctx = ExecutionContext(
            seq=record.seq,
            trigger_name=record.trigger.name,
        )
        set_execution_context(ctx)

        try:
            # Re-execute trigger
            record.state = ExecutionState.EXECUTING
            commit_intents = await record.trigger.execute()

            # Submit for commit again (with same seq)
            await self.submit_for_commit(record.seq, commit_intents)

        except Exception as e:
            logger.error(
                f"Retry execution failed for seq={record.seq}: {e}", exc_info=True
            )
            record.state = ExecutionState.FAILED
            record.error = str(e)

            # Still need to advance past this seq
            async with self._lock:
                if record.seq == self._next_commit_seq:
                    self._next_commit_seq += 1
                    self._current_seq += 1

            # Try to process any pending commits
            await self._process_commits()

        finally:
            clear_execution_context()

    async def execution_failed(self, seq: int, error: Exception) -> None:
        """
        Mark an execution as failed.

        Args:
            seq: Sequence number that failed
            error: The exception that caused the failure
        """
        async with self._lock:
            record = self._execution_records.get(seq)
            if record:
                record.state = ExecutionState.FAILED
                record.error = str(error)

            # Remove from pending if present
            self._pending_commits.pop(seq, None)

            # If this is the next seq to commit, advance past it
            if seq == self._next_commit_seq:
                self._next_commit_seq += 1
                self._current_seq += 1

                logger.info(
                    f"Seq {seq} failed, advancing current_seq to {self._current_seq}"
                )

        # Try to process any pending commits
        await self._process_commits()

    def get_current_seq(self) -> int:
        """Get the current committed sequence number"""
        return self._current_seq

    def get_execution_record(self, seq: int) -> Optional[ExecutionRecord]:
        """Get execution record for a specific seq"""
        return self._execution_records.get(seq)

    def get_stats(self) -> dict:
        """Get statistics about the coordinator state"""
        state_counts = {}
        for record in self._execution_records.values():
            state_name = record.state.name
            state_counts[state_name] = state_counts.get(state_name, 0) + 1

        return {
            "current_seq": self._current_seq,
            "next_commit_seq": self._next_commit_seq,
            "pending_commits": len(self._pending_commits),
            "total_executions": len(self._execution_records),
            "state_counts": state_counts,
            "stores": {name: str(store) for name, store in self._stores.items()},
        }

    def __repr__(self) -> str:
        return (
            f"CommitCoordinator(current_seq={self._current_seq}, "
            f"pending={len(self._pending_commits)}, stores={len(self._stores)})"
        )
304
backend/src/trigger/handlers.py
Normal file
@@ -0,0 +1,304 @@
"""
|
||||
Trigger handlers - concrete implementations for common trigger types.
|
||||
|
||||
Provides ready-to-use trigger handlers for:
|
||||
- Agent execution (WebSocket user messages)
|
||||
- Lambda/callable execution
|
||||
- Data update triggers
|
||||
- Indicator updates
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Awaitable, Callable, Optional
|
||||
|
||||
from .coordinator import CommitCoordinator
|
||||
from .types import CommitIntent, Priority, Trigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AgentTriggerHandler(Trigger):
|
||||
"""
|
||||
Trigger for agent execution from WebSocket user messages.
|
||||
|
||||
Wraps the Gateway's agent execution flow and captures any
|
||||
store modifications as commit intents.
|
||||
|
||||
Priority tuple: (USER_AGENT, message_timestamp, queue_seq)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session_id: str,
|
||||
message_content: str,
|
||||
message_timestamp: Optional[int] = None,
|
||||
attachments: Optional[list] = None,
|
||||
gateway_handler: Optional[Callable] = None,
|
||||
coordinator: Optional[CommitCoordinator] = None,
|
||||
):
|
||||
"""
|
||||
Initialize agent trigger.
|
||||
|
||||
Args:
|
||||
session_id: User session ID
|
||||
message_content: User message content
|
||||
message_timestamp: When user sent message (unix timestamp, defaults to now)
|
||||
attachments: Optional message attachments
|
||||
gateway_handler: Callable to route to Gateway (set during integration)
|
||||
coordinator: CommitCoordinator for accessing stores
|
||||
"""
|
||||
if message_timestamp is None:
|
||||
message_timestamp = int(time.time())
|
||||
|
||||
# Priority tuple: sort by USER_AGENT priority, then message timestamp
|
||||
super().__init__(
|
||||
name=f"agent_{session_id}",
|
||||
priority=Priority.USER_AGENT,
|
||||
priority_tuple=(Priority.USER_AGENT.value, message_timestamp)
|
||||
)
|
||||
self.session_id = session_id
|
||||
self.message_content = message_content
|
||||
self.message_timestamp = message_timestamp
|
||||
self.attachments = attachments or []
|
||||
self.gateway_handler = gateway_handler
|
||||
self.coordinator = coordinator
|
||||
|
||||
async def execute(self) -> list[CommitIntent]:
|
||||
"""
|
||||
Execute agent interaction.
|
||||
|
||||
This will call into the Gateway, which will run the agent.
|
||||
The agent may read from stores and generate responses.
|
||||
Any store modifications are captured as commit intents.
|
||||
|
||||
Returns:
|
||||
List of commit intents (typically empty for now, as agent
|
||||
modifies stores via tools which will be integrated later)
|
||||
"""
|
||||
if not self.gateway_handler:
|
||||
logger.error("No gateway_handler configured for AgentTriggerHandler")
|
||||
return []
|
||||
|
||||
logger.info(
|
||||
f"Agent trigger executing: session={self.session_id}, "
|
||||
f"content='{self.message_content[:50]}...'"
|
||||
)
|
||||
|
||||
try:
|
||||
# Call Gateway to handle message
|
||||
            # In future, Gateway/agent tools will use coordinator stores
            await self.gateway_handler(
                self.session_id,
                self.message_content,
                self.attachments,
            )

            # For now, agent doesn't directly modify stores
            # Future: agent tools will return commit intents
            return []

        except Exception as e:
            logger.error(f"Agent execution error: {e}", exc_info=True)
            raise


class LambdaHandler(Trigger):
    """
    Generic trigger that executes an arbitrary async callable.

    Useful for custom triggers, one-off tasks, or testing.
    """

    def __init__(
        self,
        name: str,
        func: Callable[[], Awaitable[list[CommitIntent]]],
        priority: Priority = Priority.SYSTEM,
    ):
        """
        Initialize lambda handler.

        Args:
            name: Descriptive name for this trigger
            func: Async callable that returns commit intents
            priority: Execution priority
        """
        super().__init__(name, priority)
        self.func = func

    async def execute(self) -> list[CommitIntent]:
        """Execute the callable"""
        logger.info(f"Lambda trigger executing: {self.name}")
        return await self.func()


class DataUpdateTrigger(Trigger):
    """
    Trigger for DataSource bar updates.

    Fired when new market data arrives. Can update indicators,
    trigger strategy logic, or notify the agent of market events.

    Priority tuple: (DATA_SOURCE, event_time, queue_seq)
    Ensures older bars process before newer ones.
    """

    def __init__(
        self,
        source_name: str,
        symbol: str,
        resolution: str,
        bar_data: dict,
        coordinator: Optional[CommitCoordinator] = None,
    ):
        """
        Initialize data update trigger.

        Args:
            source_name: Name of data source (e.g., "binance")
            symbol: Trading pair symbol
            resolution: Time resolution
            bar_data: Bar data dict (time, open, high, low, close, volume)
            coordinator: CommitCoordinator for accessing stores
        """
        event_time = bar_data.get('time', int(time.time()))

        # Priority tuple: sort by DATA_SOURCE priority, then event time
        super().__init__(
            name=f"data_{source_name}_{symbol}_{resolution}",
            priority=Priority.DATA_SOURCE,
            priority_tuple=(Priority.DATA_SOURCE.value, event_time)
        )
        self.source_name = source_name
        self.symbol = symbol
        self.resolution = resolution
        self.bar_data = bar_data
        self.coordinator = coordinator

    async def execute(self) -> list[CommitIntent]:
        """
        Process bar update.

        Future implementations will:
        - Update indicator values
        - Check strategy conditions
        - Trigger alerts/notifications

        Returns:
            Commit intents for any store updates
        """
        logger.info(
            f"Data update trigger: {self.source_name}:{self.symbol}@{self.resolution}, "
            f"time={self.bar_data.get('time')}"
        )

        # TODO: Update indicators
        # TODO: Check strategy conditions
        # TODO: Notify agent of significant events

        # For now, just log
        return []


class IndicatorUpdateTrigger(Trigger):
    """
    Trigger for updating indicator values.

    Can be fired by cron (periodic recalculation) or by data updates.
    """

    def __init__(
        self,
        indicator_id: str,
        force_full_recalc: bool = False,
        coordinator: Optional[CommitCoordinator] = None,
        priority: Priority = Priority.SYSTEM,
    ):
        """
        Initialize indicator update trigger.

        Args:
            indicator_id: ID of indicator to update
            force_full_recalc: If True, recalculate entire history
            coordinator: CommitCoordinator for accessing stores
            priority: Execution priority
        """
        super().__init__(f"indicator_{indicator_id}", priority)
        self.indicator_id = indicator_id
        self.force_full_recalc = force_full_recalc
        self.coordinator = coordinator

    async def execute(self) -> list[CommitIntent]:
        """
        Update indicator value.

        Reads from IndicatorStore, recalculates, prepares commit.

        Returns:
            Commit intents for updated indicator data
        """
        if not self.coordinator:
            logger.error("No coordinator configured")
            return []

        # Get indicator store
        indicator_store = self.coordinator.get_store("IndicatorStore")
        if not indicator_store:
            logger.error("IndicatorStore not registered")
            return []

        # Read snapshot
        snapshot_seq, indicator_data = indicator_store.read_snapshot()

        logger.info(
            f"Indicator update trigger: {self.indicator_id}, "
            f"snapshot_seq={snapshot_seq}, force_full={self.force_full_recalc}"
        )

        # TODO: Implement indicator recalculation logic
        # For now, just return empty (no changes)

        return []


class CronTrigger(Trigger):
    """
    Trigger fired by APScheduler on a schedule.

    Wraps another trigger or callable to execute periodically.

    Priority tuple: (TIMER, scheduled_time, queue_seq)
    Ensures jobs scheduled for earlier times run first.
    """

    def __init__(
        self,
        name: str,
        inner_trigger: Trigger,
        scheduled_time: Optional[int] = None,
    ):
        """
        Initialize cron trigger.

        Args:
            name: Descriptive name (e.g., "hourly_sync")
            inner_trigger: Trigger to execute on schedule
            scheduled_time: When this was scheduled to run (defaults to now)
        """
        if scheduled_time is None:
            scheduled_time = int(time.time())

        # Priority tuple: sort by TIMER priority, then scheduled time
        super().__init__(
            name=f"cron_{name}",
            priority=Priority.TIMER,
            priority_tuple=(Priority.TIMER.value, scheduled_time)
        )
        self.inner_trigger = inner_trigger
        self.scheduled_time = scheduled_time

    async def execute(self) -> list[CommitIntent]:
        """Execute the wrapped trigger"""
        logger.info(f"Cron trigger firing: {self.name}")
        return await self.inner_trigger.execute()
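Both DataUpdateTrigger and CronTrigger depend on Python's element-wise tuple comparison for ordering. A standalone sketch (illustrative values only, not the classes above) showing that an older DATA_SOURCE bar drains before a newer one, and both before a TIMER job:

```python
import heapq

# Hypothetical entries mirroring (Priority.DATA_SOURCE.value, event_time, queue_seq),
# with DATA_SOURCE = 0 and TIMER = 1 as in the Priority enum.
entries = [
    (0, 1700000060, 2),  # newer bar, enqueued earlier
    (1, 1700000000, 1),  # timer job
    (0, 1700000000, 3),  # older bar, enqueued last
]

heapq.heapify(entries)
order = [heapq.heappop(entries) for _ in range(3)]

# The older DATA_SOURCE bar drains first despite being enqueued last.
print(order)
```

Queue insertion order (the trailing seq) only breaks ties when priority and event time match.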
224
backend/src/trigger/queue.py
Normal file
@@ -0,0 +1,224 @@
"""
Trigger queue - priority queue with sequence number assignment.

All operations flow through this queue:
- WebSocket messages from users
- Cron scheduled tasks
- DataSource bar updates
- Manual triggers

Queue assigns seq numbers on dequeue, executes triggers, and submits to coordinator.
"""

import asyncio
import logging
from typing import Optional

from .context import ExecutionContext, clear_execution_context, set_execution_context
from .coordinator import CommitCoordinator
from .types import Priority, PriorityTuple, Trigger

logger = logging.getLogger(__name__)


class TriggerQueue:
    """
    Priority queue for trigger execution.

    Key responsibilities:
    - Maintain priority queue (high priority dequeued first)
    - Assign sequence numbers on dequeue (determines commit order)
    - Execute triggers with context set
    - Submit results to CommitCoordinator
    - Handle execution errors gracefully
    """

    def __init__(self, coordinator: CommitCoordinator):
        """
        Initialize trigger queue.

        Args:
            coordinator: CommitCoordinator for handling commits
        """
        self._coordinator = coordinator
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._seq_counter = 0
        self._seq_lock = asyncio.Lock()
        self._processor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self) -> None:
        """Start the queue processor"""
        if self._running:
            logger.warning("TriggerQueue already running")
            return

        self._running = True
        self._processor_task = asyncio.create_task(self._process_loop())
        logger.info("TriggerQueue started")

    async def stop(self) -> None:
        """Stop the queue processor gracefully"""
        if not self._running:
            return

        self._running = False

        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass

        logger.info("TriggerQueue stopped")

    async def enqueue(
        self,
        trigger: Trigger,
        priority_override: Optional[Priority | PriorityTuple] = None
    ) -> int:
        """
        Add a trigger to the queue.

        Args:
            trigger: Trigger to execute
            priority_override: Override priority (simple Priority or tuple)
                If None, uses trigger's priority/priority_tuple
                If Priority enum, creates single-element tuple
                If tuple, uses as-is

        Returns:
            Queue sequence number (appended to priority tuple)

        Examples:
            # Simple priority
            await queue.enqueue(trigger, Priority.USER_AGENT)
            # Results in: (Priority.USER_AGENT, queue_seq)

            # Tuple priority with event time
            await queue.enqueue(
                trigger,
                (Priority.DATA_SOURCE, bar_data['time'])
            )
            # Results in: (Priority.DATA_SOURCE, bar_time, queue_seq)

            # Let trigger decide
            await queue.enqueue(trigger)
        """
        # Get monotonic seq for queue ordering (appended to tuple)
        async with self._seq_lock:
            queue_seq = self._seq_counter
            self._seq_counter += 1

        # Determine priority tuple
        if priority_override is not None:
            if isinstance(priority_override, Priority):
                # Convert simple priority to tuple
                priority_tuple = (priority_override.value, queue_seq)
            else:
                # Use provided tuple, append queue_seq
                priority_tuple = priority_override + (queue_seq,)
        else:
            # Let trigger determine its own priority tuple
            priority_tuple = trigger.get_priority_tuple(queue_seq)

        # Priority queue: (priority_tuple, trigger)
        # Python's PriorityQueue compares tuples element-by-element
        await self._queue.put((priority_tuple, trigger))

        logger.debug(
            f"Enqueued: {trigger.name} with priority_tuple={priority_tuple}"
        )

        return queue_seq

    async def _process_loop(self) -> None:
        """
        Main processing loop.

        Dequeues triggers, assigns execution seq, executes, and submits to coordinator.
        """
        execution_seq = 0  # Separate counter for execution sequence

        while self._running:
            try:
                # Wait for next trigger (with timeout to check _running flag)
                try:
                    priority_tuple, trigger = await asyncio.wait_for(
                        self._queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue

                # Assign execution sequence number
                execution_seq += 1

                logger.info(
                    f"Dequeued: seq={execution_seq}, trigger={trigger.name}, "
                    f"priority_tuple={priority_tuple}"
                )

                # Execute in background (don't block queue)
                asyncio.create_task(
                    self._execute_trigger(execution_seq, trigger)
                )

            except Exception as e:
                logger.error(f"Error in process loop: {e}", exc_info=True)

    async def _execute_trigger(self, seq: int, trigger: Trigger) -> None:
        """
        Execute a trigger with proper context and error handling.

        Args:
            seq: Execution sequence number
            trigger: Trigger to execute
        """
        # Set up execution context
        ctx = ExecutionContext(
            seq=seq,
            trigger_name=trigger.name,
        )
        set_execution_context(ctx)

        # Record execution start with coordinator
        await self._coordinator.start_execution(seq, trigger)

        try:
            logger.info(f"Executing: seq={seq}, trigger={trigger.name}")

            # Execute trigger (can be long-running)
            commit_intents = await trigger.execute()

            logger.info(
                f"Execution complete: seq={seq}, {len(commit_intents)} commit intents"
            )

            # Submit for sequential commit
            await self._coordinator.submit_for_commit(seq, commit_intents)

        except Exception as e:
            logger.error(
                f"Execution failed: seq={seq}, trigger={trigger.name}, error={e}",
                exc_info=True,
            )

            # Notify coordinator of failure
            await self._coordinator.execution_failed(seq, e)

        finally:
            clear_execution_context()

    def get_queue_size(self) -> int:
        """Get current queue size (approximate)"""
        return self._queue.qsize()

    def is_running(self) -> bool:
        """Check if queue processor is running"""
        return self._running

    def __repr__(self) -> str:
        return (
            f"TriggerQueue(running={self._running}, queue_size={self.get_queue_size()})"
        )
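The `(priority_tuple, trigger)` entries above work because `asyncio.PriorityQueue` pops the smallest item first and the unique `queue_seq` appended to every tuple prevents ties from ever reaching the payload. A minimal standalone sketch (string payloads stand in for Trigger objects):

```python
import asyncio


async def drain() -> list[str]:
    # Entries are (priority_tuple, payload); tuples compare element by
    # element, and the trailing seq makes every tuple unique, so the
    # payload itself never needs to be comparable.
    q: asyncio.PriorityQueue = asyncio.PriorityQueue()
    await q.put(((4, 0), "system_cleanup"))     # SYSTEM priority, seq 0
    await q.put(((2, 1), "user_chat_message"))  # USER_AGENT priority, seq 1
    await q.put(((0, 2), "binance_bar"))        # DATA_SOURCE priority, seq 2
    return [(await q.get())[1] for _ in range(3)]


result = asyncio.run(drain())
print(result)
```

The DATA_SOURCE entry dequeues first even though it was enqueued last.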
187
backend/src/trigger/scheduler.py
Normal file
@@ -0,0 +1,187 @@
"""
APScheduler integration for cron-style triggers.

Provides scheduling of periodic triggers (e.g., sync exchange state hourly,
recompute indicators every 5 minutes, daily portfolio reports).
"""

import logging
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger as APSCronTrigger
from apscheduler.triggers.interval import IntervalTrigger

from .queue import TriggerQueue
from .types import Priority, Trigger

logger = logging.getLogger(__name__)


class TriggerScheduler:
    """
    Scheduler for periodic trigger execution.

    Wraps APScheduler to enqueue triggers at scheduled times.
    """

    def __init__(self, trigger_queue: TriggerQueue):
        """
        Initialize scheduler.

        Args:
            trigger_queue: TriggerQueue to enqueue triggers into
        """
        self.trigger_queue = trigger_queue
        self.scheduler = AsyncIOScheduler()
        self._job_counter = 0

    def start(self) -> None:
        """Start the scheduler"""
        self.scheduler.start()
        logger.info("TriggerScheduler started")

    def shutdown(self, wait: bool = True) -> None:
        """
        Shut down the scheduler.

        Args:
            wait: If True, wait for running jobs to complete
        """
        self.scheduler.shutdown(wait=wait)
        logger.info("TriggerScheduler shut down")

    def schedule_interval(
        self,
        trigger: Trigger,
        seconds: Optional[int] = None,
        minutes: Optional[int] = None,
        hours: Optional[int] = None,
        priority: Optional[Priority] = None,
    ) -> str:
        """
        Schedule a trigger to run at regular intervals.

        Args:
            trigger: Trigger to execute
            seconds: Interval in seconds
            minutes: Interval in minutes
            hours: Interval in hours
            priority: Priority override for execution

        Returns:
            Job ID (can be used to remove job later)

        Example:
            # Run every 5 minutes
            scheduler.schedule_interval(
                IndicatorUpdateTrigger("rsi_14"),
                minutes=5
            )
        """
        job_id = f"interval_{self._job_counter}"
        self._job_counter += 1

        async def job_func():
            await self.trigger_queue.enqueue(trigger, priority)

        self.scheduler.add_job(
            job_func,
            trigger=IntervalTrigger(seconds=seconds, minutes=minutes, hours=hours),
            id=job_id,
            name=f"Interval: {trigger.name}",
        )

        logger.info(
            f"Scheduled interval job: {job_id}, trigger={trigger.name}, "
            f"interval=(s={seconds}, m={minutes}, h={hours})"
        )

        return job_id

    def schedule_cron(
        self,
        trigger: Trigger,
        minute: Optional[str] = None,
        hour: Optional[str] = None,
        day: Optional[str] = None,
        month: Optional[str] = None,
        day_of_week: Optional[str] = None,
        priority: Optional[Priority] = None,
    ) -> str:
        """
        Schedule a trigger to run on a cron schedule.

        Args:
            trigger: Trigger to execute
            minute: Minute expression (0-59, *, */5, etc.)
            hour: Hour expression (0-23, *, etc.)
            day: Day of month expression (1-31, *, etc.)
            month: Month expression (1-12, *, etc.)
            day_of_week: Day of week expression (0-6, mon-sun, *, etc.)
            priority: Priority override for execution

        Returns:
            Job ID (can be used to remove job later)

        Example:
            # Run at 9:00 AM every weekday
            scheduler.schedule_cron(
                SyncExchangeStateTrigger(),
                hour="9",
                minute="0",
                day_of_week="mon-fri"
            )
        """
        job_id = f"cron_{self._job_counter}"
        self._job_counter += 1

        async def job_func():
            await self.trigger_queue.enqueue(trigger, priority)

        self.scheduler.add_job(
            job_func,
            trigger=APSCronTrigger(
                minute=minute,
                hour=hour,
                day=day,
                month=month,
                day_of_week=day_of_week,
            ),
            id=job_id,
            name=f"Cron: {trigger.name}",
        )

        logger.info(
            f"Scheduled cron job: {job_id}, trigger={trigger.name}, "
            f"schedule=(m={minute}, h={hour}, d={day}, dow={day_of_week})"
        )

        return job_id

    def remove_job(self, job_id: str) -> bool:
        """
        Remove a scheduled job.

        Args:
            job_id: Job ID returned from schedule_* methods

        Returns:
            True if job was removed, False if not found
        """
        try:
            self.scheduler.remove_job(job_id)
            logger.info(f"Removed scheduled job: {job_id}")
            return True
        except Exception as e:
            logger.warning(f"Could not remove job {job_id}: {e}")
            return False

    def get_jobs(self) -> list:
        """Get list of all scheduled jobs"""
        return self.scheduler.get_jobs()

    def __repr__(self) -> str:
        job_count = len(self.scheduler.get_jobs())
        running = self.scheduler.running
        return f"TriggerScheduler(running={running}, jobs={job_count})"
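The scheduler delegates timing to APScheduler, but the underlying enqueue-on-tick pattern can be sketched with plain asyncio (names here are hypothetical stand-ins, not part of the module; the appended list stands in for `trigger_queue.enqueue`):

```python
import asyncio


async def periodic(interval_s: float, ticks: int, fired: list[int]) -> None:
    # Minimal stand-in for IntervalTrigger: sleep, then hand work to the queue.
    for i in range(ticks):
        await asyncio.sleep(interval_s)
        # Real code would do: await trigger_queue.enqueue(trigger, priority)
        fired.append(i)


fired: list[int] = []
asyncio.run(periodic(0.01, 3, fired))
print(fired)
```

APScheduler adds persistence, cron expressions, and job management on top of this loop, which is why the module wraps it rather than hand-rolling timers.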
301
backend/src/trigger/store.py
Normal file
@@ -0,0 +1,301 @@
"""
Versioned store with pluggable backends.

Provides optimistic concurrency control via sequence numbers with support
for different storage backends (Pydantic models, files, databases, etc.).
"""

import logging
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Generic, TypeVar

from .context import get_execution_context
from .types import CommitIntent

logger = logging.getLogger(__name__)

T = TypeVar("T")


class StoreBackend(ABC, Generic[T]):
    """
    Abstract backend for versioned stores.

    Allows different storage mechanisms (Pydantic models, files, databases)
    to be used with the same versioned store infrastructure.
    """

    @abstractmethod
    def read(self) -> T:
        """
        Read the current data.

        Returns:
            Current data in backend-specific format
        """
        pass

    @abstractmethod
    def write(self, data: T) -> None:
        """
        Write new data (replaces existing).

        Args:
            data: New data to write
        """
        pass

    @abstractmethod
    def snapshot(self) -> T:
        """
        Create an immutable snapshot of current data.

        Must return a deep copy or immutable version to prevent
        modifications from affecting the committed state.

        Returns:
            Immutable snapshot of data
        """
        pass

    @abstractmethod
    def validate(self, data: T) -> bool:
        """
        Validate that data is in correct format for this backend.

        Args:
            data: Data to validate

        Returns:
            True if valid

        Raises:
            ValueError: If invalid with explanation
        """
        pass


class PydanticStoreBackend(StoreBackend[T]):
    """
    Backend for Pydantic BaseModel stores.

    Supports the existing OrderStore, ChartStore, etc. pattern.
    """

    def __init__(self, model_instance: T):
        """
        Initialize with a Pydantic model instance.

        Args:
            model_instance: Instance of a Pydantic BaseModel
        """
        self._model = model_instance

    def read(self) -> T:
        return self._model

    def write(self, data: T) -> None:
        # Replace the internal model
        self._model = data

    def snapshot(self) -> T:
        # Use Pydantic's model_copy for deep copy
        if hasattr(self._model, "model_copy"):
            return self._model.model_copy(deep=True)
        # Fallback for older Pydantic or non-model types
        return deepcopy(self._model)

    def validate(self, data: T) -> bool:
        # Pydantic models validate themselves on construction
        # If we got here with a model instance, it's valid
        return True


class FileStoreBackend(StoreBackend[str]):
    """
    Backend for file-based storage.

    Future implementation for versioning files (e.g., Python scripts, configs).
    """

    def __init__(self, file_path: str):
        self.file_path = file_path
        raise NotImplementedError("FileStoreBackend not yet implemented")

    def read(self) -> str:
        raise NotImplementedError()

    def write(self, data: str) -> None:
        raise NotImplementedError()

    def snapshot(self) -> str:
        raise NotImplementedError()

    def validate(self, data: str) -> bool:
        raise NotImplementedError()


class DatabaseStoreBackend(StoreBackend[dict]):
    """
    Backend for database table storage.

    Future implementation for versioning database interactions.
    """

    def __init__(self, table_name: str, connection):
        self.table_name = table_name
        self.connection = connection
        raise NotImplementedError("DatabaseStoreBackend not yet implemented")

    def read(self) -> dict:
        raise NotImplementedError()

    def write(self, data: dict) -> None:
        raise NotImplementedError()

    def snapshot(self) -> dict:
        raise NotImplementedError()

    def validate(self, data: dict) -> bool:
        raise NotImplementedError()


class VersionedStore(Generic[T]):
    """
    Store with optimistic concurrency control via sequence numbers.

    Wraps any StoreBackend and provides:
    - Lock-free snapshot reads
    - Conflict detection on commit
    - Version tracking for debugging
    """

    def __init__(self, name: str, backend: StoreBackend[T]):
        """
        Initialize versioned store.

        Args:
            name: Unique name for this store (e.g., "OrderStore")
            backend: Backend implementation for storage
        """
        self.name = name
        self._backend = backend
        self._committed_seq = 0  # Highest committed seq
        self._version = 0  # Increments on each commit (for debugging)

    @property
    def committed_seq(self) -> int:
        """Get the current committed sequence number"""
        return self._committed_seq

    @property
    def version(self) -> int:
        """Get the current version (increments on each commit)"""
        return self._version

    def read_snapshot(self) -> tuple[int, T]:
        """
        Read an immutable snapshot of the store.

        This is lock-free and can be called concurrently. The snapshot
        captures the current committed seq and a deep copy of the data.

        Automatically records the snapshot seq in the execution context
        for conflict detection during commit.

        Returns:
            Tuple of (seq, snapshot_data)
        """
        snapshot_seq = self._committed_seq
        snapshot_data = self._backend.snapshot()

        # Record in execution context for conflict detection
        ctx = get_execution_context()
        if ctx:
            ctx.record_snapshot(self.name, snapshot_seq)

        logger.debug(
            f"Store '{self.name}': read_snapshot() -> seq={snapshot_seq}, version={self._version}"
        )

        return (snapshot_seq, snapshot_data)

    def read_current(self) -> T:
        """
        Read the current data without snapshot tracking.

        Use this for read-only operations that don't need conflict detection.

        Returns:
            Current data (not a snapshot, modifications visible)
        """
        return self._backend.read()

    def prepare_commit(self, expected_seq: int, new_data: T) -> CommitIntent:
        """
        Create a commit intent for later sequential commit.

        Does NOT modify the store - that happens during the commit phase.

        Args:
            expected_seq: The seq of the snapshot that was read
            new_data: The new data to commit

        Returns:
            CommitIntent to be submitted to CommitCoordinator
        """
        # Validate data before creating intent
        self._backend.validate(new_data)

        intent = CommitIntent(
            store_name=self.name,
            expected_seq=expected_seq,
            new_data=new_data,
        )

        logger.debug(
            f"Store '{self.name}': prepare_commit(expected_seq={expected_seq}, current_seq={self._committed_seq})"
        )

        return intent

    def commit(self, new_data: T, commit_seq: int) -> None:
        """
        Commit new data at a specific seq.

        Called by CommitCoordinator during sequential commit phase.
        NOT for direct use by triggers.

        Args:
            new_data: Data to commit
            commit_seq: Seq number of this commit
        """
        self._backend.write(new_data)
        self._committed_seq = commit_seq
        self._version += 1

        logger.info(
            f"Store '{self.name}': committed seq={commit_seq}, version={self._version}"
        )

    def check_conflict(self, expected_seq: int) -> bool:
        """
        Check if committing at expected_seq would conflict.

        Args:
            expected_seq: The seq that was expected during execution

        Returns:
            True if conflict (committed_seq has advanced beyond expected_seq)
        """
        has_conflict = self._committed_seq != expected_seq
        if has_conflict:
            logger.warning(
                f"Store '{self.name}': conflict detected - "
                f"expected_seq={expected_seq}, committed_seq={self._committed_seq}"
            )
        return has_conflict

    def __repr__(self) -> str:
        return f"VersionedStore(name='{self.name}', committed_seq={self._committed_seq}, version={self._version})"
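The optimistic-concurrency cycle above (snapshot with seq, commit only if the seq is unchanged) can be demonstrated with a toy store; this is a simplified standalone sketch, not the VersionedStore class itself:

```python
from copy import deepcopy


class MiniVersionedStore:
    """Toy illustration of snapshot-seq reads plus conflict checking."""

    def __init__(self, data):
        self._data = data
        self._committed_seq = 0

    def read_snapshot(self):
        # Capture the seq alongside a deep copy of the data.
        return self._committed_seq, deepcopy(self._data)

    def commit(self, expected_seq, new_data, commit_seq):
        if self._committed_seq != expected_seq:
            return False  # conflict: another commit landed since our snapshot
        self._data = new_data
        self._committed_seq = commit_seq
        return True


store = MiniVersionedStore({"price": 100})

# Two executions snapshot the same committed state...
seq_a, snap_a = store.read_snapshot()
seq_b, snap_b = store.read_snapshot()

# ...but only the first commit against that seq succeeds; the second
# detects the conflict and (in the real system) would be retried.
ok_a = store.commit(seq_a, {"price": 101}, commit_seq=1)
ok_b = store.commit(seq_b, {"price": 102}, commit_seq=2)
print(ok_a, ok_b)
```

The real VersionedStore splits the commit into prepare_commit (intent creation) and commit (applied sequentially by the CommitCoordinator), but the conflict test is the same seq comparison.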
175
backend/src/trigger/types.py
Normal file
@@ -0,0 +1,175 @@
"""
Core types for the trigger system.
"""

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Optional

logger = logging.getLogger(__name__)


class Priority(IntEnum):
    """
    Primary execution priority for triggers.

    Lower numeric value = higher priority (dequeued first).

    Priority hierarchy (highest to lowest):
    - DATA_SOURCE: Market data, real-time feeds (most time-sensitive)
    - TIMER: Scheduled tasks, cron jobs
    - USER_AGENT: User-agent interactions (WebSocket chat)
    - USER_DATA_REQUEST: User data requests (chart loads, symbol search)
    - SYSTEM: Background tasks, cleanup
    - LOW: Retries after conflicts, non-critical tasks
    """

    DATA_SOURCE = 0        # Market data updates, real-time feeds
    TIMER = 1              # Scheduled tasks, cron jobs
    USER_AGENT = 2         # User-agent interactions (WebSocket chat)
    USER_DATA_REQUEST = 3  # User data requests (chart loads, etc.)
    SYSTEM = 4             # Background tasks, cleanup, etc.
    LOW = 5                # Retries after conflicts, non-critical tasks


# Type alias for priority tuples
# Examples:
#   (Priority.DATA_SOURCE,)                        - Simple priority
#   (Priority.DATA_SOURCE, event_time)             - Priority + event time
#   (Priority.DATA_SOURCE, event_time, queue_seq)  - Full ordering
#
# Python compares tuples element-by-element, left-to-right.
# Shorter tuple wins if all shared elements are equal.
PriorityTuple = tuple[int, ...]


class ExecutionState(IntEnum):
    """State of an execution in the system"""

    QUEUED = 0          # In queue, waiting to be dequeued
    EXECUTING = 1       # Currently executing
    WAITING_COMMIT = 2  # Finished executing, waiting for sequential commit
    COMMITTED = 3       # Successfully committed
    EVICTED = 4         # Evicted due to conflict, will retry
    FAILED = 5          # Failed with error


@dataclass
class CommitIntent:
    """
    Intent to commit changes to a store.

    Created during execution, validated and applied during sequential commit phase.
    """

    store_name: str
    """Name of the store to commit to"""

    expected_seq: int
    """The seq number of the snapshot that was read (for conflict detection)"""

    new_data: Any
    """The new data to commit (format depends on store backend)"""

    def __repr__(self) -> str:
        data_preview = str(self.new_data)[:50]
        return f"CommitIntent(store={self.store_name}, expected_seq={self.expected_seq}, data={data_preview}...)"


class Trigger(ABC):
    """
    Abstract base class for all triggers.

    A trigger represents a unit of work that:
    1. Gets assigned a seq number when dequeued
    2. Executes (potentially long-running, async)
    3. Returns CommitIntents for any state changes
    4. Waits for sequential commit
    """

    def __init__(
        self,
        name: str,
        priority: Priority = Priority.SYSTEM,
        priority_tuple: Optional[PriorityTuple] = None
    ):
        """
        Initialize trigger.

        Args:
            name: Descriptive name for logging
            priority: Simple priority (used if priority_tuple not provided)
            priority_tuple: Optional tuple for compound sorting
                Examples:
                    (Priority.DATA_SOURCE, event_time)
                    (Priority.USER_AGENT, message_timestamp)
                    (Priority.TIMER, scheduled_time)
        """
        self.name = name
        self.priority = priority
        self._priority_tuple = priority_tuple

    def get_priority_tuple(self, queue_seq: int) -> PriorityTuple:
        """
        Get the priority tuple for queue ordering.

        If a priority tuple was provided at construction, append queue_seq.
        Otherwise, create tuple from simple priority.

        Args:
            queue_seq: Queue insertion order (final sort key)

        Returns:
            Priority tuple for queue ordering

        Examples:
            (Priority.DATA_SOURCE,) + (queue_seq,) = (0, queue_seq)
            (Priority.DATA_SOURCE, 1000) + (queue_seq,) = (0, 1000, queue_seq)
        """
        if self._priority_tuple is not None:
            return self._priority_tuple + (queue_seq,)
        else:
            return (self.priority.value, queue_seq)

    @abstractmethod
    async def execute(self) -> list[CommitIntent]:
        """
        Execute the trigger logic.

        Can be long-running and async. Should read from stores via
        VersionedStore.read_snapshot() and return CommitIntents for any changes.

        Returns:
            List of CommitIntents (empty if no state changes)

        Raises:
            Exception: On execution failure (will be logged, no commit)
        """
        pass

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(name='{self.name}', priority={self.priority.name})"


@dataclass
class ExecutionRecord:
    """
    Record of an execution for tracking and debugging.

    Maintained by the CommitCoordinator to track in-flight executions.
    """

    seq: int
    trigger: Trigger
    state: ExecutionState
    commit_intents: Optional[list[CommitIntent]] = None
    error: Optional[str] = None
    retry_count: int = 0

    def __repr__(self) -> str:
        return (
            f"ExecutionRecord(seq={self.seq}, trigger={self.trigger.name}, "
            f"state={self.state.name}, retry={self.retry_count})"
        )
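The PriorityTuple comment's claims about tuple comparison follow directly from Python's sequence-comparison rules; a quick standalone check:

```python
# Python compares tuples element by element, left to right; a tuple that
# is a strict prefix of another sorts first ("shorter tuple wins").
assert (0,) < (0, 1000)             # bare priority beats priority + event time
assert (0, 1000) < (0, 1000, 5)     # a prefix still sorts first
assert (0, 2000, 1) < (1, 0, 0)     # first element dominates: DATA_SOURCE < TIMER

# Priority is an IntEnum, so members compare as their integer values.
from enum import IntEnum

class P(IntEnum):  # mirrors the Priority enum above
    DATA_SOURCE = 0
    TIMER = 1

assert P.DATA_SOURCE < P.TIMER
print("priority tuple ordering holds")
```

Because queue_seq is always appended last, complete queue entries all have the same length in practice; the prefix rule only matters if raw tuples are compared outside the queue.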
@@ -18,7 +18,7 @@ RUN apt-get update \
    && make install \
    && cd .. \
    && rm -rf ta-lib ta-lib-0.4.0-src.tar.gz \
    && apt-get purge -y --auto-remove gcc g++ make wget ca-certificates \
    && apt-get purge -y --auto-remove gcc g++ make wget \
    && rm -rf /var/lib/apt/lists/*

# Install Python build dependencies early for better layer caching

@@ -35,6 +35,11 @@ spec:
        env:
        - name: CONFIG
          value: "dev"
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: ai-secrets
              key: anthropic-api-key
        volumeMounts:
        - name: ai-backend-data
          mountPath: /app/data

@@ -4,7 +4,7 @@
  <meta charset="UTF-8">
  <link rel="icon" href="/favicon.ico">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Vite App</title>
  <title>dexorder</title>
  <script type="text/javascript" src="/charting_library/charting_library.standalone.js"></script>
</head>
<body>

@@ -33,7 +33,7 @@ onMounted(() => {
    datafeed: datafeed,
    interval: chartStore.chart_state.interval as any,
    container: chartContainer.value!,
    library_path: 'charting_library/',
    library_path: '/charting_library/',
    locale: 'en',
    disabled_features: [
      'use_localstorage_for_settings',