chart data loading

This commit is contained in:
2026-03-24 21:37:49 -04:00
parent f6bd22a8ef
commit c76887ab92
65 changed files with 6350 additions and 713 deletions

View File

@@ -9,7 +9,10 @@ import com.dexorder.flink.publisher.HistoryNotificationForwarder;
import com.dexorder.flink.publisher.HistoryNotificationFunction;
import com.dexorder.flink.publisher.OHLCBatchWrapper;
import com.dexorder.flink.publisher.OHLCBatchDeserializer;
import com.dexorder.flink.publisher.MarketWrapper;
import com.dexorder.flink.publisher.MarketDeserializer;
import com.dexorder.flink.sink.HistoricalBatchWriter;
import com.dexorder.flink.sink.SymbolMetadataWriter;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -189,6 +192,42 @@ public class TradingFlinkApp {
LOG.info("Historical pipeline configured: HistoricalBatchWriter -> HistoryNotificationFunction");
// Symbol metadata pipeline: Kafka -> Iceberg -> Broadcast
// Set up Kafka source for symbol metadata
KafkaSource<MarketWrapper> symbolSource = KafkaSource.<MarketWrapper>builder()
.setBootstrapServers(config.getKafkaBootstrapServers())
.setTopics("symbol-metadata")
.setGroupId("flink-symbol-metadata-consumer")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new MarketDeserializer())
.build();
DataStream<MarketWrapper> symbolStream = env
.fromSource(symbolSource, WatermarkStrategy.noWatermarks(), "Symbol Metadata Kafka Source");
LOG.info("Symbol metadata Kafka source configured");
// Create table loader for symbol_metadata table
TableLoader symbolTableLoader = TableLoader.fromCatalog(
catalogLoader2,
TableIdentifier.of(config.getIcebergNamespace(), "symbol_metadata")
);
LOG.info("Symbol metadata table loader configured: {}.symbol_metadata", config.getIcebergNamespace());
// Symbol metadata pipeline: write to Iceberg and send notification
// Uses PUSH socket to job manager's PULL endpoint (same pattern as HistoryNotificationPublisher)
// Task managers connect to flink-jobmanager service (not bind address which is *)
String notificationEndpoint = "tcp://flink-jobmanager:" + config.getNotificationPullPort();
DataStream<MarketWrapper> processedSymbolStream = symbolStream
.flatMap(new SymbolMetadataWriter(symbolTableLoader, notificationEndpoint))
.setParallelism(1);
// Discard sink
processedSymbolStream.addSink(new DiscardingSink<>()).setParallelism(1);
LOG.info("Symbol metadata pipeline configured: SymbolMetadataWriter -> Iceberg -> METADATA_UPDATE notification");
// TODO: Set up CEP patterns and triggers
// TODO: Set up realtime tick processing

View File

