/**
 * DuckDB Client for querying Apache Iceberg tables
 *
 * Uses DuckDB's native Iceberg and Parquet support to query data
 * directly from S3/MinIO without needing catalog-only libraries.
 */

import duckdb from 'duckdb';
import type { FastifyBaseLogger } from 'fastify';
import { promisify } from 'util';

type Database = duckdb.Database;
type Connection = duckdb.Connection;
const { Database } = duckdb;

export interface DuckDBConfig {
  /** Base URL of the catalog; table lookups request `${catalogUri}/v1/namespaces/...`. */
  catalogUri: string;
  /** Namespace holding the `conversations` and `checkpoints` tables. */
  namespace: string;
  /** Catalog used for the OHLC / symbol tables. Falls back to `catalogUri`. */
  ohlcCatalogUri?: string;
  /** Namespace of the OHLC / symbol tables. Falls back to `'trading'`. */
  ohlcNamespace?: string;
  /** Full S3/MinIO endpoint URL including the scheme (it is parsed with `new URL`). */
  s3Endpoint?: string;
  s3AccessKey?: string;
  s3SecretKey?: string;
  conversationsBucket?: string; // S3 bucket for conversation cold storage
}

/** Optional filters accepted by the conversation message queries. */
type MessageQueryOptions = {
  startTime?: number;
  endTime?: number;
  limit?: number;
};

