major agent refactoring: wiki knowledge base, no RAG, no Qdrant, no Ollama
This commit is contained in:
222
gateway/src/harness/spawn/spawn-service.ts
Normal file
222
gateway/src/harness/spawn/spawn-service.ts
Normal file
@@ -0,0 +1,222 @@
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
import { SystemMessage, HumanMessage } from '@langchain/core/messages';
|
||||
|
||||
/** All platform tool names available to every subagent. */
|
||||
// Handed to ToolRegistry.resolveTools() as the platform-tool list on every spawn.
const ALL_PLATFORM_TOOLS = [
  'SymbolLookup',
  'GetChartData',
  'WebSearch',
  'FetchPage',
  'ArxivSearch',
];
|
||||
import type { FastifyBaseLogger } from 'fastify';
|
||||
import { createReactAgent } from '@langchain/langgraph/prebuilt';
|
||||
import type { HarnessEvent, SubagentChunkEvent, SubagentThinkingEvent } from '../harness-events.js';
|
||||
import { getToolLabel } from '../tool-labels.js';
|
||||
import type { MCPClientConnector } from '../mcp-client.js';
|
||||
import type { WorkspaceManager } from '../../workspace/workspace-manager.js';
|
||||
import type { ToolRegistry } from '../../tools/tool-registry.js';
|
||||
import type { MCPToolInfo } from '../../tools/mcp/mcp-tool-wrapper.js';
|
||||
import { WikiLoader, type SpawnContext } from './wiki-loader.js';
|
||||
|
||||
/**
 * Arguments for a single SpawnService.streamSpawn() call.
 *
 * Only `agentName` and `instruction` are required; every other member is an
 * optional collaborator that is forwarded as-is when present.
 */
export interface SpawnInput {
  /** Name of the agent wiki page to load; also stamped on every emitted subagent event. */
  agentName: string;

  /** Task text for the subagent; becomes the HumanMessage (after any dynamic-import prefix). */
  instruction: string;

  /** MCP client placed in the SpawnContext for dynamic imports and passed to tool resolution. */
  mcpClient?: MCPClientConnector;

  /** MCP tool descriptors passed to tool resolution. */
  availableMCPTools?: MCPToolInfo[];

  /**
   * Workspace manager placed in the SpawnContext and passed to tool resolution.
   * When provided, workspace mutations reported by tools are written back via setState().
   */
  workspaceManager?: WorkspaceManager;

  /** Abort signal forwarded to the agent stream and checked between streamed items. */
  signal?: AbortSignal;
}
|
||||
|
||||
/**
 * SpawnService creates isolated subagent invocations on demand.
 *
 * Each call to streamSpawn():
 *   1. Loads the agent's wiki page (frontmatter + body)
 *   2. Builds a SystemMessage from base prompt + agent body + static imports
 *   3. Loads dynamic imports as a HumanMessage prefix (never cached)
 *   4. Resolves ALL platform tools and ALL MCP tools (per-agent frontmatter
 *      tool lists are no longer consulted)
 *   5. Creates a fresh createReactAgent and streams events
 *
 * This replaces the old per-subagent BaseSubagent pattern with a stateless
 * factory that reads configuration from markdown frontmatter.
 */
export class SpawnService {
  /**
   * @param wikiLoader   Loads agent pages, the base prompt and static/dynamic imports.
   * @param toolRegistry Resolves the tool instances handed to the agent.
   * @param modelFn      Async factory for the chat model; receives the agent's optional `maxTokens`.
   * @param logger       Logger used for lifecycle and error messages.
   */
  constructor(
    private readonly wikiLoader: WikiLoader,
    private readonly toolRegistry: ToolRegistry,
    private readonly modelFn: (maxTokens?: number) => Promise<BaseChatModel>,
    private readonly logger: FastifyBaseLogger,
  ) {}

