import type { BaseMessage } from '@langchain/core/messages'; import { HumanMessage, SystemMessage, ToolMessage } from '@langchain/core/messages'; import type { FastifyBaseLogger } from 'fastify'; import type { License } from '../types/user.js'; import { ChannelType } from '../types/user.js'; import type { ConversationStore } from './memory/conversation-store.js'; import type { BlobStore } from './memory/blob-store.js'; import type { InboundMessage, OutboundMessage } from '../types/messages.js'; import { MCPClientConnector } from './mcp-client.js'; import { LLMProviderFactory, type ProviderConfig } from '../llm/provider.js'; import { ModelRouter, RoutingStrategy } from '../llm/router.js'; import type { ModelMiddleware } from '../llm/middleware.js'; import type { WorkspaceManager } from '../workspace/workspace-manager.js'; import type { ChannelAdapter } from '../workspace/index.js'; import type { ResearchSubagent } from './subagents/research/index.js'; import type { IndicatorSubagent } from './subagents/indicator/index.js'; import type { WebExploreSubagent } from './subagents/web-explore/index.js'; import type { StrategySubagent } from './subagents/strategy/index.js'; import { BaseSubagent } from './subagents/base-subagent.js'; import type { DynamicStructuredTool } from '@langchain/core/tools'; import { getToolRegistry } from '../tools/tool-registry.js'; import type { MCPToolInfo } from '../tools/mcp/mcp-tool-wrapper.js'; import { createResearchAgentTool } from '../tools/platform/research-agent.tool.js'; import { createIndicatorAgentTool } from '../tools/platform/indicator-agent.tool.js'; import { createWebExploreAgentTool } from '../tools/platform/web-explore-agent.tool.js'; import { createStrategyAgentTool } from '../tools/platform/strategy-agent.tool.js'; import { createUserContext } from './memory/session-context.js'; import type { HarnessEvent } from './harness-events.js'; import { readFile } from 'fs/promises'; import { join, dirname } from 'path'; import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); /** * Session-specific config provided by channel handlers. * Contains only per-connection details — no infrastructure dependencies. */ export interface HarnessSessionConfig { userId: string; sessionId: string; license: License; mcpServerUrl: string; logger: FastifyBaseLogger; workspaceManager?: WorkspaceManager; channelAdapter?: ChannelAdapter; channelType?: ChannelType; channelUserId?: string; } /** * Factory function type for creating AgentHarness instances. * Created in main.ts with infrastructure (storage, providerConfig) captured in closure. * Channel handlers call this factory without knowing about Redis or Iceberg. */ export type HarnessFactory = (sessionConfig: HarnessSessionConfig) => AgentHarness; export interface AgentHarnessConfig extends HarnessSessionConfig { providerConfig: ProviderConfig; conversationStore?: ConversationStore; blobStore?: BlobStore; historyLimit: number; researchSubagent?: ResearchSubagent; indicatorSubagent?: IndicatorSubagent; webExploreSubagent?: WebExploreSubagent; strategySubagent?: StrategySubagent; } /** * Agent harness orchestrates between LLM and user's MCP server. * * This is a STATELESS orchestrator - all conversation history, RAG, and context * lives in the user's MCP server container. The harness only: * 1. Fetches context from user's MCP resources * 2. Routes to appropriate LLM model * 3. Calls LLM with embedded context * 4. Routes tool calls to platform tools or user's MCP tools * 5. Saves messages back to user's MCP */ export class AgentHarness { private static systemPromptTemplate: string | null = null; private static welcomePrompt: string | null = null; private config: AgentHarnessConfig; private modelFactory: LLMProviderFactory; private modelRouter: ModelRouter; private middleware: ModelMiddleware | undefined; private mcpClient: MCPClientConnector; private workspaceManager?: WorkspaceManager; private channelAdapter?: ChannelAdapter; private researchSubagent?: ResearchSubagent; private availableMCPTools: MCPToolInfo[] = []; private researchImageCapture: Array<{ data: string; mimeType: string }> = []; private conversationStore?: ConversationStore; private indicatorSubagent?: IndicatorSubagent; private webExploreSubagent?: WebExploreSubagent; private strategySubagent?: StrategySubagent; private blobStore?: BlobStore; private abortController: AbortController | null = null; constructor(config: AgentHarnessConfig) { this.config = config; this.workspaceManager = config.workspaceManager; this.channelAdapter = config.channelAdapter; this.researchSubagent = config.researchSubagent; this.indicatorSubagent = config.indicatorSubagent; this.webExploreSubagent = config.webExploreSubagent; this.strategySubagent = config.strategySubagent; this.modelFactory = new LLMProviderFactory(config.providerConfig, config.logger); this.modelRouter = new ModelRouter(this.modelFactory, config.logger); this.conversationStore = config.conversationStore; this.blobStore = config.blobStore; this.mcpClient = new MCPClientConnector({ userId: config.userId, mcpServerUrl: config.mcpServerUrl, logger: config.logger, }); } /** * Load system prompt template from file (cached) */ private static async loadSystemPromptTemplate(): Promise { if (!AgentHarness.systemPromptTemplate) { const templatePath = join(__dirname, 'prompts', 'system-prompt.md'); AgentHarness.systemPromptTemplate = await readFile(templatePath, 'utf-8'); } return AgentHarness.systemPromptTemplate; } /** * Load welcome prompt from file (cached) */ private static async loadWelcomePrompt(): Promise { if (!AgentHarness.welcomePrompt) { const promptPath = join(__dirname, 'prompts', 'welcome.md'); AgentHarness.welcomePrompt = (await readFile(promptPath, 'utf-8')).trim(); } return AgentHarness.welcomePrompt; } /** * Set the channel adapter (can be called after construction) */ setChannelAdapter(adapter: ChannelAdapter): void { this.channelAdapter = adapter; } interrupt(): void { this.abortController?.abort(); } /** * Initialize harness and connect to user's MCP server */ async initialize(): Promise { this.config.logger.info( { userId: this.config.userId, sessionId: this.config.sessionId }, 'Initializing agent harness' ); try { await this.mcpClient.connect(); // Discover available MCP tools from user's server await this.discoverMCPTools(); // Initialize web explore subagent first — research and indicator subagents inject it as a tool await this.initializeWebExploreSubagent(); // Initialize research subagent if not provided await this.initializeResearchSubagent(); // Initialize indicator subagent if not provided await this.initializeIndicatorSubagent(); this.config.logger.info('Agent harness initialized'); } catch (error) { this.config.logger.error({ error }, 'Failed to initialize agent harness'); throw error; } } /** * Discover available MCP tools from user's server */ private async discoverMCPTools(): Promise { try { this.config.logger.debug('Discovering MCP tools from user server'); // Call MCP client to list tools const tools = await this.mcpClient.listTools(); // Convert to MCPToolInfo format this.availableMCPTools = tools.map(tool => ({ name: tool.name, description: tool.description, inputSchema: tool.inputSchema as any, })); this.config.logger.info( { toolCount: this.availableMCPTools.length, toolNames: this.availableMCPTools.map(t => t.name), }, 'MCP tools discovered' ); } catch (error) { this.config.logger.warn( { error, errorMessage: (error as Error)?.message, errorName: (error as Error)?.name, errorCode: (error as any)?.code, }, 'Failed to discover MCP tools - continuing without remote tools' ); // Don't throw - MCP tools are optional, agent can still work with platform tools this.availableMCPTools = []; } } /** * Initialize research subagent */ private async initializeResearchSubagent(): Promise { if (this.researchSubagent) { this.config.logger.debug('Research subagent already provided'); return; } this.config.logger.debug('Creating research subagent for session'); try { const { createResearchSubagent } = await import('./subagents/research/index.js'); // Path resolution: use the compiled output path const researchSubagentPath = join(__dirname, 'subagents', 'research'); this.config.logger.debug({ researchSubagentPath }, 'Using research subagent path'); // Load the subagent config to get maxTokens — research scripts require more tokens // than the provider default (4096) because python_write arguments include full code bodies const researchSubagentConfig = await BaseSubagent.loadConfig(researchSubagentPath); // Create a model for the research subagent — always use the complex model // since research tasks involve data analysis, charting, and code generation const { model } = await this.modelRouter.route( 'analyze and backtest research data', // triggers complex routing this.config.license, RoutingStrategy.COMPLEXITY, this.config.userId, researchSubagentConfig.maxTokens // honour the subagent's maxTokens (e.g. 8192) ); // Get tools for research subagent from registry // Images from MCP responses are captured via onImage and routed to the subagent const toolRegistry = getToolRegistry(); const researchTools = await toolRegistry.getToolsForAgent( 'research', this.mcpClient, this.availableMCPTools, this.workspaceManager, (img) => this.researchImageCapture.push(img), (storeName, newState) => { this.workspaceManager?.setState(storeName, newState).catch((err) => this.config.logger.error({ err, storeName }, 'Failed to sync workspace after research mutation') ); } ); // Inject web_explore tool if the web-explore subagent is ready if (this.webExploreSubagent) { const webExploreContext = { userContext: createUserContext({ userId: this.config.userId, sessionId: this.config.sessionId, license: this.config.license, channelType: this.config.channelType ?? ChannelType.WEBSOCKET, channelUserId: this.config.channelUserId ?? this.config.userId, }), }; researchTools.push(createWebExploreAgentTool({ webExploreSubagent: this.webExploreSubagent, context: webExploreContext, logger: this.config.logger, })); } this.researchSubagent = await createResearchSubagent( model, this.config.logger, researchSubagentPath, this.mcpClient, researchTools, this.researchImageCapture ); this.config.logger.info( { toolCount: researchTools.length, toolNames: researchTools.map(t => t.name), }, 'Research subagent created successfully' ); } catch (error) { this.config.logger.error( { error, errorMessage: (error as Error).message, stack: (error as Error).stack }, 'Failed to create research subagent' ); // Don't throw - research subagent is optional } } /** * Initialize indicator subagent */ private async initializeIndicatorSubagent(): Promise { if (this.indicatorSubagent) { this.config.logger.debug('Indicator subagent already provided'); return; } this.config.logger.debug('Creating indicator subagent for session'); try { const { createIndicatorSubagent } = await import('./subagents/indicator/index.js'); const { model } = await this.modelRouter.route( 'indicator management', this.config.license, RoutingStrategy.COMPLEXITY, this.config.userId ); const toolRegistry = getToolRegistry(); const indicatorTools = await toolRegistry.getToolsForAgent( 'indicator', this.mcpClient, this.availableMCPTools, this.workspaceManager, undefined, // no image callback (storeName, newState) => { // After a workspace_patch succeeds in the container, update the gateway's // WorkspaceManager so it pushes a WebSocket patch to the web client. this.workspaceManager?.setState(storeName, newState).catch((err) => this.config.logger.error({ err, storeName }, 'Failed to sync workspace after indicator mutation') ); } ); // Inject web_explore tool if the web-explore subagent is ready if (this.webExploreSubagent) { const webExploreContext = { userContext: createUserContext({ userId: this.config.userId, sessionId: this.config.sessionId, license: this.config.license, channelType: this.config.channelType ?? ChannelType.WEBSOCKET, channelUserId: this.config.channelUserId ?? this.config.userId, }), }; indicatorTools.push(createWebExploreAgentTool({ webExploreSubagent: this.webExploreSubagent, context: webExploreContext, logger: this.config.logger, })); } const indicatorSubagentPath = join(__dirname, 'subagents', 'indicator'); this.config.logger.debug({ indicatorSubagentPath }, 'Using indicator subagent path'); this.indicatorSubagent = await createIndicatorSubagent( model, this.config.logger, indicatorSubagentPath, this.mcpClient, indicatorTools ); this.config.logger.info( { toolCount: indicatorTools.length, toolNames: indicatorTools.map(t => t.name), }, 'Indicator subagent created successfully' ); } catch (error) { this.config.logger.error( { error, errorMessage: (error as Error).message, stack: (error as Error).stack }, 'Failed to create indicator subagent' ); // Don't throw — indicator subagent is optional } } /** * Initialize web explore subagent */ private async initializeWebExploreSubagent(): Promise { if (this.webExploreSubagent) { this.config.logger.debug('Web explore subagent already provided'); return; } this.config.logger.debug('Creating web explore subagent for session'); try { const { createWebExploreSubagent } = await import('./subagents/web-explore/index.js'); const { model } = await this.modelRouter.route( 'web research and summarization', this.config.license, RoutingStrategy.COMPLEXITY, this.config.userId ); const toolRegistry = getToolRegistry(); const webExploreTools = await toolRegistry.getToolsForAgent( 'web-explore', undefined, // no MCP client needed undefined, undefined ); const webExploreSubagentPath = join(__dirname, 'subagents', 'web-explore'); this.config.logger.debug({ webExploreSubagentPath }, 'Using web explore subagent path'); this.webExploreSubagent = await createWebExploreSubagent( model, this.config.logger, webExploreSubagentPath, webExploreTools ); this.config.logger.info( { toolCount: webExploreTools.length, toolNames: webExploreTools.map(t => t.name), }, 'Web explore subagent created successfully' ); } catch (error) { this.config.logger.error( { error, errorMessage: (error as Error).message, stack: (error as Error).stack }, 'Failed to create web explore subagent' ); // Don't throw — web explore subagent is optional } } /** * Initialize strategy subagent */ private async initializeStrategySubagent(): Promise { if (this.strategySubagent) { this.config.logger.debug('Strategy subagent already provided'); return; } this.config.logger.debug('Creating strategy subagent for session'); try { const { createStrategySubagent } = await import('./subagents/strategy/index.js'); const { model } = await this.modelRouter.route( 'trading strategy writing and backtesting', this.config.license, RoutingStrategy.COMPLEXITY, this.config.userId ); const toolRegistry = getToolRegistry(); const strategyTools = await toolRegistry.getToolsForAgent( 'strategy', this.mcpClient, this.availableMCPTools, this.workspaceManager, undefined, (storeName, newState) => { this.workspaceManager?.setState(storeName, newState).catch((err) => this.config.logger.error({ err, storeName }, 'Failed to sync workspace after strategy mutation') ); } ); const strategySubagentPath = join(__dirname, 'subagents', 'strategy'); this.config.logger.debug({ strategySubagentPath }, 'Using strategy subagent path'); this.strategySubagent = await createStrategySubagent( model, this.config.logger, strategySubagentPath, this.mcpClient, strategyTools ); this.config.logger.info( { toolCount: strategyTools.length, toolNames: strategyTools.map(t => t.name), }, 'Strategy subagent created successfully' ); } catch (error) { this.config.logger.error( { error, errorMessage: (error as Error).message, stack: (error as Error).stack }, 'Failed to create strategy subagent' ); // Don't throw — strategy subagent is optional } } /** * Execute model with tool calling loop * Handles multi-turn tool calls until the model produces a final text response */ private async *executeWithToolCalling( model: any, messages: BaseMessage[], tools: DynamicStructuredTool[], maxIterations: number = 2, signal?: AbortSignal ): AsyncGenerator { this.config.logger.info( { toolCount: tools.length, maxIterations }, 'Starting tool calling loop' ); const messagesCopy = [...messages]; let iterations = 0; while (iterations < maxIterations) { if (signal?.aborted) break; iterations++; this.config.logger.info( { iteration: iterations, messageCount: messagesCopy.length, lastMessageType: messagesCopy[messagesCopy.length - 1]?.constructor.name, }, 'Tool calling loop iteration' ); this.config.logger.debug('Streaming model response...'); let response: any = null; try { const stream = await model.stream(messagesCopy, { signal }); for await (const chunk of stream) { if (typeof chunk.content === 'string' && chunk.content.length > 0) { this.config.logger.trace({ content: chunk.content }, 'raw chunk'); yield { type: 'chunk', content: chunk.content }; } else if (Array.isArray(chunk.content)) { for (const block of chunk.content) { if (block.type === 'text' && block.text) { this.config.logger.trace({ content: block.text }, 'raw chunk'); yield { type: 'chunk', content: block.text }; } } } response = response ? response.concat(chunk) : chunk; } } catch (invokeError: any) { this.config.logger.error( { error: invokeError, errorMessage: invokeError?.message, errorStack: invokeError?.stack, iteration: iterations, messageCount: messagesCopy.length, }, 'Model streaming failed in tool calling loop' ); throw invokeError; } this.config.logger.info( { hasContent: !!response.content, contentLength: typeof response.content === 'string' ? response.content.length : 0, hasToolCalls: !!response.tool_calls, toolCallCount: response.tool_calls?.length || 0, usageMetadata: (response as any).usage_metadata, finishReason: (response as any).response_metadata?.finish_reason, }, 'Model response received' ); // Check if model wants to call tools if (!response.tool_calls || response.tool_calls.length === 0) { // No tool calls - return final response let finalContent: string; if (typeof response.content === 'string') { finalContent = response.content; } else if (Array.isArray(response.content)) { finalContent = response.content .filter((block: any) => block.type === 'text') .map((block: any) => block.text || '') .join(''); } else { finalContent = JSON.stringify(response.content); } this.config.logger.info( { finalContentLength: finalContent.length, iterations }, 'Tool calling loop complete - no more tool calls' ); yield { type: 'done', content: finalContent }; return; } this.config.logger.info( { toolCalls: response.tool_calls.map((tc: any) => tc.name) }, 'Processing tool calls' ); // Add assistant message with tool calls to history messagesCopy.push(response); // Execute each tool call for (const toolCall of response.tool_calls) { this.config.logger.info( { tool: toolCall.name, args: toolCall.args }, 'Executing tool call' ); const tool = tools.find(t => t.name === toolCall.name); if (!tool) { this.config.logger.warn({ tool: toolCall.name }, 'Tool not found'); messagesCopy.push( new ToolMessage({ content: `Error: Tool '${toolCall.name}' not found`, tool_call_id: toolCall.id, }) ); continue; } try { yield { type: 'tool_call', toolName: toolCall.name, label: this.getToolLabel(toolCall.name) }; // Use streamFunc when available (subagent tools) to forward intermediate events inline let result: string; const streamFunc = (tool as any).streamFunc as ((args: any, signal?: AbortSignal) => AsyncGenerator) | undefined; if (streamFunc) { const gen = streamFunc(toolCall.args, signal); let next = await gen.next(); while (!next.done) { if (signal?.aborted) { gen.return?.(''); break; } yield next.value; next = await gen.next(); } result = next.done ? next.value : ''; } else { result = await tool.func(toolCall.args); } // Extract images from result and yield them; get text-only version for LLM const { cleanedResult: processedResult, images } = this.extractImagesFromToolResult(result, toolCall.name); for (const img of images) { yield { type: 'image', data: img.data, mimeType: img.mimeType, caption: img.caption }; } this.config.logger.debug( { tool: toolCall.name, originalResultLength: result.length, processedResultLength: processedResult.length, }, 'Tool result processed' ); messagesCopy.push( new ToolMessage({ content: processedResult, tool_call_id: toolCall.id, }) ); this.config.logger.info( { tool: toolCall.name, resultLength: processedResult.length }, 'Tool execution completed' ); } catch (error) { // Clean stop — abort signal fired during tool execution; exit without error message if (signal?.aborted || (error as Error)?.name === 'AbortError') { this.config.logger.info({ tool: toolCall.name }, 'Tool execution aborted by stop signal'); return; } this.config.logger.error( { error, errorMessage: (error as Error)?.message, errorStack: (error as Error)?.stack, tool: toolCall.name, args: toolCall.args, }, 'Tool execution failed' ); yield { type: 'error' as const, source: toolCall.name, fatal: false }; messagesCopy.push( new ToolMessage({ content: `Error: ${error}`, tool_call_id: toolCall.id, }) ); } } // After all tool calls complete, emit a space separator before the next LLM streaming pass yield { type: 'chunk', content: ' ' }; } // Max iterations reached - yield done with apology this.config.logger.warn('Max tool calling iterations reached'); yield { type: 'done', content: 'I apologize, but I encountered an issue processing your request. Please try rephrasing your question.' }; } /** * Call a tool on the user's MCP server directly (bypasses the agent/LLM). * Used by channel handlers for direct data requests (e.g. evaluate_indicator). */ async callMcpTool(name: string, args: Record): Promise { return this.mcpClient.callTool(name, args); } /** * Expose MCP client so channel handlers can wire ContainerSync after harness init. */ getMcpClient(): MCPClientConnector { return this.mcpClient; } /** * Set workspace manager after construction (used when ContainerSync requires MCP to be connected first). */ setWorkspaceManager(workspace: WorkspaceManager): void { this.workspaceManager = workspace; } /** * Stream events for an incoming user message. * Yields typed HarnessEvents (chunk, tool_call, image, done) and saves the * conversation to the store once the done event has been emitted. */ async *streamMessage(message: InboundMessage, options?: { saveUserMessage?: boolean }): AsyncGenerator { this.config.logger.info( { messageId: message.messageId, userId: message.userId, content: message.content.substring(0, 100) }, 'Processing user message' ); // 1. Build system prompt from template this.config.logger.debug('Building system prompt'); const systemPrompt = await this.buildSystemPrompt(); this.config.logger.debug({ systemPromptLength: systemPrompt.length }, 'System prompt built'); // 2. Load recent conversation history const channelKey = this.config.channelType ?? ChannelType.WEBSOCKET; let storedMessages = this.conversationStore ? await this.conversationStore.getFullHistory( this.config.userId, this.config.sessionId, this.config.historyLimit, channelKey ) : []; const history = this.conversationStore ? this.conversationStore.toLangChainMessages(storedMessages) : []; this.config.logger.debug({ historyLength: history.length }, 'Conversation history loaded'); // Inject current workspace state fresh on every turn — not persisted to conversation history const workspaceContext = this.workspaceManager ? `[Workspace State]\n\`\`\`json\n${this.workspaceManager.serializeState()}\n\`\`\`` : undefined; // 4. Get the configured model this.config.logger.debug('Routing to model'); const { model, middleware } = await this.modelRouter.route( message.content, this.config.license, RoutingStrategy.COMPLEXITY, this.config.userId ); this.middleware = middleware; this.config.logger.info({ modelName: model.constructor.name }, 'Model selected'); // 5. Build LangChain messages const langchainMessages = this.buildLangChainMessages(systemPrompt, history, workspaceContext, message.content); this.config.logger.debug({ messageCount: langchainMessages.length }, 'LangChain messages built'); // 6. Get tools for main agent from registry const toolRegistry = getToolRegistry(); const tools = await toolRegistry.getToolsForAgent( 'main', this.mcpClient, this.availableMCPTools, this.workspaceManager ); // Build shared subagent context const subagentContext = { userContext: createUserContext({ userId: this.config.userId, sessionId: this.config.sessionId, license: this.config.license, channelType: this.config.channelType ?? ChannelType.WEBSOCKET, channelUserId: this.config.channelUserId ?? this.config.userId, }), }; if (this.researchSubagent) { tools.push(createResearchAgentTool({ researchSubagent: this.researchSubagent, context: subagentContext, logger: this.config.logger, })); } if (this.indicatorSubagent) { tools.push(createIndicatorAgentTool({ indicatorSubagent: this.indicatorSubagent, context: subagentContext, logger: this.config.logger, })); } if (this.webExploreSubagent) { tools.push(createWebExploreAgentTool({ webExploreSubagent: this.webExploreSubagent, context: subagentContext, logger: this.config.logger, })); } if (!this.strategySubagent) { await this.initializeStrategySubagent(); } if (this.strategySubagent) { tools.push(createStrategyAgentTool({ strategySubagent: this.strategySubagent, context: subagentContext, logger: this.config.logger, })); } this.config.logger.info( { toolCount: tools.length, toolNames: tools.map(t => t.name) }, 'Tools loaded for main agent' ); // Apply middleware (e.g. Anthropic prompt caching) const processedMessages = this.middleware ? this.middleware.processMessages(langchainMessages, tools) : langchainMessages; // 7. Bind tools to model const modelWithTools = tools.length > 0 && model.bindTools ? model.bindTools(tools) : model; if (tools.length > 0) { this.config.logger.info( { modelType: modelWithTools.constructor.name, toolsBound: tools.length > 0 && !!model.bindTools }, 'Model bound with tools' ); } // 8. Stream tool calling loop and save conversation on completion this.config.logger.info('Invoking LLM with tool support'); this.abortController = new AbortController(); let finalContent = ''; const collectedImages: Array<{ data: string; mimeType: string; caption?: string }> = []; try { for await (const event of this.executeWithToolCalling(modelWithTools, processedMessages, tools, 10, this.abortController.signal)) { if (event.type === 'done') { finalContent = event.content; this.config.logger.info({ responseLength: finalContent.length }, 'LLM response received'); } else if (event.type === 'image') { collectedImages.push({ data: event.data, mimeType: event.mimeType, caption: event.caption }); } yield event; } } catch (error) { if ((error as Error)?.name === 'AbortError') { this.config.logger.info('Agent harness interrupted by stop signal'); } else { this.config.logger.error({ error }, 'Fatal error in agent harness'); yield { type: 'error' as const, source: 'agent harness', fatal: true }; } } finally { this.abortController = null; if (finalContent && this.conversationStore) { // Write blobs to S3 and capture their IDs for message metadata let blobRefs: Array<{ id: string; mimeType: string; caption?: string }> = []; if (collectedImages.length > 0 && this.blobStore) { const assistantMsgId = `${this.config.userId}:${this.config.sessionId}:${Date.now()}`; const blobIds = await this.blobStore.writeBlobs( this.config.userId, this.config.sessionId, assistantMsgId, collectedImages.map(img => ({ blobType: 'image' as const, mimeType: img.mimeType, data: img.data, caption: img.caption })) ); blobRefs = blobIds.map((id, i) => ({ id, mimeType: collectedImages[i].mimeType, caption: collectedImages[i].caption })); } if (options?.saveUserMessage !== false) { await this.conversationStore.saveMessage( this.config.userId, this.config.sessionId, 'user', message.content, undefined, channelKey ); } await this.conversationStore.saveMessage( this.config.userId, this.config.sessionId, 'assistant', finalContent, blobRefs.length > 0 ? { blobs: blobRefs } : undefined, channelKey ); } } } /** * Stream a greeting response for first-time users. * Sends "Who are you and what can you do?" through the normal message pipeline. */ async *streamGreeting(): AsyncGenerator { const content = await AgentHarness.loadWelcomePrompt(); const greetingMessage: InboundMessage = { messageId: `greeting_${Date.now()}`, userId: this.config.userId, sessionId: this.config.sessionId, content, timestamp: new Date(), }; yield* this.streamMessage(greetingMessage, { saveUserMessage: false }); } /** * Handle incoming message from user. * Consumes streamMessage and dispatches events to the channel adapter for * backward compatibility with Telegram and other non-streaming callers. */ async handleMessage(message: InboundMessage): Promise { let finalContent = ''; try { for await (const event of this.streamMessage(message)) { switch (event.type) { case 'chunk': this.channelAdapter?.sendChunk(event.content); break; case 'tool_call': this.channelAdapter?.sendToolCall?.(event.toolName, event.label); break; case 'image': this.channelAdapter?.sendImage({ data: event.data, mimeType: event.mimeType, caption: event.caption }); break; case 'error': this.channelAdapter?.sendText?.({ text: `An unrecoverable error occurred in the ${event.source}.` }); break; case 'done': finalContent = event.content; break; } } } catch (error) { this.config.logger.error({ error }, 'Error processing message'); throw error; } return { messageId: `msg_${Date.now()}`, sessionId: message.sessionId, content: finalContent, timestamp: new Date(), }; } /** * Convert to LangChain message format */ private buildLangChainMessages( systemPrompt: string, history: BaseMessage[], workspaceContext: string | undefined, currentUserMessage: string ): BaseMessage[] { return [ new SystemMessage(systemPrompt), ...history, ...(workspaceContext ? [new HumanMessage(workspaceContext)] : []), new HumanMessage(currentUserMessage), ]; } /** * Build system prompt from template */ private async buildSystemPrompt(): Promise { // Load template and populate with license info const template = await AgentHarness.loadSystemPromptTemplate(); const prompt = template .replace('{{licenseType}}', this.config.license.licenseType) .replace('{{features}}', JSON.stringify(this.config.license.features, null, 2)); return prompt; } /** * Map tool names to user-friendly status labels. */ private getToolLabel(toolName: string): string { const labels: Record = { research: 'Researching...', indicator: 'Adjusting indicators...', get_chart_data: 'Fetching chart data...', symbol_lookup: 'Searching symbol...', python_list: 'Seeing what we have...', python_edit: 'Coding...', python_write: 'Coding...', python_read: 'Inspecting...', execute_research: 'Running script...', backtest_strategy: 'Backtesting...', list_active_strategies: 'Checking active strategies...', web_explore: 'Searching the web...', strategy: 'Coding a strategy...', }; return labels[toolName] ?? `Running ${toolName} tool...`; } /** * Process tool result to extract images and send via channel adapter. * Returns text-only version for LLM context (no base64 image data). */ private extractImagesFromToolResult( result: string, toolName: string ): { cleanedResult: string; images: Array<{ data: string; mimeType: string; caption?: string }> } { const noImages = { cleanedResult: String(result || ''), images: [] }; // Most tools return plain strings - only process JSON results if (!result || typeof result !== 'string') { return noImages; } // Try to parse as JSON let parsedResult: any; try { parsedResult = JSON.parse(result); } catch { // Not JSON, return as-is return noImages; } // Check if result has images array (from ResearchSubagent) if (parsedResult && Array.isArray(parsedResult.images) && parsedResult.images.length > 0) { this.config.logger.info( { tool: toolName, imageCount: parsedResult.images.length }, 'Extracting images from tool result' ); const images: Array<{ data: string; mimeType: string; caption?: string }> = []; for (const image of parsedResult.images) { if (image.data && image.mimeType) { this.config.logger.debug({ mimeType: image.mimeType }, 'Extracted image from tool result'); images.push({ data: image.data, mimeType: image.mimeType, caption: undefined }); } } // Create text-only version for LLM const textOnlyResult = { ...parsedResult, images: undefined, imageCount: parsedResult.images.length, }; Object.keys(textOnlyResult).forEach(key => { if (textOnlyResult[key] === undefined) { delete textOnlyResult[key]; } }); return { cleanedResult: JSON.stringify(textOnlyResult), images }; } // Check for nested chart_images object if (parsedResult && parsedResult.chart_images && typeof parsedResult.chart_images === 'object') { this.config.logger.info( { tool: toolName, chartCount: Object.keys(parsedResult.chart_images).length }, 'Extracting chart images from tool result' ); const images: Array<{ data: string; mimeType: string; caption?: string }> = []; for (const [chartId, chartData] of Object.entries(parsedResult.chart_images)) { const chart = chartData as any; if (chart.type === 'image' && chart.data) { this.config.logger.debug({ chartId }, 'Extracted chart image from tool result'); images.push({ data: chart.data, mimeType: 'image/png', caption: undefined }); } } // Create text-only version for LLM const textOnlyResult = { ...parsedResult, chart_images: undefined, chartCount: Object.keys(parsedResult.chart_images).length, }; Object.keys(textOnlyResult).forEach(key => { if (textOnlyResult[key] === undefined) { delete textOnlyResult[key]; } }); return { cleanedResult: JSON.stringify(textOnlyResult), images }; } // No images found, return as-is return { cleanedResult: result, images: [] }; } /** * End the session: flush conversation to cold storage, then release resources. * Called by channel handlers on disconnect, session expiry, or graceful shutdown. */ async cleanup(): Promise { this.config.logger.info('Cleaning up agent harness'); if (this.conversationStore) { const channelKey = this.config.channelType ?? ChannelType.WEBSOCKET; try { await this.conversationStore.flushToIceberg( this.config.userId, this.config.sessionId, this.config.historyLimit, channelKey ); } catch (error) { this.config.logger.error({ error }, 'Failed to flush conversation to Iceberg during cleanup'); } } await this.mcpClient.disconnect(); } }