@@ -45,6 +45,7 @@ public class SchemaInitializer {
// Initialize each table
initializeOhlcTable();
initializeSymbolMetadataTable();
// Add more table initializations here as needed
@@ -86,7 +87,25 @@ public class SchemaInitializer {
TableIdentifier tableId = TableIdentifier.of(namespace, "ohlc");
try {
if (catalog.tableExists(tableId)) {
boolean tableExists = false;
try {
tableExists = catalog.tableExists(tableId);
} catch (org.apache.iceberg.exceptions.ServiceFailureException e) {
// Handle corrupted table metadata (e.g., missing S3 files)
if (e.getMessage().contains("Location does not exist")) {
LOG.warn("Table {} has corrupted metadata, dropping and recreating", tableId);
try {
catalog.dropTable(tableId, false);
} catch (Exception dropEx) {
LOG.warn("Failed to drop corrupted table (may not exist in catalog)", dropEx);
}
tableExists = false;
} else {
throw e;
}
}
if (tableExists) {
Table existing = catalog.loadTable(tableId);
String existingVersion = existing.properties().get(SCHEMA_VERSION_PROP);
if (!OHLC_SCHEMA_VERSION.equals(existingVersion)) {
@@ -152,4 +171,105 @@ public class SchemaInitializer {
throw new RuntimeException("OHLC table initialization failed", e);
}
}
// Current schema version of the symbol_metadata table. Bump this when the
// schema definition below changes in a way that requires migration.
private static final String SYMBOL_METADATA_SCHEMA_VERSION = "1";

/**
 * Initialize the symbol_metadata table if it doesn't exist.
 *
 * <p>Handles corrupted catalog metadata (e.g. missing S3 files) by dropping and
 * recreating the table, and warns when an existing table's schema version does
 * not match {@code SYMBOL_METADATA_SCHEMA_VERSION} (manual migration required).
 *
 * @throws RuntimeException if table inspection or creation fails
 */
private void initializeSymbolMetadataTable() {
    TableIdentifier tableId = TableIdentifier.of(namespace, "symbol_metadata");
    try {
        boolean tableExists = false;
        try {
            tableExists = catalog.tableExists(tableId);
        } catch (org.apache.iceberg.exceptions.ServiceFailureException e) {
            // Handle corrupted table metadata (e.g., missing S3 files).
            // getMessage() can be null — guard before contains() to avoid NPE.
            String msg = e.getMessage();
            if (msg != null && msg.contains("Location does not exist")) {
                LOG.warn("Table {} has corrupted metadata, dropping and recreating", tableId);
                try {
                    // purge=false: only remove the catalog entry; data files are
                    // already gone/corrupt.
                    catalog.dropTable(tableId, false);
                } catch (Exception dropEx) {
                    LOG.warn("Failed to drop corrupted table (may not exist in catalog)", dropEx);
                }
                tableExists = false;
            } else {
                throw e;
            }
        }
        if (tableExists) {
            Table existing = catalog.loadTable(tableId);
            String existingVersion = existing.properties().get(SCHEMA_VERSION_PROP);
            if (!SYMBOL_METADATA_SCHEMA_VERSION.equals(existingVersion)) {
                LOG.warn("Table {} has schema version '{}', expected '{}' — manual migration required",
                        tableId, existingVersion, SYMBOL_METADATA_SCHEMA_VERSION);
            }
            LOG.info("Table {} already exists at schema version {} — skipping creation", tableId, existingVersion);
            return;
        }
        LOG.info("Creating symbol_metadata table: {}", tableId);
        // Define the symbol metadata schema
        Schema schema = new Schema(
                // Primary key fields
                required(1, "exchange_id", Types.StringType.get(), "Exchange identifier (e.g., BINANCE)"),
                required(2, "market_id", Types.StringType.get(), "Market symbol (e.g., BTC/USDT)"),
                // Market information
                optional(3, "market_type", Types.StringType.get(), "Market type (spot, futures, swap)"),
                optional(4, "description", Types.StringType.get(), "Human-readable description"),
                optional(5, "base_asset", Types.StringType.get(), "Base asset (e.g., BTC)"),
                optional(6, "quote_asset", Types.StringType.get(), "Quote asset (e.g., USDT)"),
                // Precision/denominator information
                optional(7, "tick_denom", Types.LongType.get(), "Tick price denominator (10^n for n decimals)"),
                optional(8, "base_denom", Types.LongType.get(), "Base asset denominator"),
                optional(9, "quote_denom", Types.LongType.get(), "Quote asset denominator"),
                // Supported timeframes
                optional(10, "supported_period_seconds", Types.ListType.ofRequired(11, Types.IntegerType.get()), "Supported OHLC periods in seconds"),
                // Optional timing information
                optional(12, "earliest_time", Types.LongType.get(), "Earliest available data timestamp (microseconds)"),
                // Metadata
                required(13, "updated_at", Types.LongType.get(), "Timestamp when metadata was last updated (microseconds)")
        );
        // Create the table with partitioning and properties.
        // Use format version 2 with UPSERT capabilities via equality deletes.
        Table table = catalog.buildTable(tableId, schema)
                .withPartitionSpec(org.apache.iceberg.PartitionSpec.builderFor(schema)
                        .identity("exchange_id")
                        .build())
                .withProperty("write.format.default", "parquet")
                .withProperty("write.parquet.compression-codec", "snappy")
                .withProperty("write.metadata.compression-codec", "gzip")
                .withProperty("format-version", "2")
                .withProperty("write.upsert.enabled", "true")
                .withProperty(SCHEMA_VERSION_PROP, SYMBOL_METADATA_SCHEMA_VERSION)
                .create();
        // Add identifier fields for UPSERT operations.
        // This allows Iceberg to use equality deletes for deduplication.
        table.updateProperties()
                .set("write.upsert.enabled", "true")
                .commit();
        // Set the identifier fields (primary key) for the table.
        // Iceberg will use these for equality deletes during UPSERT.
        table.updateSchema()
                .setIdentifierFields("exchange_id", "market_id")
                .commit();
        LOG.info("Successfully created symbol_metadata table: {}", tableId);
    } catch (Exception e) {
        LOG.error("Failed to initialize symbol_metadata table: {}", tableId, e);
        throw new RuntimeException("symbol_metadata table initialization failed", e);
    }
}
}

View File

@@ -0,0 +1,91 @@
package com.dexorder.flink.publisher;
import com.dexorder.proto.Market;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Deserializes Market protobuf messages from Kafka.
 * Expects ZMQ protocol envelope: [version byte][type_id byte][protobuf payload]
 *
 * <p>Malformed records (bad envelope or unparseable protobuf) return {@code null},
 * which Flink's value-only deserializer wrapper skips, so a single bad record
 * never fails the job.
 */
public class MarketDeserializer implements DeserializationSchema<MarketWrapper> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MarketDeserializer.class);

    /** Expected envelope version byte. */
    private static final byte PROTOCOL_VERSION = 0x01;
    /** Envelope type id identifying a Market payload. */
    private static final byte MARKET_TYPE_ID = 0x05;

    /**
     * Parses one Kafka record into a {@link MarketWrapper}.
     *
     * @param message raw Kafka record value (envelope + protobuf), may be null
     * @return the wrapped market metadata, or null if the record is invalid
     */
    @Override
    public MarketWrapper deserialize(byte[] message) throws IOException {
        if (message == null || message.length < 3) {
            // NOTE(review): length < 3 also rejects a 2-byte envelope carrying an
            // empty protobuf payload — confirm an empty Market is never valid.
            LOG.warn("Invalid message: too short (length={})", message == null ? 0 : message.length);
            return null;
        }
        // Parse ZMQ protocol envelope
        byte version = message[0];
        byte typeId = message[1];
        if (version != PROTOCOL_VERSION) {
            LOG.warn("Unknown protocol version: 0x{}", Integer.toHexString(version & 0xFF));
            return null;
        }
        if (typeId != MARKET_TYPE_ID) {
            LOG.warn("Expected MARKET type (0x05), got: 0x{}", Integer.toHexString(typeId & 0xFF));
            return null;
        }
        // Extract protobuf payload (everything after first 2 bytes)
        byte[] payload = new byte[message.length - 2];
        System.arraycopy(message, 2, payload, 0, payload.length);
        try {
            // Deserialize protobuf
            Market market = Market.parseFrom(payload);
            // Per-record logging at debug level: was LOG.info, which floods logs
            // at production message volume.
            LOG.debug("Deserialized Market: exchange_id='{}', market_id='{}', base='{}', quote='{}'",
                    market.getExchangeId(), market.getMarketId(), market.getBaseAsset(), market.getQuoteAsset());
            // Convert to MarketWrapper
            MarketWrapper wrapper = new MarketWrapper();
            wrapper.setExchangeId(market.getExchangeId());
            wrapper.setMarketId(market.getMarketId());
            wrapper.setMarketType(market.getMarketType());
            wrapper.setDescription(market.getDescription());
            wrapper.setBaseAsset(market.getBaseAsset());
            wrapper.setQuoteAsset(market.getQuoteAsset());
            wrapper.setTickDenom(market.getTickDenom());
            wrapper.setBaseDenom(market.getBaseDenom());
            wrapper.setQuoteDenom(market.getQuoteDenom());
            // Copy the repeated field into a plain (serializable, mutable) List
            List<Integer> supportedPeriods = new ArrayList<>(market.getSupportedPeriodSecondsList());
            wrapper.setSupportedPeriodSeconds(supportedPeriods);
            wrapper.setEarliestTime(market.getEarliestTime());
            return wrapper;
        } catch (Exception e) {
            LOG.error("Failed to deserialize Market protobuf", e);
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(MarketWrapper nextElement) {
        // Kafka source is unbounded; never signal end-of-stream.
        return false;
    }

    @Override
    public TypeInformation<MarketWrapper> getProducedType() {
        return TypeInformation.of(MarketWrapper.class);
    }
}

View File

@@ -0,0 +1,143 @@
package com.dexorder.flink.publisher;
import java.io.Serializable;
import java.util.List;
/**
 * Wrapper for Market protobuf message from Kafka.
 * Represents symbol metadata for a trading pair.
 */
public class MarketWrapper implements Serializable {

    private static final long serialVersionUID = 1L;

    // Identity of the trading pair.
    private String exchangeId;
    private String marketId;

    // Descriptive market attributes.
    private String marketType;
    private String description;
    private String baseAsset;
    private String quoteAsset;

    // Fixed-point denominators.
    private long tickDenom;
    private long baseDenom;
    private long quoteDenom;

    // Supported OHLC periods (seconds) and earliest available data time.
    private List<Integer> supportedPeriodSeconds;
    private long earliestTime;

    /** No-arg constructor (required by serialization frameworks). */
    public MarketWrapper() {
    }

    /** All-fields constructor. */
    public MarketWrapper(String exchangeId, String marketId, String marketType, String description,
                         String baseAsset, String quoteAsset, long tickDenom, long baseDenom,
                         long quoteDenom, List<Integer> supportedPeriodSeconds, long earliestTime) {
        this.exchangeId = exchangeId;
        this.marketId = marketId;
        this.marketType = marketType;
        this.description = description;
        this.baseAsset = baseAsset;
        this.quoteAsset = quoteAsset;
        this.tickDenom = tickDenom;
        this.baseDenom = baseDenom;
        this.quoteDenom = quoteDenom;
        this.supportedPeriodSeconds = supportedPeriodSeconds;
        this.earliestTime = earliestTime;
    }

    // ----- Accessors (grouped getter/setter pairs) -----

    public String getExchangeId() { return exchangeId; }
    public void setExchangeId(String exchangeId) { this.exchangeId = exchangeId; }

    public String getMarketId() { return marketId; }
    public void setMarketId(String marketId) { this.marketId = marketId; }

    public String getMarketType() { return marketType; }
    public void setMarketType(String marketType) { this.marketType = marketType; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    public String getBaseAsset() { return baseAsset; }
    public void setBaseAsset(String baseAsset) { this.baseAsset = baseAsset; }

    public String getQuoteAsset() { return quoteAsset; }
    public void setQuoteAsset(String quoteAsset) { this.quoteAsset = quoteAsset; }

    public long getTickDenom() { return tickDenom; }
    public void setTickDenom(long tickDenom) { this.tickDenom = tickDenom; }

    public long getBaseDenom() { return baseDenom; }
    public void setBaseDenom(long baseDenom) { this.baseDenom = baseDenom; }

    public long getQuoteDenom() { return quoteDenom; }
    public void setQuoteDenom(long quoteDenom) { this.quoteDenom = quoteDenom; }

    public List<Integer> getSupportedPeriodSeconds() { return supportedPeriodSeconds; }
    public void setSupportedPeriodSeconds(List<Integer> supportedPeriodSeconds) { this.supportedPeriodSeconds = supportedPeriodSeconds; }

    public long getEarliestTime() { return earliestTime; }
    public void setEarliestTime(long earliestTime) { this.earliestTime = earliestTime; }

    /** Summary of the key identity fields (denominators/periods omitted). */
    @Override
    public String toString() {
        return String.format(
                "MarketWrapper{exchangeId='%s', marketId='%s', marketType='%s', baseAsset='%s', quoteAsset='%s'}",
                exchangeId, marketId, marketType, baseAsset, quoteAsset);
    }
}

View File

@@ -0,0 +1,257 @@
package com.dexorder.flink.sink;
import com.dexorder.flink.publisher.MarketWrapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Writes symbol metadata to Iceberg symbol_metadata table.
 *
 * Deduplicates symbols in-memory to prevent writing duplicates.
 * Batches writes by exchange to minimize file fragmentation.
 * After committing to Iceberg, sends a notification via ZMQ PUSH socket to job manager.
 *
 * NOTE(review): Iceberg commits happen directly in flatMap()/close(), outside
 * Flink's checkpoint lifecycle, and the dedup set is transient — after a
 * restart, already-written symbols may be appended again. Confirm downstream
 * readers tolerate duplicate rows (at-least-once semantics).
 */
public class SymbolMetadataWriter extends RichFlatMapFunction<MarketWrapper, MarketWrapper> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(SymbolMetadataWriter.class);
private static final int BATCH_SIZE = 500; // Commit every 500 symbols per exchange
// ZMQ envelope constants — must match the job manager's notification protocol.
private static final byte PROTOCOL_VERSION = 0x01;
private static final byte MSG_TYPE_SYMBOL_METADATA_UPDATED = 0x13;
private final TableLoader tableLoader;
private final String notificationEndpoint; // Job manager's PULL socket endpoint
private transient Table table;
// NOTE(review): this set grows without bound over the task's lifetime, and it
// causes LATER UPDATES for an already-seen symbol to be skipped (only the
// first record per symbol is written this run) — confirm this is intended
// given the input topic is compacted to "latest per symbol".
private transient Set<String> seenSymbols; // Track seen symbols to prevent duplicates
private transient GenericAppenderFactory appenderFactory;
private transient OutputFileFactory fileFactory;
private transient ZContext zmqContext;
private transient ZMQ.Socket pushSocket; // PUSH socket to job manager
// Batching state per exchange
private transient java.util.Map<String, DataWriter<Record>> writersByExchange;
private transient java.util.Map<String, Integer> countsPerExchange;
private transient java.util.Map<String, List<MarketWrapper>> pendingOutputPerExchange;
/**
 * @param tableLoader           loader for the Iceberg symbol_metadata table
 * @param notificationEndpoint  ZMQ endpoint of the job manager's PULL socket
 */
public SymbolMetadataWriter(TableLoader tableLoader, String notificationEndpoint) {
this.tableLoader = tableLoader;
this.notificationEndpoint = notificationEndpoint;
}
// Initializes Iceberg table access, per-exchange batching state, and the ZMQ
// PUSH socket used for post-commit notifications.
@Override
public void open(Configuration parameters) throws Exception {
tableLoader.open();
table = tableLoader.loadTable();
seenSymbols = new HashSet<>();
writersByExchange = new java.util.HashMap<>();
countsPerExchange = new java.util.HashMap<>();
pendingOutputPerExchange = new java.util.HashMap<>();
// Initialize ZMQ PUSH socket to job manager (mimics HistoryNotificationPublisher pattern)
zmqContext = new ZContext();
pushSocket = zmqContext.createSocket(SocketType.PUSH);
pushSocket.setLinger(1000);
pushSocket.setSndHWM(10000);
pushSocket.connect(notificationEndpoint);
appenderFactory = new GenericAppenderFactory(table.schema(), table.spec());
// System.nanoTime() as the attempt id keeps output file names unique across
// restarts of the same subtask.
fileFactory = OutputFileFactory
.builderFor(table, getRuntimeContext().getIndexOfThisSubtask(), System.nanoTime())
.format(FileFormat.PARQUET)
.build();
LOG.info("SymbolMetadataWriter opened, table loaded: {}", table.name());
LOG.info("Connected PUSH socket to notification endpoint: {}", notificationEndpoint);
}
/**
 * Buffers one market record into its exchange's open data file; commits the
 * batch (and notifies the job manager) once BATCH_SIZE records accumulate.
 * Duplicates (by exchange:market key) are emitted downstream but not written.
 */
@Override
public void flatMap(MarketWrapper market, Collector<MarketWrapper> out) throws Exception {
// Create unique key for deduplication
String symbolKey = market.getExchangeId() + ":" + market.getMarketId();
// Skip if we've already seen this symbol
if (seenSymbols.contains(symbolKey)) {
LOG.debug("Skipping duplicate symbol: {}", symbolKey);
out.collect(market); // Still emit for downstream processing
return;
}
// Mark as seen
seenSymbols.add(symbolKey);
String exchangeId = market.getExchangeId();
// Create Iceberg record from Market protobuf
GenericRecord record = GenericRecord.create(table.schema());
record.setField("exchange_id", exchangeId);
record.setField("market_id", market.getMarketId());
record.setField("market_type", market.getMarketType());
record.setField("description", market.getDescription());
record.setField("base_asset", market.getBaseAsset());
record.setField("quote_asset", market.getQuoteAsset());
record.setField("tick_denom", market.getTickDenom());
record.setField("base_denom", market.getBaseDenom());
record.setField("quote_denom", market.getQuoteDenom());
// Convert supported_period_seconds to List<Integer>
List<Integer> supportedPeriods = new ArrayList<>(market.getSupportedPeriodSeconds());
record.setField("supported_period_seconds", supportedPeriods);
// 0 is treated as "unknown" and stored as null in the optional column.
record.setField("earliest_time", market.getEarliestTime() != 0 ? market.getEarliestTime() : null);
record.setField("updated_at", System.currentTimeMillis() * 1000); // Current time in microseconds
// Get or create writer for this exchange
DataWriter<Record> writer = writersByExchange.get(exchangeId);
if (writer == null) {
// Compute partition key from exchange_id
GenericRecord partitionRecord = GenericRecord.create(table.schema());
partitionRecord.setField("exchange_id", exchangeId);
PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
partitionKey.partition(partitionRecord);
// Create new writer for this exchange's partition
EncryptedOutputFile encryptedFile = fileFactory.newOutputFile(partitionKey);
writer = appenderFactory.newDataWriter(encryptedFile, FileFormat.PARQUET, partitionKey);
writersByExchange.put(exchangeId, writer);
countsPerExchange.put(exchangeId, 0);
pendingOutputPerExchange.put(exchangeId, new ArrayList<>());
}
// Write record to batch
writer.write(record);
// Track count and pending output
int count = countsPerExchange.get(exchangeId) + 1;
countsPerExchange.put(exchangeId, count);
pendingOutputPerExchange.get(exchangeId).add(market);
// Flush batch if we've reached the batch size
if (count >= BATCH_SIZE) {
flushExchange(exchangeId, out);
}
}
// Closes the exchange's data file, commits it to Iceberg, notifies the job
// manager, and emits the buffered markets downstream.
// NOTE(review): if close()/commit throws, the finally block still clears the
// batching state, so the pending MarketWrapper outputs for this exchange are
// dropped without being emitted — confirm that is acceptable.
private void flushExchange(String exchangeId, Collector<MarketWrapper> out) throws Exception {
DataWriter<Record> writer = writersByExchange.get(exchangeId);
if (writer == null) {
return;
}
try {
writer.close();
table.newAppend()
.appendFile(writer.toDataFile())
.commit();
int count = countsPerExchange.get(exchangeId);
LOG.info("Committed batch of {} symbols for exchange: {}", count, exchangeId);
// Send notification to gateways to reload symbol metadata
sendMetadataUpdateNotification(exchangeId, count);
// Emit all pending outputs
for (MarketWrapper market : pendingOutputPerExchange.get(exchangeId)) {
out.collect(market);
}
} finally {
// Clear state for this exchange
writersByExchange.remove(exchangeId);
countsPerExchange.remove(exchangeId);
pendingOutputPerExchange.remove(exchangeId);
}
}
// Best-effort ZMQ notification after a successful Iceberg commit; failures
// are logged but never fail the pipeline.
private void sendMetadataUpdateNotification(String exchangeId, int count) {
if (pushSocket == null) {
LOG.warn("Push socket is null - cannot send METADATA_UPDATE notification for exchange: {} ({} symbols)", exchangeId, count);
return;
}
try {
// Topic for metadata updates (broadcast to all gateways)
String topic = "METADATA_UPDATE";
// Empty payload (notification only, no additional data needed)
byte[] payload = new byte[0];
// Build message frame: [msg_type][payload]
byte[] messageFrame = new byte[1 + payload.length];
messageFrame[0] = MSG_TYPE_SYMBOL_METADATA_UPDATED;
System.arraycopy(payload, 0, messageFrame, 1, payload.length);
// Send three-frame message via PUSH: [topic][version][message]
// Job manager's forwarder will republish via MARKET_DATA_PUB
pushSocket.sendMore(topic);
pushSocket.sendMore(new byte[]{PROTOCOL_VERSION});
pushSocket.send(messageFrame, 0);
LOG.info("Sent METADATA_UPDATE notification for exchange: {} ({} symbols)", exchangeId, count);
} catch (Exception e) {
LOG.error("Failed to send metadata update notification for exchange: {}", exchangeId, e);
// Don't throw - notification is best-effort
}
}
// Final flush of partial batches, then release ZMQ and table resources.
// NOTE(review): there is no Collector here, so records committed during this
// final flush are written to Iceberg but never emitted downstream.
@Override
public void close() throws Exception {
// Flush any remaining batches
try {
for (String exchangeId : new ArrayList<>(writersByExchange.keySet())) {
DataWriter<Record> writer = writersByExchange.get(exchangeId);
if (writer != null) {
try {
writer.close();
table.newAppend()
.appendFile(writer.toDataFile())
.commit();
int count = countsPerExchange.get(exchangeId);
LOG.info("Final flush: committed {} remaining symbols for exchange: {}", count, exchangeId);
// Send final notification
sendMetadataUpdateNotification(exchangeId, count);
} catch (Exception e) {
LOG.error("Failed to flush remaining batch for exchange: {}", exchangeId, e);
}
}
}
} finally {
writersByExchange.clear();
countsPerExchange.clear();
pendingOutputPerExchange.clear();
}
// Close ZMQ resources
if (pushSocket != null) {
pushSocket.close();
}
if (zmqContext != null) {
zmqContext.close();
}
if (tableLoader != null) {
tableLoader.close();
}
}
}

View File

@@ -27,3 +27,12 @@ topics:
retention.ms: 2592000000 # 30 days
compression.type: snappy
cleanup.policy: delete
# Symbol metadata from ingestors
- name: symbol-metadata
partitions: 2
replication: 1
config:
retention.ms: 604800000 # 7 days
compression.type: snappy
cleanup.policy: compact # Keep latest record per symbol key (note: retention.ms above has no effect unless cleanup.policy is "compact,delete")

View File

@@ -27,3 +27,12 @@ topics:
retention.ms: 2592000000 # 30 days
compression.type: snappy
cleanup.policy: delete
# Symbol metadata from ingestors
- name: symbol-metadata
partitions: 3
replication: 2
config:
retention.ms: 604800000 # 7 days
compression.type: snappy
cleanup.policy: compact # Keep latest record per symbol key (note: retention.ms above has no effect unless cleanup.policy is "compact,delete")