# Gateway Iceberg Schemas
Gateway persistence layer tables for conversation history and LangGraph checkpoints.
## Tables
### gateway.conversations
Stores all conversation messages from the agent harness across all channels (WebSocket, Telegram, etc.).
**Schema**:

| Column | Type | Description |
|--------|------|-------------|
| id | string | Unique message ID: `{user_id}:{session_id}:{timestamp_ms}` |
| user_id | string | User identifier (for GDPR compliance) |
| session_id | string | Conversation session identifier |
| role | string | Message role: 'user', 'assistant', 'system', 'tool' |
| content | string | Message content (text, JSON for tool calls/results) |
| metadata | string | JSON metadata (channel, model, tokens, attachments, etc.) |
| timestamp | long | Message timestamp in microseconds (UTC) |
**Natural Key**: `(id)` - unique per message, includes timestamp for ordering
**Partitioning**: `(user_id, days(timestamp))`
- Partition by user_id for efficient GDPR deletion
- Partition by days for time-range queries
- Hidden partitioning - queries filter on `user_id` and `timestamp` directly; partition columns never appear in SQL
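Because the day transform is hidden, readers never name a partition column; Iceberg maps each timestamp to whole days since the Unix epoch and prunes files accordingly. A minimal sketch of that mapping for the microsecond timestamps used here (illustrative only, not Iceberg internals):

```java
// Mirrors the effect of Iceberg's day() transform on a microsecond epoch
// timestamp: whole days since 1970-01-01 UTC. Class/method names are
// illustrative, not part of any Iceberg API.
public class DayPartitions {
    private static final long MICROS_PER_DAY = 86_400_000_000L;

    public static long dayOrdinal(long timestampMicros) {
        return Math.floorDiv(timestampMicros, MICROS_PER_DAY);
    }
}
```

Two rows written a day apart land in different hidden partitions even though queries only ever mention `timestamp`.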
**Iceberg Version**: Format v1 (1.10.1)
- Append-only writes (no updates/deletes via Flink)
- Copy-on-write mode for query performance
- Schema evolution supported
**Storage Format**: Parquet with Snappy compression
**Write Pattern**:
- Gateway → Kafka topic `gateway_conversations` → Flink → Iceberg
- Flink job buffers and writes micro-batches
- Near real-time persistence (5-10 second latency)
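The `id` natural key can be built deterministically at the gateway before the record enters Kafka. A small sketch, following the `{user_id}:{session_id}:{timestamp_ms}` format from the schema above (class and method names are illustrative, not from the gateway code):

```java
// Illustrative helpers for the conversations natural key; the real gateway
// code may differ. Id format: {user_id}:{session_id}:{timestamp_ms}.
public class ConversationIds {
    public static String messageId(String userId, String sessionId, long timestampMs) {
        return userId + ":" + sessionId + ":" + timestampMs;
    }

    // The Iceberg `timestamp` column stores microseconds; gateway code working
    // in milliseconds must scale before writing.
    public static long toMicros(long timestampMs) {
        return timestampMs * 1_000L;
    }
}
```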
**Query Patterns**:
```sql
-- Get recent conversation history for a session
SELECT role, content, timestamp
FROM gateway.conversations
WHERE user_id = 'user123'
AND session_id = 'session456'
AND timestamp > (UNIX_MICROS(CURRENT_TIMESTAMP()) - 86400000000) -- Last 24h
ORDER BY timestamp DESC
LIMIT 50;
-- Search user's conversations across all sessions
SELECT session_id, role, content, timestamp
FROM gateway.conversations
WHERE user_id = 'user123'
AND timestamp BETWEEN 1735689600000000 AND 1736294399000000 -- 2025-01-01 00:00:00 to 2025-01-07 23:59:59 UTC (microseconds)
ORDER BY timestamp DESC;
-- Count messages by channel
SELECT
  JSON_EXTRACT_SCALAR(metadata, '$.channel') AS channel,
  COUNT(*) AS message_count
FROM gateway.conversations
WHERE user_id = 'user123'
GROUP BY JSON_EXTRACT_SCALAR(metadata, '$.channel');
```
**GDPR Compliance**:
```sql
-- Delete all messages for a user (via catalog API, not Flink)
DELETE FROM gateway.conversations WHERE user_id = 'user123';
```
---
### gateway.checkpoints
Stores LangGraph checkpoints for agent workflow state persistence.
**Schema**:

| Column | Type | Description |
|--------|------|-------------|
| id | string | Checkpoint ID: `{user_id}:{session_id}:{checkpoint_ns}:{timestamp_ms}` |
| user_id | string | User identifier (for GDPR compliance) |
| session_id | string | Conversation session identifier |
| checkpoint_ns | string | LangGraph checkpoint namespace (default: 'default') |
| thread_id | string | LangGraph thread identifier (often same as session_id) |
| parent_checkpoint_id | string | Parent checkpoint ID for replay/branching |
| checkpoint_data | string | Serialized checkpoint state (JSON) |
| metadata | string | JSON metadata (step_count, node_name, status, etc.) |
| timestamp | long | Checkpoint timestamp in microseconds (UTC) |
**Natural Key**: `(id)` - unique per checkpoint
**Partitioning**: `(user_id, days(timestamp))`
- Partition by user_id for GDPR compliance
- Partition by days for efficient pruning of old checkpoints
- Hidden partitioning
**Iceberg Version**: Format v1 (1.10.1)
- Append-only writes
- Checkpoints are immutable once written
- Copy-on-write mode
**Storage Format**: Parquet with Snappy compression
**Write Pattern**:
- Gateway → Kafka topic `gateway_checkpoints` → Flink → Iceberg
- Checkpoints written on each LangGraph step
- Critical for workflow resumption after failures
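For resumption and debugging, the `parent_checkpoint_id` links form a chain from any checkpoint back to the first checkpoint of a run. A sketch of walking that lineage over rows already fetched from the table (names are illustrative, not gateway code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Walk parent_checkpoint_id links from a leaf checkpoint back to the root.
// parentOf maps checkpoint id -> parent checkpoint id (absent for the root).
public class CheckpointLineage {
    public static List<String> lineage(Map<String, String> parentOf, String leafId) {
        List<String> chain = new ArrayList<>();
        for (String id = leafId; id != null; id = parentOf.get(id)) {
            chain.add(id);
        }
        return chain; // leaf first, root last
    }
}
```

This is the same traversal LangGraph performs conceptually when replaying or branching from an earlier checkpoint.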
**Query Patterns**:
```sql
-- Get latest checkpoint for a session
SELECT checkpoint_data, metadata, timestamp
FROM gateway.checkpoints
WHERE user_id = 'user123'
AND session_id = 'session456'
AND checkpoint_ns = 'default'
ORDER BY timestamp DESC
LIMIT 1;
-- Get checkpoint history for debugging
SELECT id, parent_checkpoint_id,
  JSON_EXTRACT_SCALAR(metadata, '$.node_name') AS node,
  JSON_EXTRACT_SCALAR(metadata, '$.step_count') AS step,
  timestamp
FROM gateway.checkpoints
WHERE user_id = 'user123'
AND session_id = 'session456'
ORDER BY timestamp;
-- Find checkpoints for a specific workflow node
SELECT *
FROM gateway.checkpoints
WHERE user_id = 'user123'
AND JSON_EXTRACT_SCALAR(metadata, '$.node_name') = 'human_approval'
AND JSON_EXTRACT_SCALAR(metadata, '$.status') = 'pending'
ORDER BY timestamp DESC;
```
**GDPR Compliance**:
```sql
-- Delete all checkpoints for a user
DELETE FROM gateway.checkpoints WHERE user_id = 'user123';
```
---
## Kafka Topics
### gateway_conversations
- **Partitions**: 6 (partitioned by user_id hash)
- **Replication Factor**: 3
- **Retention**: 7 days (Kafka), unlimited (Iceberg)
- **Schema**: Protobuf (see `protobuf/gateway_messages.proto`)
### gateway_checkpoints
- **Partitions**: 6 (partitioned by user_id hash)
- **Replication Factor**: 3
- **Retention**: 7 days (Kafka), unlimited (Iceberg)
- **Schema**: Protobuf (see `protobuf/gateway_checkpoints.proto`)
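Keying records by `user_id` is what makes the "partitioned by user_id hash" layout work: Kafka's default partitioner hashes the serialized record key (murmur2), so all of one user's messages land on a single partition, preserving per-user order. The sketch below illustrates the stable key-to-partition idea with a plain hash; the real mapping uses murmur2, not `String.hashCode`:

```java
// Illustrative stable key -> partition mapping. Kafka's default partitioner
// applies murmur2 to the serialized key; String.hashCode is used here only to
// show that the same user_id always maps to the same partition.
public class UserPartitioning {
    public static int partitionFor(String userId, int numPartitions) {
        return Math.floorMod(userId.hashCode(), numPartitions);
    }
}
```

In the producer this simply means setting the record key to `user_id`; no custom partitioner is required.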
---
## Flink Integration
### SchemaInitializer.java
Add to `flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java`:
```java
// Static imports assumed at the top of SchemaInitializer.java:
// import static org.apache.iceberg.types.Types.NestedField.optional;
// import static org.apache.iceberg.types.Types.NestedField.required;

// Initialize gateway.conversations table
TableIdentifier conversationsTable = TableIdentifier.of("gateway", "conversations");
Schema conversationsSchema = new Schema(
    required(1, "id", Types.StringType.get()),
    required(2, "user_id", Types.StringType.get()),
    required(3, "session_id", Types.StringType.get()),
    required(4, "role", Types.StringType.get()),
    required(5, "content", Types.StringType.get()),
    optional(6, "metadata", Types.StringType.get()),
    // timestamptz, microsecond precision, UTC. The day() transform below
    // requires a timestamp-typed column; a plain long fails spec validation.
    required(7, "timestamp", Types.TimestampType.withZone())
);
PartitionSpec conversationsPartitionSpec = PartitionSpec.builderFor(conversationsSchema)
    .identity("user_id")
    .day("timestamp", "timestamp_day")
    .build();
if (!catalog.tableExists(conversationsTable)) { // keep initialization idempotent
    catalog.createTable(conversationsTable, conversationsSchema, conversationsPartitionSpec);
}

// Initialize gateway.checkpoints table
TableIdentifier checkpointsTable = TableIdentifier.of("gateway", "checkpoints");
Schema checkpointsSchema = new Schema(
    required(1, "id", Types.StringType.get()),
    required(2, "user_id", Types.StringType.get()),
    required(3, "session_id", Types.StringType.get()),
    required(4, "checkpoint_ns", Types.StringType.get()),
    required(5, "thread_id", Types.StringType.get()),
    optional(6, "parent_checkpoint_id", Types.StringType.get()),
    required(7, "checkpoint_data", Types.StringType.get()),
    optional(8, "metadata", Types.StringType.get()),
    required(9, "timestamp", Types.TimestampType.withZone())
);
PartitionSpec checkpointsPartitionSpec = PartitionSpec.builderFor(checkpointsSchema)
    .identity("user_id")
    .day("timestamp", "timestamp_day")
    .build();
if (!catalog.tableExists(checkpointsTable)) {
    catalog.createTable(checkpointsTable, checkpointsSchema, checkpointsPartitionSpec);
}
```
### Flink Jobs
**ConversationsSink.java** - Read from `gateway_conversations` Kafka topic and write to Iceberg:
```java
// CatalogLoader has no rest() factory; a REST catalog is loaded via custom()
// with the RESTCatalog implementation class.
CatalogLoader catalogLoader = CatalogLoader.custom(
    "gateway",
    Map.of("uri", catalogUri),
    new Configuration(),
    "org.apache.iceberg.rest.RESTCatalog");
TableLoader tableLoader = TableLoader.fromCatalog(
    catalogLoader,
    TableIdentifier.of("gateway", "conversations"));

DataStream<Row> messageRows = ...; // deserialized from the gateway_conversations Kafka topic

// forRow() takes the Flink TableSchema describing the Row stream (here
// conversationsTableSchema), not the Iceberg Schema; append() builds and
// attaches the append-only sink, so no trailing build() call is needed.
FlinkSink.forRow(messageRows, conversationsTableSchema)
    .tableLoader(tableLoader)
    .append();
```
**CheckpointsSink.java** - Read from `gateway_checkpoints` Kafka topic and write to Iceberg:
```java
CatalogLoader catalogLoader = CatalogLoader.custom(
    "gateway",
    Map.of("uri", catalogUri),
    new Configuration(),
    "org.apache.iceberg.rest.RESTCatalog");
TableLoader tableLoader = TableLoader.fromCatalog(
    catalogLoader,
    TableIdentifier.of("gateway", "checkpoints"));

DataStream<Row> checkpointRows = ...; // deserialized from the gateway_checkpoints Kafka topic

// As above: forRow() takes the Flink TableSchema for the Row stream, and
// append() is the terminal call that attaches the append-only sink.
FlinkSink.forRow(checkpointRows, checkpointsTableSchema)
    .tableLoader(tableLoader)
    .append();
```
---
## Access Patterns
### Gateway (Write)
- Buffers messages/checkpoints in Redis (hot layer, 1-hour TTL)
- Async sends to Kafka topics (fire-and-forget)
- Flink streams Kafka → Iceberg (durable storage)
### Gateway (Read)
- Recent data: Read from Redis (fast)
- Historical data: Query Iceberg via REST catalog (slower, cold storage)
- Use iceberg-js for JavaScript/Node.js queries
### Analytics/Debugging
- Query Iceberg tables directly via REST API
- Use Spark or Trino for complex analytics
- GDPR deletions via catalog API
---
## Catalog Configuration
Same as trading namespace (see main README):
```yaml
catalog:
  type: rest
  uri: http://iceberg-catalog:8181
  warehouse: s3://gateway-warehouse/
  s3:
    endpoint: http://minio:9000
    access-key-id: ${S3_ACCESS_KEY}
    secret-access-key: ${S3_SECRET_KEY}
```
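The same settings can be handed to Iceberg's `RESTCatalog#initialize` as a flat properties map; the `s3.*` keys below are the property names used by Iceberg's `S3FileIO`. The builder class here is a sketch for illustration, not part of the codebase:

```java
import java.util.HashMap;
import java.util.Map;

// Flat properties map mirroring the YAML above, suitable for passing to
// org.apache.iceberg.rest.RESTCatalog#initialize("gateway", props).
public class CatalogProps {
    public static Map<String, String> build(String accessKey, String secretKey) {
        Map<String, String> props = new HashMap<>();
        props.put("uri", "http://iceberg-catalog:8181");
        props.put("warehouse", "s3://gateway-warehouse/");
        props.put("s3.endpoint", "http://minio:9000"); // MinIO, not AWS S3
        props.put("s3.access-key-id", accessKey);
        props.put("s3.secret-access-key", secretKey);
        return props;
    }
}
```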
---
## References
- [LangGraph Checkpointer](https://langchain-ai.github.io/langgraph/concepts/persistence/)
- [Apache Iceberg Partitioning](https://iceberg.apache.org/docs/latest/partitioning/)
- [Flink Iceberg Connector](https://iceberg.apache.org/docs/latest/flink/)