  /**
   * Stream events from a subagent invocation.
   *
   * Yields HarnessEvents (subagent_chunk, subagent_thinking, subagent_tool_call).
   *
   * @param input Agent name, instruction and optional collaborators for this call.
   * @returns The last non-empty text produced by the agent node. When the agent's
   *   frontmatter sets `spawnsImages` AND at least one image was captured, a JSON
   *   string of the form `{ text, images }` is returned instead.
   *
   * If `signal` is already aborted when a streamed item arrives, the loop stops
   * and whatever text was collected so far is returned.
   */
  async *streamSpawn(input: SpawnInput): AsyncGenerator<HarnessEvent, string> {
    const { agentName, instruction, mcpClient, availableMCPTools, workspaceManager, signal } = input;

    this.logger.info({ agentName, instruction: instruction.substring(0, 100) }, 'SpawnService: starting');

    // Load agent wiki page
    const agentPage = await this.wikiLoader.loadAgentPage(agentName);
    const fm = agentPage.frontmatter;

    // Build SpawnContext for virtual pages
    const ctx: SpawnContext = { mcpClient, workspaceManager };

    // Load base prompt (index.md + tools.md) — stable, tier-1 cacheable
    const basePrompt = await this.wikiLoader.getBasePrompt();

    // Load static imports (appended to agent body, tier-2 cacheable together)
    const staticImports = fm.static_imports?.length
      ? await this.wikiLoader.loadStaticImports(fm.static_imports)
      : '';

    // Build the static SystemMessage (base + agent body + static imports);
    // empty sections are dropped so no dangling separators appear.
    const staticContent = [basePrompt, agentPage.body, staticImports]
      .filter(Boolean)
      .join('\n\n---\n\n');
    const systemMessage = new SystemMessage(staticContent);

    // Load dynamic imports (never cached, injected as a HumanMessage prefix)
    const dynamicContent = fm.dynamic_imports?.length
      ? await this.wikiLoader.loadDynamicImports(fm.dynamic_imports, ctx)
      : '';

    // Build HumanMessage: dynamic context (if any) + instruction
    const humanContent = dynamicContent
      ? `${dynamicContent}\n\n---\n\n${instruction}`
      : instruction;
    const humanMessage = new HumanMessage(humanContent);

    // Set up image capture array (per-call, not shared mutable state)
    const imageCapture: Array<{ data: string; mimeType: string }> = [];

    const onImage = fm.spawnsImages
      ? (img: { data: string; mimeType: string }) => imageCapture.push(img)
      : undefined;

    // Fire-and-forget sync of workspace state; failures are logged, never thrown
    // into the agent run.
    const onWorkspaceMutation = workspaceManager
      ? (storeName: string, newState: unknown) => {
          workspaceManager.setState(storeName, newState).catch((err: Error) => {
            this.logger.error({ err, storeName }, 'Failed to sync workspace after spawn mutation');
          });
        }
      : undefined;

    // All subagents get all platform tools and all MCP tools.
    // Per-agent tool restrictions via frontmatter are no longer used.
    const tools = await this.toolRegistry.resolveTools(
      ALL_PLATFORM_TOOLS,
      ['*'],
      mcpClient,
      availableMCPTools,
      workspaceManager,
      onImage,
      onWorkspaceMutation,
    );

    this.logger.info(
      { agentName, toolCount: tools.length, toolNames: tools.map(t => t.name) },
      'SpawnService: tools resolved'
    );

    // Create model (respecting per-agent maxTokens)
    const model = await this.modelFn(fm.maxTokens);

    // Create a fresh ReactAgent for this invocation
    const agent = createReactAgent({
      llm: model,
      tools,
      prompt: systemMessage,
    });

    const recursionLimit = fm.recursionLimit ?? 30;

    // Emit an initial indicator so the UI shows the subagent has started
    yield { type: 'subagent_tool_call', agentName, toolName: 'Thinking...', label: 'Thinking...' };

    const stream = await agent.stream(
      { messages: [humanMessage] },
      { streamMode: ['messages', 'updates'], recursionLimit, signal }
    );

    let finalText = '';

    for await (const [mode, data] of stream) {
      if (signal?.aborted) break;

      if (mode === 'messages') {
        // Token-level output: forwarded as chunk / thinking events.
        for (const chunk of SpawnService.extractStreamChunks(data, agentName)) {
          yield chunk;
        }
      } else if (mode === 'updates') {
        // Node-level output: tool-call announcements and the candidate final answer.
        const update = SpawnService.readAgentUpdate(data, agentName);
        for (const event of update.events) {
          yield event;
        }
        if (update.finalText) finalText = update.finalText;
      }
    }

    this.logger.info(
      { agentName, textLength: finalText.length, imageCount: imageCapture.length },
      'SpawnService: finished'
    );

    // If this agent captures images, return JSON with text + images
    if (fm.spawnsImages && imageCapture.length > 0) {
      return JSON.stringify({ text: finalText, images: imageCapture });
    }

    return finalText;
  }