/** Characters that would change the meaning of an S3 key segment or a DuckDB glob. */
const UNSAFE_PATH_SEGMENT = /[\\/*?\[\]]/;

/** Columns selected from both the Iceberg conversations table and the Parquet files. */
const MESSAGE_COLUMNS = 'id, user_id, session_id, role, content, metadata, timestamp';

/** Render `value` as a single-quoted SQL string literal, doubling embedded quotes. */
function sqlString(value: string): string {
  return `'${value.replace(/'/g, "''")}'`;
}

/** True when `segment` can be embedded in an S3 path without altering its structure. */
function isSafePathSegment(segment: string): boolean {
  return segment.length > 0 && !UNSAFE_PATH_SEGMENT.test(segment);
}

/** Best-effort human readable message for a caught value of unknown type. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * DuckDB Client with Iceberg support
 *
 * Provides SQL-based queries against Iceberg tables stored in S3/MinIO.
 *
 * Every public query method initializes the client lazily. The read methods
 * log failures and return an empty result (`[]`, `null` or `false`) instead
 * of throwing once initialization has succeeded; `appendMessages` and
 * `initialize` propagate their errors.
 */
export class DuckDBClient {
  private db: Database | null = null;
  private conn: Connection | null = null;
  private readonly namespace: string;
  private readonly ohlcNamespace: string;
  private readonly catalogUri: string;
  private readonly ohlcCatalogUri: string;
  private readonly s3Config: {
    endpoint?: string;
    accessKey?: string;
    secretKey?: string;
  };
  private readonly conversationsBucket?: string;
  private readonly logger: FastifyBaseLogger;
  private initialized = false;
  /** In-flight initialization, shared so concurrent callers open only one database. */
  private initPromise: Promise<void> | null = null;
  /** Disambiguates temp table names created within the same millisecond. */
  private flushSequence = 0;

  constructor(config: DuckDBConfig, logger: FastifyBaseLogger) {
    this.logger = logger;
    this.namespace = config.namespace;
    this.catalogUri = config.catalogUri;
    // `||` (not `??`) on purpose: an empty string also selects the fallback.
    this.ohlcCatalogUri = config.ohlcCatalogUri || config.catalogUri;
    this.ohlcNamespace = config.ohlcNamespace || 'trading';
    this.conversationsBucket = config.conversationsBucket;
    this.s3Config = {
      endpoint: config.s3Endpoint,
      accessKey: config.s3AccessKey,
      secretKey: config.s3SecretKey,
    };
  }

  /**
   * Initialize DuckDB connection and configure S3/Iceberg extensions.
   *
   * Safe to call repeatedly and concurrently: once initialized it returns
   * immediately, and callers arriving during an initialization await the same
   * attempt instead of opening a second database.
   *
   * @throws Whatever the underlying DuckDB call (or `new URL`) threw; the
   *   failure is logged and the next call retries from scratch.
   */
  async initialize(): Promise<void> {
    if (this.initialized) {
      return;
    }

    if (!this.initPromise) {
      this.initPromise = this.openAndConfigure().finally(() => {
        this.initPromise = null;
      });
    }

    await this.initPromise;
  }

  /** Open the in-memory database, load the extensions and apply the S3 settings. */
  private async openAndConfigure(): Promise<void> {
    try {
      const db = new Database(':memory:');
      this.db = db;
      const conn = db.connect();
      this.conn = conn;

      // Deliberately not routed through `query()`: that helper logs the SQL
      // text on failure, and the statements below embed the S3 secret.
      const run = promisify(conn.all.bind(conn));

      // Install and load required extensions
      await run('INSTALL httpfs;');
      await run('LOAD httpfs;');
      await run('INSTALL iceberg;');
      await run('LOAD iceberg;');

      // Configure S3 credentials if provided
      const { endpoint, accessKey, secretKey } = this.s3Config;
      if (endpoint && accessKey && secretKey) {
        const s3Url = new URL(endpoint);
        const useSSL = s3Url.protocol === 'https:';
        // URL drops scheme-default ports, so an explicit ":80" on an http
        // endpoint also ends up as 9000 here.
        const port = s3Url.port || (useSSL ? 443 : 9000);

        await run(`SET s3_endpoint=${sqlString(`${s3Url.hostname}:${port}`)};`);
        await run(`SET s3_access_key_id=${sqlString(accessKey)};`);
        await run(`SET s3_secret_access_key=${sqlString(secretKey)};`);
        await run(`SET s3_use_ssl=${useSSL};`);
        await run(`SET s3_url_style='path';`);
        await run(`SET s3_region='us-east-1';`);
        await run(`SET s3_url_compatibility_mode=true;`);

        this.logger.info({
          endpoint,
          useSSL,
        }, 'Configured DuckDB S3 settings');
      }

      this.initialized = true;
      this.logger.info({
        catalogUri: this.catalogUri,
        namespace: this.namespace,
        ohlcCatalogUri: this.ohlcCatalogUri,
        ohlcNamespace: this.ohlcNamespace,
      }, 'DuckDB client initialized');
    } catch (error) {
      this.logger.error({ error }, 'Failed to initialize DuckDB');
      // Drop the half-configured handles so a retry starts clean. Not awaited:
      // the original failure is what the caller needs to see, promptly.
      void this.releaseHandles().catch(() => undefined);
      throw error;
    }
  }

  /**
   * Detach and close the connection and database handles.
   * The database is closed even when closing the connection fails.
   */
  private async releaseHandles(): Promise<void> {
    const conn = this.conn;
    const db = this.db;
    this.conn = null;
    this.db = null;

    try {
      if (conn) {
        const closeConn = promisify(conn.close.bind(conn));
        await closeConn();
      }
    } finally {
      if (db) {
        const closeDb = promisify(db.close.bind(db));
        await closeDb();
      }
    }
  }

  /**
   * Execute a SQL query and return all rows.
   *
   * @param sql SQL text; `?` placeholders are bound from `params` in order.
   * @param params Values for the placeholders, if any.
   * @throws Error when no connection is open; otherwise rethrows the driver
   *   error after logging it together with the SQL and parameters.
   */
  private async query<T = any>(sql: string, params?: readonly unknown[]): Promise<T[]> {
    const conn = this.conn;
    if (!conn) {
      throw new Error('DuckDB connection not initialized');
    }

    try {
      const all = promisify(conn.all.bind(conn)) as (
        sql: string,
        ...params: unknown[]
      ) => Promise<unknown>;
      const rows = params && params.length > 0
        ? await all(sql, ...params)
        : await all(sql);
      return rows as T[];
    } catch (error) {
      this.logger.error({ error, sql, params }, 'DuckDB query failed');
      throw error;
    }
  }

  /**
   * Get the Iceberg table path from REST catalog.
   *
   * @returns The `metadata-location` (or, failing that, `location`) string of
   *   the catalog response, or `null` when the table does not exist, the
   *   response carries no usable location, or the request fails.
   */
  private async getTablePath(
    namespace: string,
    tableName: string,
    catalogUri: string
  ): Promise<string | null> {
    try {
      const tableUrl = `${catalogUri}/v1/namespaces/${namespace}/tables/${tableName}`;

      this.logger.debug({ tableUrl }, 'Fetching Iceberg table metadata');

      const response = await fetch(tableUrl, {
        method: 'GET',
        headers: {
          'Content-Type': 'application/json',
        },
      });

      if (!response.ok) {
        if (response.status === 404) {
          this.logger.debug({ namespace, tableName }, 'Table not found in catalog');
          return null;
        }
        throw new Error(`Failed to fetch table metadata: ${response.status} ${response.statusText}`);
      }

      const metadata: unknown = await response.json();
      const fields = (typeof metadata === 'object' && metadata !== null
        ? metadata
        : {}) as Record<string, unknown>;

      // Extract metadata location (S3 path to metadata.json)
      const metadataLocation = fields['metadata-location'] || fields['location'];

      // Anything but a non-empty string cannot be handed to iceberg_scan.
      if (typeof metadataLocation !== 'string' || metadataLocation.length === 0) {
        this.logger.warn({ metadata }, 'No metadata-location found in table response');
        return null;
      }

      this.logger.debug({ metadataLocation }, 'Found Iceberg table location');
      return metadataLocation;
    } catch (error: unknown) {
      this.logger.error({ error: errorMessage(error), namespace, tableName }, 'Failed to get table path');
      return null;
    }
  }

  /**
   * Assemble the message SELECT shared by the Iceberg and Parquet paths.
   *
   * @param fromClause Table function call to read from (already escaped).
   * @param baseFilters Predicates that always apply, each using `?` placeholders.
   * @param baseParams Values for `baseFilters`, in order.
   * @param options Optional time bounds and limit. A value of `0` is treated
   *   the same as "not set".
   */
  private buildMessageQuery(
    fromClause: string,
    baseFilters: readonly string[],
    baseParams: readonly unknown[],
    options?: MessageQueryOptions
  ): { sql: string; params: unknown[] } {
    const filters = [...baseFilters];
    const params = [...baseParams];

    // Bounds are bound as strings, matching the other timestamp queries in
    // this class. NOTE(review): presumably to avoid the driver binding large
    // integers as doubles - confirm.
    if (options?.startTime) {
      filters.push('timestamp >= ?');
      params.push(options.startTime.toString());
    }
    if (options?.endTime) {
      filters.push('timestamp <= ?');
      params.push(options.endTime.toString());
    }

    let sql = `SELECT ${MESSAGE_COLUMNS} FROM ${fromClause}`;
    if (filters.length > 0) {
      sql += ` WHERE ${filters.join(' AND ')}`;
    }
    sql += ' ORDER BY timestamp ASC';

    if (options?.limit) {
      sql += ' LIMIT ?';
      params.push(options.limit);
    }

    return { sql, params };
  }

  /**
   * Read one session's messages from the Parquet files that `appendMessages`
   * writes to the conversations bucket.
   *
   * @returns The matching rows with numeric timestamps, or `[]` when the ids
   *   are unsafe to embed in a path or the read fails (e.g. nothing has been
   *   flushed for the session yet).
   */
  private async queryMessagesFromColdStorage(
    bucket: string,
    userId: string,
    sessionId: string,
    options?: MessageQueryOptions
  ): Promise<any[]> {
    // The ids become part of a glob pattern: a '*' or '/' in either would
    // widen the scan to other users' files.
    if (!isSafePathSegment(userId) || !isSafePathSegment(sessionId)) {
      this.logger.warn({ userId, sessionId }, 'Refusing Parquet cold storage scan for unsafe identifiers');
      return [];
    }

    this.logger.debug({ userId, sessionId }, 'REST catalog miss, scanning Parquet cold storage');

    const parquetPath = `s3://${bucket}/gateway/conversations/**/user_id=${userId}/${sessionId}.parquet`;
    const { sql, params } = this.buildMessageQuery(
      `read_parquet(${sqlString(parquetPath)})`,
      [],
      [],
      options
    );

    try {
      const rows = await this.query(sql, params);
      return rows.map((row) => ({ ...row, timestamp: Number(row.timestamp) }));
    } catch (error: unknown) {
      // File may not exist yet
      this.logger.debug(
        { error: errorMessage(error), userId, sessionId },
        'Parquet cold storage read failed'
      );
      return [];
    }
  }

  /**
   * Query messages from gateway.conversations table.
   *
   * Reads the Iceberg table when the catalog knows it; otherwise falls back to
   * the Parquet files in the conversations bucket, if one is configured. Both
   * paths honour the same `options` and return timestamps as numbers.
   *
   * @param userId Owner of the conversation.
   * @param sessionId Session whose messages are wanted.
   * @param options Optional inclusive time bounds and row limit; `0` counts as
   *   "not set" for each of them.
   * @returns Messages ordered by ascending timestamp; `[]` when nothing is
   *   found or the query fails (failures are logged, not thrown).
   */
  async queryMessages(
    userId: string,
    sessionId: string,
    options?: {
      startTime?: number;
      endTime?: number;
      limit?: number;
    }
  ): Promise<any[]> {
    await this.initialize();

    try {
      const tablePath = await this.getTablePath(
        this.namespace,
        'conversations',
        this.catalogUri
      );

      if (!tablePath) {
        // Fallback: scan Parquet files written directly to conversations bucket
        if (!this.conversationsBucket) {
          this.logger.warn('Conversations table not found and no cold storage configured');
          return [];
        }
        return await this.queryMessagesFromColdStorage(
          this.conversationsBucket,
          userId,
          sessionId,
          options
        );
      }

      const { sql, params } = this.buildMessageQuery(
        `iceberg_scan(${sqlString(tablePath)})`,
        ['user_id = ?', 'session_id = ?'],
        [userId, sessionId],
        options
      );

      this.logger.debug({ userId, sessionId, options }, 'Querying conversation messages');

      const rows = await this.query(sql, params);

      this.logger.info({
        userId,
        sessionId,
        count: rows.length
      }, 'Loaded conversation messages from Iceberg');

      // Convert timestamp strings back to numbers
      return rows.map((row) => ({ ...row, timestamp: Number(row.timestamp) }));
    } catch (error: unknown) {
      this.logger.error({
        error: errorMessage(error),
        userId,
        sessionId
      }, 'Failed to query conversation messages');
      return [];
    }
  }

  /**
   * Query checkpoint from gateway.checkpoints table.
   *
   * @param checkpointId When given, only that checkpoint is considered;
   *   otherwise the newest checkpoint of the session is returned.
   * @returns The newest matching row with a numeric timestamp, or `null` when
   *   the table or row is missing or the query fails.
   */
  async queryCheckpoint(
    userId: string,
    sessionId: string,
    checkpointId?: string
  ): Promise<any | null> {
    await this.initialize();

    try {
      const tablePath = await this.getTablePath(
        this.namespace,
        'checkpoints',
        this.catalogUri
      );

      if (!tablePath) {
        this.logger.warn('Checkpoints table not found');
        return null;
      }

      const filters = ['user_id = ?', 'session_id = ?'];
      const params: unknown[] = [userId, sessionId];
      if (checkpointId) {
        filters.push('checkpoint_id = ?');
        params.push(checkpointId);
      }

      const sql =
        'SELECT user_id, session_id, checkpoint_id, checkpoint_data, metadata, timestamp' +
        ` FROM iceberg_scan(${sqlString(tablePath)})` +
        ` WHERE ${filters.join(' AND ')}` +
        ' ORDER BY timestamp DESC LIMIT 1';

      this.logger.debug({ userId, sessionId, checkpointId }, 'Querying checkpoint');

      const rows = await this.query(sql, params);
      const row = rows[0];
      if (row === undefined) {
        return null;
      }

      this.logger.info({
        userId,
        sessionId,
        checkpointId: row.checkpoint_id
      }, 'Loaded checkpoint from Iceberg');

      // Convert timestamp string back to number
      return { ...row, timestamp: Number(row.timestamp) };
    } catch (error: unknown) {
      this.logger.error({
        error: errorMessage(error),
        userId,
        sessionId,
        checkpointId
      }, 'Failed to query checkpoint');
      return null;
    }
  }

  /**
   * Query symbol metadata from trading.symbol_metadata table.
   *
   * @returns Every row of the table, unmodified; `[]` when the table is
   *   missing or the query fails.
   */
  async queryAllSymbols(): Promise<any[]> {
    await this.initialize();

    try {
      const tablePath = await this.getTablePath(
        this.ohlcNamespace,
        'symbol_metadata',
        this.ohlcCatalogUri
      );

      if (!tablePath) {
        this.logger.warn('Symbol metadata table not found');
        return [];
      }

      // Query the Iceberg table using DuckDB
      const sql = `SELECT * FROM iceberg_scan(${sqlString(tablePath)})`;

      this.logger.debug({ sql }, 'Querying symbol metadata');

      const rows = await this.query(sql);

      this.logger.info({ count: rows.length }, 'Loaded symbol metadata from Iceberg');

      return rows;
    } catch (error: unknown) {
      this.logger.error({ error: errorMessage(error) }, 'Failed to query symbol metadata');
      return [];
    }
  }

  /**
   * Query OHLC data from trading.ohlc table.
   *
   * The range is half-open: `start_time` inclusive, `end_time` exclusive.
   *
   * NOTE(review): the parameters are labelled nanoseconds here, while
   * `findMissingOHLCRanges` computes with microseconds - confirm the unit of
   * the `timestamp` column.
   *
   * @returns Bars ordered by ascending timestamp with `timestamp` as a bigint;
   *   `[]` when the table is missing or the query fails.
   */
  async queryOHLC(
    ticker: string,
    period_seconds: number,
    start_time: bigint, // nanoseconds
    end_time: bigint // nanoseconds
  ): Promise<any[]> {
    await this.initialize();

    try {
      const tablePath = await this.getTablePath(
        this.ohlcNamespace,
        'ohlc',
        this.ohlcCatalogUri
      );

      if (!tablePath) {
        this.logger.warn('OHLC table not found');
        return [];
      }

      // Query the Iceberg table with filters
      const sql =
        'SELECT timestamp, ticker, period_seconds, open, high, low, close, volume' +
        ` FROM iceberg_scan(${sqlString(tablePath)})` +
        ' WHERE ticker = ? AND period_seconds = ? AND timestamp >= ? AND timestamp < ?' +
        ' ORDER BY timestamp ASC';

      const params = [
        ticker,
        period_seconds,
        start_time.toString(),
        end_time.toString()
      ];

      this.logger.debug({ ticker, period_seconds, start_time, end_time }, 'Querying OHLC data');

      const rows = await this.query(sql, params);

      this.logger.info({
        ticker,
        period_seconds,
        count: rows.length
      }, 'Loaded OHLC data from Iceberg');

      // Keep timestamp as bigint to preserve full precision.
      // Convert to seconds (divide first) only when producing TradingView bars.
      return rows.map((row) => ({ ...row, timestamp: BigInt(row.timestamp) }));
    } catch (error: unknown) {
      this.logger.error({
        error: errorMessage(error),
        ticker,
        period_seconds
      }, 'Failed to query OHLC data');
      return [];
    }
  }

  /**
   * Check if OHLC data exists for the given parameters.
   *
   * @returns `true` when at least one bar lies in `[start_time, end_time)`;
   *   `false` when there is none, the table is missing, or the query fails.
   */
  async hasOHLCData(
    ticker: string,
    period_seconds: number,
    start_time: bigint,
    end_time: bigint
  ): Promise<boolean> {
    await this.initialize();

    try {
      const tablePath = await this.getTablePath(
        this.ohlcNamespace,
        'ohlc',
        this.ohlcCatalogUri
      );

      if (!tablePath) {
        return false;
      }

      const sql =
        `SELECT COUNT(*) as count FROM iceberg_scan(${sqlString(tablePath)})` +
        ' WHERE ticker = ? AND period_seconds = ? AND timestamp >= ? AND timestamp < ?';

      const params = [
        ticker,
        period_seconds,
        start_time.toString(),
        end_time.toString()
      ];

      // The driver may hand the count back as a number or a bigint.
      const rows = await this.query<{ count: number | bigint }>(sql, params);
      const first = rows[0];
      return first !== undefined && Number(first.count) > 0;
    } catch (error: unknown) {
      this.logger.error({ error: errorMessage(error) }, 'Failed to check OHLC data existence');
      return false;
    }
  }

  /**
   * Find missing OHLC data ranges.
   *
   * Coarse heuristic, not real gap detection: the whole requested range is
   * reported as missing when no bars exist or when fewer than 95% of the
   * expected number of bars are present; otherwise nothing is reported.
   *
   * NOTE(review): the expected bar count assumes microsecond timestamps,
   * whereas `queryOHLC` labels its range parameters as nanoseconds - confirm.
   *
   * @returns `[[start_time, end_time]]` when data is missing, incomplete, or
   *   the check itself fails; `[]` when the data looks complete.
   */
  async findMissingOHLCRanges(
    ticker: string,
    period_seconds: number,
    start_time: bigint,
    end_time: bigint
  ): Promise<Array<[bigint, bigint]>> {
    await this.initialize();

    try {
      const data = await this.queryOHLC(ticker, period_seconds, start_time, end_time);

      if (data.length === 0) {
        // All data is missing
        return [[start_time, end_time]];
      }

      // TODO: Implement proper gap detection by checking for missing periods
      const periodMicros = BigInt(period_seconds) * 1000000n;
      // end_time is exclusive, so expected count = (end - start) / period (no +1)
      const expectedBars = Number((end_time - start_time) / periodMicros);

      if (data.length < expectedBars * 0.95) { // Allow 5% tolerance
        this.logger.debug({
          ticker,
          expected: expectedBars,
          actual: data.length,
        }, 'Incomplete OHLC data detected');
        return [[start_time, end_time]]; // Request full range
      }

      // Data appears complete
      return [];
    } catch (error: unknown) {
      this.logger.error({ error: errorMessage(error) }, 'Failed to find missing OHLC ranges');
      // Return full range on error (safe default)
      return [[start_time, end_time]];
    }
  }

  /**
   * Append a batch of conversation messages as a Parquet file in S3.
   * Called once per session at session end to avoid small-file fragmentation.
   *
   * Does nothing when no conversations bucket is configured or `messages` is
   * empty. The file is written under the current UTC year/month partition.
   *
   * @throws Error when `userId` or `sessionId` cannot be used as a path
   *   segment, and any DuckDB error raised while staging or copying the rows.
   */
  async appendMessages(
    userId: string,
    sessionId: string,
    messages: Array<{
      id: string;
      user_id: string;
      session_id: string;
      role: string;
      content: string;
      metadata: string;
      timestamp: number;
    }>
  ): Promise<void> {
    await this.initialize();

    if (!this.conversationsBucket || messages.length === 0) {
      return;
    }

    // The ids become S3 key segments; a '/' or glob character would move the
    // file outside this user's partition.
    if (!isSafePathSegment(userId) || !isSafePathSegment(sessionId)) {
      throw new Error('Cannot flush conversation: userId/sessionId contain characters unsafe for an S3 path');
    }

    const now = new Date();
    const year = now.getUTCFullYear();
    const month = String(now.getUTCMonth() + 1).padStart(2, '0');
    const s3Path = `s3://${this.conversationsBucket}/gateway/conversations/year=${year}/month=${month}/user_id=${userId}/${sessionId}.parquet`;

    // Timestamp plus a per-instance sequence number, so two flushes started in
    // the same millisecond still get distinct tables.
    this.flushSequence += 1;
    const tempTable = `msg_flush_${Date.now()}_${this.flushSequence}`;

    try {
      await this.query(`
        CREATE TEMP TABLE ${tempTable} (
          id VARCHAR,
          user_id VARCHAR,
          session_id VARCHAR,
          role VARCHAR,
          content VARCHAR,
          metadata VARCHAR,
          timestamp BIGINT
        )
      `);

      for (const msg of messages) {
        await this.query(
          `INSERT INTO ${tempTable} VALUES (?, ?, ?, ?, ?, ?, ?)`,
          [msg.id, msg.user_id, msg.session_id, msg.role, msg.content, msg.metadata, msg.timestamp]
        );
      }

      await this.query(`COPY ${tempTable} TO ${sqlString(s3Path)} (FORMAT PARQUET)`);

      this.logger.info({ userId, sessionId, count: messages.length, s3Path }, 'Conversation flushed to Parquet');
    } finally {
      // Best-effort cleanup; `query` has already logged any failure.
      await this.query(`DROP TABLE IF EXISTS ${tempTable}`).catch(() => {});
    }
  }

  /**
   * Close the DuckDB connection.
   *
   * The database handle is closed even if closing the connection fails, and
   * the client can be initialized again afterwards.
   */
  async close(): Promise<void> {
    this.initialized = false;
    await this.releaseHandles();
    this.logger.info('DuckDB client closed');
  }
}