sandbox connected and streaming
@@ -21,6 +21,7 @@ export interface DuckDBConfig {
  s3Endpoint?: string;
  s3AccessKey?: string;
  s3SecretKey?: string;
  conversationsBucket?: string; // S3 bucket for conversation cold storage
}

/**
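For context, a DuckDBConfig carrying the new field might be wired up as below; the environment variable names are illustrative assumptions, not part of this commit:

// Illustrative wiring only; env var names are assumptions, not part of this commit.
const config: DuckDBConfig = {
  catalogUri: process.env.ICEBERG_CATALOG_URI ?? '',
  s3Endpoint: process.env.S3_ENDPOINT,
  s3AccessKey: process.env.S3_ACCESS_KEY,
  s3SecretKey: process.env.S3_SECRET_KEY,
  conversationsBucket: process.env.CONVERSATIONS_BUCKET, // cold storage for chat history
};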
@@ -40,6 +41,7 @@ export class DuckDBClient {
    accessKey?: string;
    secretKey?: string;
  };
  private conversationsBucket?: string;
  private logger: FastifyBaseLogger;
  private initialized = false;
@@ -49,6 +51,7 @@ export class DuckDBClient {
    this.catalogUri = config.catalogUri;
    this.ohlcCatalogUri = config.ohlcCatalogUri || config.catalogUri;
    this.ohlcNamespace = config.ohlcNamespace || 'trading';
    this.conversationsBucket = config.conversationsBucket;
    this.s3Config = {
      endpoint: config.s3Endpoint,
      accessKey: config.s3AccessKey,
@@ -190,7 +193,23 @@ export class DuckDBClient {
    );

    if (!tablePath) {
      this.logger.warn('Conversations table not found');
      // Fallback: scan Parquet files written directly to conversations bucket
      if (this.conversationsBucket) {
        this.logger.debug({ userId, sessionId }, 'REST catalog miss, scanning Parquet cold storage');
        const parquetPath = `s3://${this.conversationsBucket}/gateway/conversations/**/user_id=${userId}/${sessionId}.parquet`;
        const fallbackSql = `
          SELECT id, user_id, session_id, role, content, metadata, timestamp
          FROM read_parquet('${parquetPath}')
          ORDER BY timestamp ASC
          ${options?.limit ? `LIMIT ${options.limit}` : ''}
        `;
        try {
          return await this.query(fallbackSql);
        } catch {
          // File may not exist yet
        }
      }
      this.logger.warn('Conversations table not found and no cold storage configured');
      return [];
    }
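The `**` glob above exists because the writer added later in this commit partitions files by year=/month= before user_id=, so the fallback can locate a session file without knowing when it was flushed. A quick sketch of how the glob lines up with the write path (bucket and IDs are made-up values):

// Hypothetical values, shown only to illustrate how the read glob matches the write path.
const bucket = 'conversations-cold';   // stands in for this.conversationsBucket
const userId = 'u_123';
const sessionId = 'sess_abc';
const globPath = `s3://${bucket}/gateway/conversations/**/user_id=${userId}/${sessionId}.parquet`;
// Matches a file written by appendMessages below, e.g.:
// s3://conversations-cold/gateway/conversations/year=2025/month=06/user_id=u_123/sess_abc.parquet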
@@ -526,6 +545,65 @@ export class DuckDBClient {
    }
  }

  /**
   * Append a batch of conversation messages as a Parquet file in S3.
   * Called once per session at session end to avoid small-file fragmentation.
   */
  async appendMessages(
    userId: string,
    sessionId: string,
    messages: Array<{
      id: string;
      user_id: string;
      session_id: string;
      role: string;
      content: string;
      metadata: string;
      timestamp: number;
    }>
  ): Promise<void> {
    await this.initialize();

    if (!this.conversationsBucket || messages.length === 0) {
      return;
    }

    const now = new Date();
    const year = now.getUTCFullYear();
    const month = String(now.getUTCMonth() + 1).padStart(2, '0');
    const s3Path = `s3://${this.conversationsBucket}/gateway/conversations/year=${year}/month=${month}/user_id=${userId}/${sessionId}.parquet`;

    // Use a timestamp-based name to avoid cross-session collisions
    const tempTable = `msg_flush_${Date.now()}`;

    try {
      await this.query(`
        CREATE TEMP TABLE ${tempTable} (
          id VARCHAR,
          user_id VARCHAR,
          session_id VARCHAR,
          role VARCHAR,
          content VARCHAR,
          metadata VARCHAR,
          timestamp BIGINT
        )
      `);

      for (const msg of messages) {
        await this.query(
          `INSERT INTO ${tempTable} VALUES (?, ?, ?, ?, ?, ?, ?)`,
          [msg.id, msg.user_id, msg.session_id, msg.role, msg.content, msg.metadata, msg.timestamp]
        );
      }

      await this.query(`COPY ${tempTable} TO '${s3Path}' (FORMAT PARQUET)`);

      this.logger.info({ userId, sessionId, count: messages.length, s3Path }, 'Conversation flushed to Parquet');
    } finally {
      await this.query(`DROP TABLE IF EXISTS ${tempTable}`).catch(() => {});
    }
  }

  /**
   * Close the DuckDB connection
   */
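A minimal sketch of the intended call pattern for the new method: buffer messages in memory during a session and flush once at session end. The hook and the Message type below are illustrative assumptions, not part of this diff:

// Illustrative only; Message mirrors the element type of appendMessages' messages parameter.
interface Message {
  id: string;
  user_id: string;
  session_id: string;
  role: string;
  content: string;
  metadata: string;
  timestamp: number;
}

// Hypothetical session-end hook: one flush per session, so each session lands in a single Parquet file.
async function onSessionClose(db: DuckDBClient, userId: string, sessionId: string, buffered: Message[]): Promise<void> {
  await db.appendMessages(userId, sessionId, buffered);
}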