import { readdir, readFile } from 'fs/promises';
import { join, relative } from 'path';
import { createHash } from 'crypto';
import type { FastifyBaseLogger } from 'fastify';
import { RAGRetriever } from './rag-retriever.js';
import { EmbeddingService } from './embedding-service.js';

/**
 * Document metadata stored with each chunk
 */
export interface DocumentMetadata {
  document_id: string;
  chunk_index: number;
  content_hash: string;
  last_updated: number;
  tags: string[];
  heading?: string;
  file_path: string;
}

/**
 * Document chunk with content and metadata
 */
export interface DocumentChunk {
  content: string;
  metadata: DocumentMetadata;
}

/**
 * Document loader configuration
 */
export interface DocumentLoaderConfig {
  knowledgeDir: string;
  maxChunkSize?: number; // maximum chunk size in characters (~4 chars per token)
  chunkOverlap?: number; // overlap between consecutive chunks, in characters
}

/**
 * Global knowledge document loader
 *
 * Loads markdown documents from a directory structure and stores them
 * as global knowledge (user_id="0") in Qdrant for RAG retrieval.
 *
 * Features:
 * - Intelligent chunking by markdown headers
 * - Content hashing for change detection
 * - Metadata extraction (tags, headings)
 * - Automatic embedding generation
 * - Incremental updates (only changed docs)
 *
 * Directory structure:
 *   gateway/knowledge/
 *     platform/
 *     trading/
 *     indicators/
 *     strategies/
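 *
 * @example
 * // Minimal usage sketch: assumes `embeddings` (EmbeddingService), `rag`
 * // (RAGRetriever) and a Fastify logger (`app.log`) are constructed elsewhere.
 * const loader = new DocumentLoader(
 *   { knowledgeDir: 'gateway/knowledge' },
 *   embeddings,
 *   rag,
 *   app.log
 * );
 * const stats = await loader.loadAll(); // { loaded, updated, skipped }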
 */
export class DocumentLoader {
  private config: DocumentLoaderConfig;
  private logger: FastifyBaseLogger;
  private embeddings: EmbeddingService;
  private rag: RAGRetriever;
  private loadedDocs: Map<string, string> = new Map(); // path -> hash

  constructor(
    config: DocumentLoaderConfig,
    embeddings: EmbeddingService,
    rag: RAGRetriever,
    logger: FastifyBaseLogger
  ) {
    this.config = {
      maxChunkSize: 4000, // ~1000 tokens
      chunkOverlap: 200,
      ...config,
    };
    this.embeddings = embeddings;
    this.rag = rag;
    this.logger = logger;
  }

  /**
   * Load all documents from knowledge directory
   */
  async loadAll(): Promise<{ loaded: number; updated: number; skipped: number }> {
    this.logger.info({ dir: this.config.knowledgeDir }, 'Loading knowledge documents');

    const stats = { loaded: 0, updated: 0, skipped: 0 };

    try {
      const files = await this.findMarkdownFiles(this.config.knowledgeDir);

      for (const filePath of files) {
        const result = await this.loadDocument(filePath);

        if (result === 'loaded') stats.loaded++;
        else if (result === 'updated') stats.updated++;
        else stats.skipped++;
      }

      this.logger.info(stats, 'Knowledge documents loaded');
      return stats;
    } catch (error) {
      this.logger.error({ error }, 'Failed to load knowledge documents');
      throw error;
    }
  }

  /**
   * Load a single document
   */
  async loadDocument(filePath: string): Promise<'loaded' | 'updated' | 'skipped'> {
    try {
      // Read file content
      const content = await readFile(filePath, 'utf-8');
      const contentHash = this.hashContent(content);

      // Check if document has changed
      const relativePath = relative(this.config.knowledgeDir, filePath);
      const existingHash = this.loadedDocs.get(relativePath);

      if (existingHash === contentHash) {
        this.logger.debug({ file: relativePath }, 'Document unchanged, skipping');
        return 'skipped';
      }

      const isUpdate = !!existingHash;

      // Parse and chunk document
      const chunks = this.chunkDocument(content, relativePath);

      this.logger.info(
        { file: relativePath, chunks: chunks.length, update: isUpdate },
        'Processing document'
      );

      // Generate embeddings and store chunks
      for (const chunk of chunks) {
        const embedding = await this.embeddings.embed(chunk.content);

        // Create unique ID for this chunk
        const chunkId = `global:${chunk.metadata.document_id}:${chunk.metadata.chunk_index}`;

        // Store in Qdrant as global knowledge
        await this.rag.storeGlobalKnowledge(
          chunkId,
          chunk.content,
          embedding,
          {
            ...chunk.metadata,
            type: 'knowledge_doc',
          }
        );
      }
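      // Note: nothing is deleted here, so if an updated document now yields fewer
      // chunks than its previous version, leftover higher-index chunks may remain
      // in the store.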

      // Update loaded docs tracking
      this.loadedDocs.set(relativePath, contentHash);

      return isUpdate ? 'updated' : 'loaded';
    } catch (error) {
      this.logger.error({ error, file: filePath }, 'Failed to load document');
      throw error;
    }
  }

  /**
   * Reload a specific document (for updates)
   */
  async reloadDocument(filePath: string): Promise<void> {
    this.logger.info({ file: filePath }, 'Reloading document');
    await this.loadDocument(filePath);
  }

  /**
   * Chunk document by markdown headers with smart splitting
   */
  private chunkDocument(content: string, documentId: string): DocumentChunk[] {
    const chunks: DocumentChunk[] = [];
    const tags = this.extractTags(content);
    const lastModified = Date.now();

    // Split by headers (##, ###, or ####)
    const sections = this.splitByHeaders(content);

    let chunkIndex = 0;

    for (const section of sections) {
      // If section is too large, split it further
      const subChunks = this.splitLargeSection(section.content);

      for (const subContent of subChunks) {
        if (subContent.trim().length === 0) continue;

        chunks.push({
          content: subContent,
          metadata: {
            document_id: documentId,
            chunk_index: chunkIndex++,
            content_hash: this.hashContent(content),
            last_updated: lastModified,
            tags,
            heading: section.heading,
            file_path: documentId,
          },
        });
      }
    }

    return chunks;
  }

  /**
   * Split document by markdown headers
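   *
   * Sections start at each `##`/`###`/`####` header; any text before the first
   * matching header becomes an initial section with no heading.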
   */
  private splitByHeaders(content: string): Array<{ heading?: string; content: string }> {
    const lines = content.split('\n');
    const sections: Array<{ heading?: string; content: string }> = [];
    let currentSection: string[] = [];
    let currentHeading: string | undefined;

    for (const line of lines) {
      // Check for markdown header (##, ###, ####)
      const headerMatch = line.match(/^(#{2,4})\s+(.+)$/);

      if (headerMatch) {
        // Save previous section
        if (currentSection.length > 0) {
          sections.push({
            heading: currentHeading,
            content: currentSection.join('\n'),
          });
        }

        // Start new section
        currentHeading = headerMatch[2].trim();
        currentSection = [line];
      } else {
        currentSection.push(line);
      }
    }

    // Add final section
    if (currentSection.length > 0) {
      sections.push({
        heading: currentHeading,
        content: currentSection.join('\n'),
      });
    }

    return sections;
  }

  /**
   * Split large sections into smaller chunks
   */
  private splitLargeSection(content: string): string[] {
    const maxSize = this.config.maxChunkSize!;
    const overlap = this.config.chunkOverlap!;

    if (content.length <= maxSize) {
      return [content];
    }

    const chunks: string[] = [];
    let start = 0;

    while (start < content.length) {
      const end = Math.min(start + maxSize, content.length);
      let chunkEnd = end;

      // Prefer to break at a paragraph boundary, then at a sentence boundary
      if (end < content.length) {
        const sentenceEnd = content.lastIndexOf('.', end);
        const paragraphEnd = content.lastIndexOf('\n\n', end);

        if (paragraphEnd > start + maxSize / 2) {
          chunkEnd = paragraphEnd;
        } else if (sentenceEnd > start + maxSize / 2) {
          chunkEnd = sentenceEnd + 1;
        }
      }
      chunks.push(content.substring(start, chunkEnd));

      // Stop once the final chunk has been emitted; stepping back by the overlap
      // here would otherwise re-process the tail of the content forever.
      if (chunkEnd >= content.length) break;
      start = chunkEnd - overlap;
    }

    return chunks;
  }

  /**
   * Extract tags from document (frontmatter or first heading)
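   *
   * For example, a document beginning with:
   *
   *   ---
   *   tags: [trading, indicators]
   *   ---
   *   # Moving Averages
   *
   * yields tags: ['trading', 'indicators', 'moving-averages'].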
   */
  private extractTags(content: string): string[] {
    const tags: string[] = [];

    // Try to extract from YAML frontmatter
    const frontmatterMatch = content.match(/^---\n([\s\S]*?)\n---/);
    if (frontmatterMatch) {
      const frontmatter = frontmatterMatch[1];
      const tagsMatch = frontmatter.match(/tags:\s*\[([^\]]+)\]/);
      if (tagsMatch) {
        tags.push(...tagsMatch[1].split(',').map((t) => t.trim()));
      }
    }

    // Extract from first heading
    const headingMatch = content.match(/^#\s+(.+)$/m);
    if (headingMatch) {
      tags.push(headingMatch[1].toLowerCase().replace(/\s+/g, '-'));
    }

    return tags;
  }

  /**
   * Hash content for change detection
   */
  private hashContent(content: string): string {
    return createHash('md5').update(content).digest('hex');
  }

  /**
   * Recursively find all markdown files
   */
  private async findMarkdownFiles(dir: string): Promise<string[]> {
    const files: string[] = [];

    try {
      const entries = await readdir(dir, { withFileTypes: true });

      for (const entry of entries) {
        const fullPath = join(dir, entry.name);

        if (entry.isDirectory()) {
          const subFiles = await this.findMarkdownFiles(fullPath);
          files.push(...subFiles);
        } else if (entry.isFile() && entry.name.endsWith('.md')) {
          files.push(fullPath);
        }
      }
    } catch (error) {
      this.logger.warn({ error, dir }, 'Failed to read directory');
    }

    return files;
  }

  /**
   * Get loaded document stats
   */
  getStats(): { totalDocs: number; totalSize: number } {
    return {
      totalDocs: this.loadedDocs.size,
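      // Note: loadedDocs maps paths to MD5 hex hashes, so this sums the stored
      // hash strings (32 chars per document), not the documents' content size.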
      totalSize: Array.from(this.loadedDocs.values()).reduce((sum, hash) => sum + hash.length, 0),
    };
  }
}