Gateway Iceberg Schemas

Gateway persistence layer tables for conversation history and LangGraph checkpoints.

Tables

gateway.conversations

Stores all conversation messages from the agent harness across all channels (WebSocket, Telegram, etc.).

Schema:

| Column | Type | Description |
| --- | --- | --- |
| id | string | Unique message ID: {user_id}:{session_id}:{timestamp_ms} |
| user_id | string | User identifier (for GDPR compliance) |
| session_id | string | Conversation session identifier |
| role | string | Message role: 'user', 'assistant', 'system', 'tool' |
| content | string | Message content (text, JSON for tool calls/results) |
| metadata | string | JSON metadata (channel, model, tokens, attachments, etc.) |
| timestamp | long | Message timestamp in microseconds (UTC) |

Natural Key: (id) - unique per message, includes timestamp for ordering
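
The ID format above can be sketched as a small helper. This is a minimal illustration (class and method names are hypothetical, not from the codebase); note that the ID embeds the millisecond timestamp while the timestamp column itself stores microseconds:

```java
public class MessageIds {
    // Builds the natural key {user_id}:{session_id}:{timestamp_ms}
    static String messageId(String userId, String sessionId, long timestampMs) {
        return userId + ":" + sessionId + ":" + timestampMs;
    }

    public static void main(String[] args) {
        long ts = 1735689600000L; // 2025-01-01T00:00:00Z in milliseconds
        System.out.println(messageId("user123", "session456", ts));
        // prints user123:session456:1735689600000
    }
}
```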

Partitioning: (user_id, days(timestamp))

  • Partition by user_id for efficient GDPR deletion
  • Partition by days for time-range queries
  • Hidden partitioning - not exposed in queries
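
Iceberg's days() transform reduces each microsecond timestamp to an ordinal day since the Unix epoch; that mapping is what makes the partitioning "hidden" — queries filter on timestamp and Iceberg prunes day partitions behind the scenes. A minimal standalone sketch of the transform (not using the Iceberg library):

```java
public class DayPartition {
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // days() transform: whole days since 1970-01-01 (UTC), floor for negatives
    static long dayOrdinal(long timestampMicros) {
        return Math.floorDiv(timestampMicros, MICROS_PER_DAY);
    }

    public static void main(String[] args) {
        // 2025-01-01T00:00:00Z = 1_735_689_600_000_000 micros
        System.out.println(dayOrdinal(1_735_689_600_000_000L)); // prints 20089
    }
}
```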

Iceberg Version: Format v1 (1.10.1)

  • Append-only writes (no updates/deletes via Flink)
  • Copy-on-write mode for query performance
  • Schema evolution supported

Storage Format: Parquet with Snappy compression

Write Pattern:

  • Gateway → Kafka topic gateway_conversations → Flink → Iceberg
  • Flink job buffers and writes micro-batches
  • Near real-time persistence (5-10 second latency)
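
The Flink buffering step can be pictured as a size-or-age triggered micro-batch. A simplified stand-in for illustration only (not the actual Flink operator; thresholds and names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class MicroBatcher {
    private final List<String> buffer = new ArrayList<>();
    private final int maxSize;
    private final long maxAgeMs;
    private long firstEventAtMs = -1;

    MicroBatcher(int maxSize, long maxAgeMs) {
        this.maxSize = maxSize;
        this.maxAgeMs = maxAgeMs;
    }

    /** Returns a batch to flush, or null if neither threshold is reached yet. */
    List<String> add(String record, long nowMs) {
        if (buffer.isEmpty()) firstEventAtMs = nowMs;
        buffer.add(record);
        if (buffer.size() >= maxSize || nowMs - firstEventAtMs >= maxAgeMs) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        MicroBatcher b = new MicroBatcher(3, 5_000);
        b.add("m1", 0);
        b.add("m2", 1_000);
        System.out.println(b.add("m3", 2_000)); // size threshold: prints [m1, m2, m3]
    }
}
```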

Query Patterns:

-- Get recent conversation history for a session
SELECT role, content, timestamp
FROM gateway.conversations
WHERE user_id = 'user123'
  AND session_id = 'session456'
  AND timestamp > (UNIX_MICROS(CURRENT_TIMESTAMP()) - 86400000000)  -- Last 24h
ORDER BY timestamp DESC
LIMIT 50;

-- Search user's conversations across all sessions
SELECT session_id, role, content, timestamp
FROM gateway.conversations
WHERE user_id = 'user123'
  AND timestamp BETWEEN 1735689600000000 AND 1736294399000000
ORDER BY timestamp DESC;
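
The literal bounds in the query above are epoch microseconds (here, 2025-01-01T00:00:00Z through 2025-01-07T23:59:59Z). A small helper for deriving such values from UTC instants:

```java
import java.time.Instant;

public class Micros {
    // Convert a UTC instant to epoch microseconds
    static long toMicros(Instant instant) {
        return Math.multiplyExact(instant.getEpochSecond(), 1_000_000L)
                + instant.getNano() / 1_000;
    }

    public static void main(String[] args) {
        System.out.println(toMicros(Instant.parse("2025-01-01T00:00:00Z")));
        // prints 1735689600000000
        System.out.println(toMicros(Instant.parse("2025-01-07T23:59:59Z")));
        // prints 1736294399000000
    }
}
```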

-- Count messages by channel
SELECT
  JSON_EXTRACT_SCALAR(metadata, '$.channel') as channel,
  COUNT(*) as message_count
FROM gateway.conversations
WHERE user_id = 'user123'
GROUP BY JSON_EXTRACT_SCALAR(metadata, '$.channel');

GDPR Compliance:

-- Delete all messages for a user (run via a batch engine such as Spark or Trino
-- against the catalog, not through Flink, which is append-only here)
DELETE FROM gateway.conversations WHERE user_id = 'user123';

gateway.checkpoints

Stores LangGraph checkpoints for agent workflow state persistence.

Schema:

| Column | Type | Description |
| --- | --- | --- |
| id | string | Checkpoint ID: {user_id}:{session_id}:{checkpoint_ns}:{timestamp_ms} |
| user_id | string | User identifier (for GDPR compliance) |
| session_id | string | Conversation session identifier |
| checkpoint_ns | string | LangGraph checkpoint namespace (default: 'default') |
| thread_id | string | LangGraph thread identifier (often same as session_id) |
| parent_checkpoint_id | string | Parent checkpoint ID for replay/branching |
| checkpoint_data | string | Serialized checkpoint state (JSON) |
| metadata | string | JSON metadata (step_count, node_name, status, etc.) |
| timestamp | long | Checkpoint timestamp in microseconds (UTC) |

Natural Key: (id) - unique per checkpoint

Partitioning: (user_id, days(timestamp))

  • Partition by user_id for GDPR compliance
  • Partition by days for efficient pruning of old checkpoints
  • Hidden partitioning

Iceberg Version: Format v1 (1.10.1)

  • Append-only writes
  • Checkpoints are immutable once written
  • Copy-on-write mode

Storage Format: Parquet with Snappy compression

Write Pattern:

  • Gateway → Kafka topic gateway_checkpoints → Flink → Iceberg
  • Checkpoints written on each LangGraph step
  • Critical for workflow resumption after failures
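
The parent_checkpoint_id column links each checkpoint to its predecessor, forming the chain that replay and branching walk. A minimal sketch of reconstructing that lineage (pure Java, with parent pointers as a plain map; names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class CheckpointChain {
    /** Returns checkpoint IDs from the root checkpoint down to the given one. */
    static List<String> lineage(String checkpointId, Map<String, String> parentOf) {
        Deque<String> chain = new ArrayDeque<>();
        // Walk parent pointers until a checkpoint with no parent (the root)
        for (String id = checkpointId; id != null; id = parentOf.get(id)) {
            chain.addFirst(id);
        }
        return List.copyOf(chain);
    }

    public static void main(String[] args) {
        Map<String, String> parents = Map.of("c2", "c1", "c3", "c2");
        System.out.println(lineage("c3", parents)); // prints [c1, c2, c3]
    }
}
```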

Query Patterns:

-- Get latest checkpoint for a session
SELECT checkpoint_data, metadata, timestamp
FROM gateway.checkpoints
WHERE user_id = 'user123'
  AND session_id = 'session456'
  AND checkpoint_ns = 'default'
ORDER BY timestamp DESC
LIMIT 1;

-- Get checkpoint history for debugging
SELECT id, parent_checkpoint_id,
       JSON_EXTRACT_SCALAR(metadata, '$.node_name') as node,
       JSON_EXTRACT_SCALAR(metadata, '$.step_count') as step,
       timestamp
FROM gateway.checkpoints
WHERE user_id = 'user123'
  AND session_id = 'session456'
ORDER BY timestamp;

-- Find checkpoints for a specific workflow node
SELECT *
FROM gateway.checkpoints
WHERE user_id = 'user123'
  AND JSON_EXTRACT_SCALAR(metadata, '$.node_name') = 'human_approval'
  AND JSON_EXTRACT_SCALAR(metadata, '$.status') = 'pending'
ORDER BY timestamp DESC;

GDPR Compliance:

-- Delete all checkpoints for a user
DELETE FROM gateway.checkpoints WHERE user_id = 'user123';

Kafka Topics

gateway_conversations

  • Partitions: 6 (partitioned by user_id hash)
  • Replication Factor: 3
  • Retention: 7 days (Kafka), unlimited (Iceberg)
  • Schema: Protobuf (see protobuf/gateway_messages.proto)

gateway_checkpoints

  • Partitions: 6 (partitioned by user_id hash)
  • Replication Factor: 3
  • Retention: 7 days (Kafka), unlimited (Iceberg)
  • Schema: Protobuf (see protobuf/gateway_checkpoints.proto)

SchemaInitializer.java

Add to flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java:

// Requires imports: org.apache.iceberg.Schema, org.apache.iceberg.PartitionSpec,
// org.apache.iceberg.catalog.TableIdentifier, org.apache.iceberg.types.Types,
// plus static imports of Types.NestedField.required and Types.NestedField.optional.

// Initialize gateway.conversations table
TableIdentifier conversationsTable = TableIdentifier.of("gateway", "conversations");
Schema conversationsSchema = new Schema(
    required(1, "id", Types.StringType.get()),
    required(2, "user_id", Types.StringType.get()),
    required(3, "session_id", Types.StringType.get()),
    required(4, "role", Types.StringType.get()),
    required(5, "content", Types.StringType.get()),
    optional(6, "metadata", Types.StringType.get()),
    required(7, "timestamp", Types.LongType.get())
);

PartitionSpec conversationsPartitionSpec = PartitionSpec.builderFor(conversationsSchema)
    .identity("user_id")
    .day("timestamp", "timestamp_day")
    .build();

catalog.createTable(conversationsTable, conversationsSchema, conversationsPartitionSpec);

// Initialize gateway.checkpoints table
TableIdentifier checkpointsTable = TableIdentifier.of("gateway", "checkpoints");
Schema checkpointsSchema = new Schema(
    required(1, "id", Types.StringType.get()),
    required(2, "user_id", Types.StringType.get()),
    required(3, "session_id", Types.StringType.get()),
    required(4, "checkpoint_ns", Types.StringType.get()),
    required(5, "thread_id", Types.StringType.get()),
    optional(6, "parent_checkpoint_id", Types.StringType.get()),
    required(7, "checkpoint_data", Types.StringType.get()),
    optional(8, "metadata", Types.StringType.get()),
    required(9, "timestamp", Types.LongType.get())
);

PartitionSpec checkpointsPartitionSpec = PartitionSpec.builderFor(checkpointsSchema)
    .identity("user_id")
    .day("timestamp", "timestamp_day")
    .build();

catalog.createTable(checkpointsTable, checkpointsSchema, checkpointsPartitionSpec);

ConversationsSink.java - Read from gateway_conversations Kafka topic and write to Iceberg:

// Requires org.apache.iceberg.flink.{CatalogLoader, TableLoader, FlinkSchemaUtil}
// and org.apache.iceberg.flink.sink.FlinkSink. The REST catalog is loaded via
// CatalogLoader.custom() with the RESTCatalog implementation class;
// hadoopConf is the job's org.apache.hadoop.conf.Configuration.
Map<String, String> catalogProps = Map.of("uri", catalogUri);
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.custom("gateway", catalogProps, hadoopConf,
        "org.apache.iceberg.rest.RESTCatalog"),
    TableIdentifier.of("gateway", "conversations")
);

DataStream<Row> messageRows = // ... from Kafka gateway_conversations

// FlinkSink.forRow takes a Flink TableSchema, not the Iceberg Schema;
// FlinkSchemaUtil.toSchema converts between the two.
FlinkSink.forRow(messageRows, FlinkSchemaUtil.toSchema(conversationsSchema))
    .tableLoader(tableLoader)
    .append();  // append-only; append() attaches the sink operators

CheckpointsSink.java - Read from gateway_checkpoints Kafka topic and write to Iceberg:

// Same setup as ConversationsSink: catalogProps = Map.of("uri", catalogUri),
// hadoopConf is the job's Hadoop Configuration.
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.custom("gateway", catalogProps, hadoopConf,
        "org.apache.iceberg.rest.RESTCatalog"),
    TableIdentifier.of("gateway", "checkpoints")
);

DataStream<Row> checkpointRows = // ... from Kafka gateway_checkpoints

FlinkSink.forRow(checkpointRows, FlinkSchemaUtil.toSchema(checkpointsSchema))  // Flink TableSchema expected
    .tableLoader(tableLoader)
    .append();  // append-only; append() attaches the sink operators

Access Patterns

Gateway (Write)

  • Buffers messages/checkpoints in Redis (hot layer, 1-hour TTL)
  • Async sends to Kafka topics (fire-and-forget)
  • Flink streams Kafka → Iceberg (durable storage)

Gateway (Read)

  • Recent data: Read from Redis (fast)
  • Historical data: Query Iceberg via REST catalog (slower, cold storage)
  • Use iceberg-js for JavaScript/Node.js queries
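
The hot/cold split above amounts to a read-through cache. A simplified sketch, with a HashMap standing in for Redis and a callback standing in for the Iceberg query (all names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ReadThrough {
    private final Map<String, String> hotLayer = new HashMap<>(); // stand-in for Redis
    private final Function<String, String> coldLayer;             // stand-in for an Iceberg query

    ReadThrough(Function<String, String> coldLayer) {
        this.coldLayer = coldLayer;
    }

    String get(String sessionId) {
        // Recent data: serve from the hot layer when present
        String cached = hotLayer.get(sessionId);
        if (cached != null) return cached;
        // Historical data: fall back to cold storage, then repopulate the hot layer
        String loaded = coldLayer.apply(sessionId);
        hotLayer.put(sessionId, loaded); // a real Redis write would also set the 1-hour TTL
        return loaded;
    }

    public static void main(String[] args) {
        ReadThrough rt = new ReadThrough(id -> "history-for-" + id);
        System.out.println(rt.get("session456")); // cold read: prints history-for-session456
        System.out.println(rt.get("session456")); // same value, now served from the hot layer
    }
}
```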

Analytics/Debugging

  • Query Iceberg tables directly via REST API
  • Use Spark or Trino for complex analytics
  • GDPR deletions via batch SQL (Spark/Trino) against the catalog, not the streaming path

Catalog Configuration

Same as trading namespace (see main README):

catalog:
  type: rest
  uri: http://iceberg-catalog:8181
  warehouse: s3://gateway-warehouse/
  s3:
    endpoint: http://minio:9000
    access-key-id: ${S3_ACCESS_KEY}
    secret-access-key: ${S3_SECRET_KEY}

References