  /**
   * Extract subagent_chunk / subagent_thinking events from a LangGraph `messages` stream datum.
   *
   * @param data      Either the message itself or an array whose first element is the message.
   * @param agentName Name stamped on every produced event.
   * @returns One chunk event for non-empty string content; for block-array content,
   *   one event per non-empty `thinking` / `text` block in order; otherwise `[]`.
   *
   * NOTE(review): nothing here filters by message type, so any message surfaced in
   * `messages` mode that carries text is emitted as a chunk — confirm that
   * tool-result messages are meant to be streamed to the client.
   */
  static extractStreamChunks(
    data: unknown,
    agentName: string,
  ): Array<SubagentChunkEvent | SubagentThinkingEvent> {
    const msg: unknown = Array.isArray(data) ? data[0] : data;
    const content: unknown = SpawnService.isRecord(msg) ? msg.content : undefined;

    if (typeof content === 'string') {
      return content ? [{ type: 'subagent_chunk', agentName, content }] : [];
    }
    if (!Array.isArray(content)) {
      return [];
    }

    const chunks: Array<SubagentChunkEvent | SubagentThinkingEvent> = [];
    for (const block of content as unknown[]) {
      if (!SpawnService.isRecord(block)) continue;
      if (block.type === 'thinking' && typeof block.thinking === 'string' && block.thinking) {
        chunks.push({ type: 'subagent_thinking', agentName, content: block.thinking });
      } else if (block.type === 'text' && typeof block.text === 'string' && block.text) {
        chunks.push({ type: 'subagent_chunk', agentName, content: block.text });
      }
    }
    return chunks;
  }

  /**
   * Extract the final text from an `updates`-mode agent message.
   *
   * @param msg Message-like value; anything without usable `content` yields `''`.
   * @returns String content verbatim, or the concatenation of all `text` blocks
   *   when content is a block array, or `''` otherwise.
   */
  static extractFinalText(msg: unknown): string {
    const content: unknown = SpawnService.isRecord(msg) ? msg.content : undefined;
    if (typeof content === 'string') return content;
    if (!Array.isArray(content)) return '';

    let text = '';
    for (const block of content as unknown[]) {
      if (SpawnService.isRecord(block) && block.type === 'text' && typeof block.text === 'string') {
        text += block.text;
      }
    }
    return text;
  }

  /**
   * Interpret one `updates`-mode datum: collect tool-call events and the latest
   * non-empty final text from the `agent` node's messages.
   *
   * Malformed input (missing `agent`, non-object messages, tool calls without a
   * string `name`) is skipped instead of throwing; a single message object is
   * treated like a one-element list.
   */
  private static readAgentUpdate(
    data: unknown,
    agentName: string,
  ): { events: HarnessEvent[]; finalText: string } {
    const events: HarnessEvent[] = [];
    let finalText = '';

    const agentUpdate: unknown = SpawnService.isRecord(data) ? data.agent : undefined;
    const rawMessages: unknown = SpawnService.isRecord(agentUpdate) ? agentUpdate.messages : undefined;
    if (!rawMessages) {
      return { events, finalText };
    }

    const messages: unknown[] = Array.isArray(rawMessages) ? rawMessages : [rawMessages];
    for (const msg of messages) {
      if (!SpawnService.isRecord(msg)) continue;

      const toolCalls: unknown[] = Array.isArray(msg.tool_calls) ? msg.tool_calls : [];
      if (toolCalls.length > 0) {
        // A message that requests tools is an intermediate step, never the final answer.
        for (const call of toolCalls) {
          if (!SpawnService.isRecord(call) || typeof call.name !== 'string') continue;
          events.push({
            type: 'subagent_tool_call',
            agentName,
            toolName: call.name,
            label: getToolLabel(call.name),
          });
        }
        continue;
      }

      const text = SpawnService.extractFinalText(msg);
      if (text) finalText = text;
    }

    return { events, finalText };
  }

  /** Type guard: true for any non-null object (including arrays and class instances). */
  private static isRecord(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null;
  }
}
|
||||
Reference in New Issue
Block a user