# Iceberg Schema Definitions

We use Apache Iceberg for historical data storage; the catalog's metadata store is a PostgreSQL database. This directory contains the schema files and database setup.
## Tables

### trading.ohlc
Historical OHLC (Open, High, Low, Close, Volume) candle data for all periods in a single table.

- **Schema:** `ohlc_schema.sql`
- **Natural key:** `(ticker, period_seconds, timestamp)` (uniqueness enforced by the application)
- **Partitioning:** `(ticker, days(timestamp))`
  - Partition by ticker to isolate different markets
  - Partition by day for efficient time-range queries
  - Hidden partitioning: queries filter on `ticker` and `timestamp` directly; no partition columns appear in the schema
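For orientation, Iceberg's `days()` transform maps an epoch-microsecond timestamp to a day ordinal (days since 1970-01-01 UTC). A minimal stdlib sketch of that mapping (the `day_partition` helper is ours, for illustration only; Iceberg derives this value internally):

```python
# Sketch of Iceberg's days() partition transform on an epoch-microsecond
# timestamp. Shown only to illustrate which day partition a row lands in;
# hidden partitioning means queries never reference this value directly.
MICROS_PER_DAY = 86_400 * 1_000_000

def day_partition(timestamp_micros: int) -> int:
    """Days since 1970-01-01 UTC, as the days() transform computes it."""
    return timestamp_micros // MICROS_PER_DAY

# 2025-01-01 00:00:00 UTC -> day ordinal 20089
print(day_partition(1_735_689_600_000_000))  # 20089
```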
**Iceberg version:** Format v2 (Iceberg 1.10.1)

- Uses equality delete files for deduplication (equality deletes require format v2)
- Flink upsert mode generates equality deletes
- Last-write-wins semantics for duplicates
- Equality deletes are applied merge-on-read; periodic compaction keeps query performance high
**Deduplication:**

- Flink Iceberg sink in upsert mode
- Equality delete files on `(ticker, period_seconds, timestamp)`
- PyIceberg automatically filters deleted rows during queries
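The net effect of the equality deletes is last-write-wins on the natural key. A stand-alone sketch of that semantics in plain Python (illustrative only; in production this is done by the Flink sink and Iceberg delete files, not application code):

```python
# Last-write-wins deduplication on the natural key, mirroring what the
# equality deletes achieve: a later row for the same
# (ticker, period_seconds, timestamp) replaces the earlier one.
def dedupe_last_write_wins(rows: list[dict]) -> list[dict]:
    latest = {}
    for row in rows:  # rows are assumed to be in write order
        key = (row["ticker"], row["period_seconds"], row["timestamp"])
        latest[key] = row  # a later write overwrites the earlier one
    return list(latest.values())

rows = [
    {"ticker": "BINANCE:BTC/USDT", "period_seconds": 60, "timestamp": 1, "close": 100.0},
    {"ticker": "BINANCE:BTC/USDT", "period_seconds": 60, "timestamp": 1, "close": 101.0},
]
print(dedupe_last_write_wins(rows))  # one row survives, with close == 101.0
```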
**Storage format:** Parquet with Snappy compression

**Supported periods:** any period in seconds (60, 300, 900, 3600, 14400, 86400, 604800, etc.)
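The listed `period_seconds` values correspond to common chart intervals. A small lookup sketch (the label set is ours, for illustration; the table itself accepts any period in seconds):

```python
# Conventional chart labels for the example period_seconds values above.
# Illustrative only: the schema does not restrict periods to this set.
PERIODS = {
    "1m": 60, "5m": 300, "15m": 900, "1h": 3600,
    "4h": 14_400, "1d": 86_400, "1w": 604_800,
}

def period_seconds(label: str) -> int:
    """Resolve a chart label to the period_seconds value used in queries."""
    return PERIODS[label]

print(period_seconds("1h"))  # 3600
```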
**Usage:**

```sql
-- Query 1-hour candles for a specific ticker and time range
SELECT * FROM trading.ohlc
WHERE ticker = 'BINANCE:BTC/USDT'
  AND period_seconds = 3600
  AND timestamp BETWEEN 1735689600000000 AND 1736294399000000
ORDER BY timestamp;

-- Query the most recent 1-minute candles (timestamps are epoch microseconds)
SELECT * FROM trading.ohlc
WHERE ticker = 'BINANCE:BTC/USDT'
  AND period_seconds = 60
  AND timestamp > (UNIX_MICROS(CURRENT_TIMESTAMP()) - 3600000000)
ORDER BY timestamp DESC
LIMIT 60;

-- Count candles per period for a ticker
SELECT period_seconds, COUNT(*) AS candle_count
FROM trading.ohlc
WHERE ticker = 'BINANCE:BTC/USDT'
GROUP BY period_seconds;
```
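The literal timestamps in these queries are epoch microseconds (UTC). A small stdlib helper (ours, for illustration) for producing them:

```python
from datetime import datetime, timezone

def to_micros(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch microseconds, as stored in trading.ohlc."""
    return int(dt.timestamp() * 1_000_000)

# The BETWEEN bounds used in the first query above:
start = to_micros(datetime(2025, 1, 1, tzinfo=timezone.utc))
end = to_micros(datetime(2025, 1, 7, 23, 59, 59, tzinfo=timezone.utc))
print(start, end)  # 1735689600000000 1736294399000000
```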
## Access Patterns

### Flink (write)

- Reads `OHLCBatch` messages from Kafka
- Writes rows to the Iceberg table via the Iceberg Flink connector
- Upsert mode to handle duplicates

### Client-Py (read)

- Queries historical data after receiving `HistoryReadyNotification`
- Uses PyIceberg or the Iceberg REST API
- Read-only access via the Iceberg catalog

### Web UI (read)

- Queries for chart display
- Time-series queries with partition pruning
- Read-only access
## Catalog Configuration

The Iceberg catalog is accessed via its REST API:

```yaml
catalog:
  type: rest
  uri: http://iceberg-catalog:8181
  warehouse: s3://trading-warehouse/
  s3:
    endpoint: http://minio:9000
    access-key-id: ${S3_ACCESS_KEY}
    secret-access-key: ${S3_SECRET_KEY}
```
## Table Naming Convention

Tables are named `{namespace}.ohlc`, where `namespace` is the trading namespace (default: `trading`).

- All OHLC data is stored in a single table
- Partitioned by ticker and day for efficient queries
## Integration Examples

### Flink Write

```java
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.rest("trading", catalogUri),
    TableIdentifier.of("trading", "ohlc"));

DataStream<Row> ohlcRows = ...; // converted from OHLCBatch

FlinkSink.forRow(ohlcRows, schema)
    .tableLoader(tableLoader)
    .upsert(true)
    // Upsert mode requires the equality key the delete files are written against
    .equalityFieldColumns(Arrays.asList("ticker", "period_seconds", "timestamp"))
    .append();
```
### Python Read

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

catalog = load_catalog("trading", uri="http://iceberg-catalog:8181")
table = catalog.load_table("trading.ohlc")

# Scan with predicate pushdown; partition pruning applies on ticker and timestamp
df = table.scan(
    row_filter=And(
        EqualTo("ticker", "BINANCE:BTC/USDT"),
        EqualTo("period_seconds", 3600),
        GreaterThanOrEqual("timestamp", 1735689600000000),
    )
).to_pandas()
```