""" Indicator registry for managing and discovering indicators. Provides AI agents with a queryable catalog of available indicators, their capabilities, and metadata. """ from typing import Dict, List, Optional, Type from .base import Indicator from .schema import IndicatorMetadata, InputSchema, OutputSchema class IndicatorRegistry: """ Central registry for indicator classes. Enables: - Registration of indicator implementations - Discovery by name, category, or tags - Schema validation - AI agent tool generation """ def __init__(self): self._indicators: Dict[str, Type[Indicator]] = {} def register(self, indicator_class: Type[Indicator]) -> None: """ Register an indicator class. Args: indicator_class: Indicator class to register Raises: ValueError: If an indicator with this name is already registered """ metadata = indicator_class.get_metadata() if metadata.name in self._indicators: raise ValueError( f"Indicator '{metadata.name}' is already registered" ) self._indicators[metadata.name] = indicator_class def unregister(self, name: str) -> None: """ Unregister an indicator class. Args: name: Indicator class name """ self._indicators.pop(name, None) def get(self, name: str) -> Optional[Type[Indicator]]: """ Get an indicator class by name. Args: name: Indicator class name Returns: Indicator class or None if not found """ return self._indicators.get(name) def list_indicators(self) -> List[str]: """ Get names of all registered indicators. Returns: List of indicator class names """ return list(self._indicators.keys()) def get_metadata(self, name: str) -> Optional[IndicatorMetadata]: """ Get metadata for a specific indicator. Args: name: Indicator class name Returns: IndicatorMetadata or None if not found """ indicator_class = self.get(name) if indicator_class: return indicator_class.get_metadata() return None def get_all_metadata(self) -> List[IndicatorMetadata]: """ Get metadata for all registered indicators. Useful for AI agent tool generation and discovery. Returns: List of IndicatorMetadata for all registered indicators """ return [cls.get_metadata() for cls in self._indicators.values()] def search_by_category(self, category: str) -> List[IndicatorMetadata]: """ Find indicators by category. Args: category: Category name (e.g., 'momentum', 'trend', 'volatility') Returns: List of matching indicator metadata """ results = [] for indicator_class in self._indicators.values(): metadata = indicator_class.get_metadata() if metadata.category.lower() == category.lower(): results.append(metadata) return results def search_by_tag(self, tag: str) -> List[IndicatorMetadata]: """ Find indicators by tag. Args: tag: Tag to search for (case-insensitive) Returns: List of matching indicator metadata """ tag_lower = tag.lower() results = [] for indicator_class in self._indicators.values(): metadata = indicator_class.get_metadata() if any(t.lower() == tag_lower for t in metadata.tags): results.append(metadata) return results def search_by_text(self, query: str) -> List[IndicatorMetadata]: """ Full-text search across indicator names, descriptions, and use cases. Args: query: Search query (case-insensitive) Returns: List of matching indicator metadata, ranked by relevance """ query_lower = query.lower() results = [] for indicator_class in self._indicators.values(): metadata = indicator_class.get_metadata() score = 0 # Check name (highest weight) if query_lower in metadata.name.lower(): score += 10 if query_lower in metadata.display_name.lower(): score += 8 # Check description if query_lower in metadata.description.lower(): score += 5 # Check use cases for use_case in metadata.use_cases: if query_lower in use_case.lower(): score += 3 # Check tags for tag in metadata.tags: if query_lower in tag.lower(): score += 2 if score > 0: results.append((score, metadata)) # Sort by score descending results.sort(key=lambda x: x[0], reverse=True) return [metadata for _, metadata in results] def find_compatible_indicators( self, available_columns: List[str], column_types: Dict[str, str] ) -> List[IndicatorMetadata]: """ Find indicators that can be computed from available columns. Args: available_columns: List of column names available column_types: Mapping of column name to type Returns: List of indicators whose input schema is satisfied """ from datasource.schema import ColumnInfo # Build ColumnInfo list from available data available_schema = [ ColumnInfo( name=name, type=column_types.get(name, "float"), description=f"Column {name}" ) for name in available_columns ] results = [] for indicator_class in self._indicators.values(): input_schema = indicator_class.get_input_schema() if input_schema.matches(available_schema): results.append(indicator_class.get_metadata()) return results def validate_indicator_chain( self, indicator_chain: List[tuple[str, Dict]] ) -> tuple[bool, Optional[str]]: """ Validate that a chain of indicators can be connected. Args: indicator_chain: List of (indicator_name, params) tuples in execution order Returns: Tuple of (is_valid, error_message) """ if not indicator_chain: return True, None # For now, just check that all indicators exist # More sophisticated DAG validation happens in the pipeline engine for indicator_name, params in indicator_chain: if indicator_name not in self._indicators: return False, f"Indicator '{indicator_name}' not found in registry" return True, None def get_input_schema(self, name: str) -> Optional[InputSchema]: """ Get input schema for a specific indicator. Args: name: Indicator class name Returns: InputSchema or None if not found """ indicator_class = self.get(name) if indicator_class: return indicator_class.get_input_schema() return None def get_output_schema(self, name: str, **params) -> Optional[OutputSchema]: """ Get output schema for a specific indicator with given parameters. Args: name: Indicator class name **params: Indicator parameters Returns: OutputSchema or None if not found """ indicator_class = self.get(name) if indicator_class: return indicator_class.get_output_schema(**params) return None def create_instance(self, name: str, instance_name: str, **params) -> Optional[Indicator]: """ Create an indicator instance with validation. Args: name: Indicator class name instance_name: Unique instance name (for output column prefixing) **params: Indicator configuration parameters Returns: Indicator instance or None if class not found Raises: ValueError: If parameters are invalid """ indicator_class = self.get(name) if not indicator_class: return None return indicator_class(instance_name=instance_name, **params) def generate_ai_tool_spec(self) -> Dict: """ Generate a JSON specification for AI agent tools. Creates a structured representation of all indicators that can be used to build agent tools for indicator selection and composition. Returns: Dict suitable for AI agent tool registration """ tools = [] for indicator_class in self._indicators.values(): metadata = indicator_class.get_metadata() # Build parameter spec parameters = { "type": "object", "properties": {}, "required": [] } for param in metadata.parameters: param_spec = { "type": param.type, "description": param.description } if param.default is not None: param_spec["default"] = param.default if param.min_value is not None: param_spec["minimum"] = param.min_value if param.max_value is not None: param_spec["maximum"] = param.max_value parameters["properties"][param.name] = param_spec if param.required: parameters["required"].append(param.name) tool = { "name": f"indicator_{metadata.name.lower()}", "description": f"{metadata.display_name}: {metadata.description}", "category": metadata.category, "use_cases": metadata.use_cases, "tags": metadata.tags, "parameters": parameters, "input_schema": indicator_class.get_input_schema().model_dump(), "output_schema": indicator_class.get_output_schema().model_dump() } tools.append(tool) return { "indicator_tools": tools, "total_count": len(tools) }