backend redesign

This commit is contained in:
2026-03-11 18:47:11 -04:00
parent 8ff277c8c6
commit e99ef5d2dd
210 changed files with 12147 additions and 155 deletions

13
flink/.gitignore vendored Normal file
View File

@@ -0,0 +1,13 @@
target/
*.class
*.jar
*.war
*.ear
*.iml
.idea/
.vscode/
.settings/
.project
.classpath
dependency-reduced-pom.xml
protobuf/

37
flink/Dockerfile Normal file
View File

@@ -0,0 +1,37 @@
# syntax=docker/dockerfile:1

# Stage 1: Build the JAR
FROM maven:3.9-eclipse-temurin-11 AS builder
WORKDIR /build

# Copy pom.xml and protobuf definitions first for better caching.
# The pom references proto files at ../protobuf, i.e. /protobuf in this stage.
COPY pom.xml .
COPY protobuf ../protobuf/

# Download dependencies in non-interactive batch mode
# (layer stays cached while pom.xml is unchanged)
RUN mvn -B dependency:go-offline

# Copy source code
COPY src ./src

# Build the JAR (tests skipped here; run them in CI)
RUN mvn -B clean package -DskipTests

# For dev environment: replace topics.yaml with topics-dev.yaml in the JAR.
# This avoids replication factor errors with only 1 Kafka broker.
# NOTE(review): this bakes the dev topic config into EVERY image built from this
# Dockerfile — confirm a separate production build (or a build ARG switch) exists
# before shipping this image outside dev.
RUN mkdir -p /tmp/jar-overlay && \
    cp /build/src/main/resources/topics-dev.yaml /tmp/jar-overlay/topics.yaml && \
    jar uf /build/target/trading-flink-1.0-SNAPSHOT.jar -C /tmp/jar-overlay topics.yaml

# Stage 2: Create the Flink runtime image
FROM flink:1.20.0-java11

# Copy the built JAR to the Flink usrlib directory (picked up in Application mode)
COPY --from=builder /build/target/trading-flink-1.0-SNAPSHOT.jar /opt/flink/usrlib/trading-flink.jar

# Copy configuration files
COPY config.example.yaml /opt/flink/conf/app-config.yaml

# No ENTRYPOINT/CMD override: the flink base image's docker-entrypoint.sh is used,
# and the Flink Kubernetes operator submits the job in Application mode
# (see flink-cluster.yaml: jarURI / entryClass).
USER flink

77
flink/README.md Normal file
View File

@@ -0,0 +1,77 @@
# Flink Deployment for K8s Cluster
## Install Flink Kubernetes Operator
```bash
# Add the Flink Helm repository
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm repo update
# Install the operator
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
-f values.yaml \
--namespace flink --create-namespace
# Wait for operator to be ready
kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=flink-kubernetes-operator -n flink --timeout=300s
```
## Create Service Account
```bash
kubectl create serviceaccount flink -n default
kubectl create clusterrolebinding flink-role-binding-default \
--clusterrole=edit \
--serviceaccount=default:flink
```
## Deploy Flink Cluster
```bash
# Apply the Flink cluster manifest
kubectl apply -f flink-cluster.yaml
# Check cluster status
kubectl get flinkdeployment -n default
# Check pods
kubectl get pods -n default | grep flink
```
## Access Flink Web UI
```bash
# Port forward to access the UI locally
kubectl port-forward svc/trading-flink-rest 8081:8081 -n default
# Open browser to http://localhost:8081
```
## Prometheus Metrics
Flink exposes metrics on port 9249. Prometheus will automatically discover and scrape these metrics via pod annotations:
- `prometheus.io/scrape: "true"`
- `prometheus.io/port: "9249"`
- `prometheus.io/path: "/metrics"`
To verify metrics are being exported:
```bash
kubectl exec -it <flink-jobmanager-pod> -n default -- curl localhost:9249/metrics
```
## Submit a Job
```bash
# Example: Submit a jar file
kubectl exec -it <jobmanager-pod> -- flink run /path/to/job.jar
```
## Uninstall
```bash
# Delete Flink cluster
kubectl delete flinkdeployment trading-flink -n default
# Delete operator
helm uninstall flink-kubernetes-operator -n flink
```

31
flink/config.example.yaml Normal file
View File

@@ -0,0 +1,31 @@
# Example configuration file
# This should be mounted at /etc/config/config.yaml in the Flink container
# ZeroMQ bind address and ports
zmq_bind_address: "tcp://*"
zmq_ingestor_work_queue_port: 5555
zmq_ingestor_response_port: 5556
zmq_ingestor_control_port: 5557
zmq_market_data_pub_port: 5558
zmq_client_request_port: 5559
zmq_cep_webhook_port: 5560
# Notification publisher endpoint (Flink → Relay)
# Relay connects XSUB to this endpoint and proxies to clients
# NOTE(review): port 5557 clashes with zmq_ingestor_control_port above, and
# AppConfig's notification defaults use 5561 (tcp://flink-jobmanager:5561 /
# notification_pull_port 5561) — this should likely be "tcp://*:5561". Confirm.
notification_publish_endpoint: "tcp://*:5557"
# Kafka configuration
kafka_bootstrap_servers: "kafka:9092"
kafka_tick_topic: "market-tick"
kafka_ohlc_topic: "market-ohlc"
kafka_topics_file: "/topics-dev.yaml" # Use topics-dev.yaml for single broker dev environment
# Iceberg catalog
iceberg_catalog_uri: "http://iceberg-catalog:8181"
iceberg_warehouse: "s3://trading-warehouse/"
iceberg_namespace: "trading"
iceberg_table_prefix: "market"
# Flink configuration
flink_parallelism: 4
flink_checkpoint_interval_ms: 60000

42
flink/flink-cluster.yaml Normal file
View File

@@ -0,0 +1,42 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: trading-flink
namespace: default
labels:
app: flink
spec:
podTemplate:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9249"
prometheus.io/path: "/metrics"
image: dexorder/ai-flink:latest
imagePullPolicy: Always
flinkVersion: v1_19  # NOTE(review): Dockerfile base is flink:1.20.0-java11 and pom pins Flink 1.20.0 — this should likely be v1_20; confirm before deploy
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.backend: filesystem
state.checkpoints.dir: file:///flink-data/checkpoints
state.savepoints.dir: file:///flink-data/savepoints
execution.checkpointing.interval: 60s
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: "9249"
serviceAccount: flink
jobManager:
resource:
memory: "1024Mi"
cpu: 0.5
replicas: 1
taskManager:
resource:
memory: "1024Mi"
cpu: 0.5
replicas: 1
job:
jarURI: local:///opt/flink/usrlib/trading-flink.jar
entryClass: com.dexorder.flink.TradingFlinkApp
parallelism: 1
upgradeMode: stateless
state: running

250
flink/pom.xml Normal file
View File

@@ -0,0 +1,250 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dexorder</groupId>
<artifactId>trading-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<iceberg.version>1.10.1</iceberg.version>
<java.version>11</java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<jeromq.version>0.6.0</jeromq.version>
<protobuf.version>3.24.0</protobuf.version>
<snakeyaml.version>2.2</snakeyaml.version>
</properties>
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Kafka Admin Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- Iceberg dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.20</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- iceberg-aws-bundle includes iceberg-aws + bundled AWS SDK v2 (optional deps in iceberg-aws are not pulled transitively) -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws-bundle</artifactId>
<version>${iceberg.version}</version>
</dependency>
<!-- Hadoop configuration -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.3.6</version>
</dependency>
<!-- ZeroMQ (JeroMQ - pure Java implementation) -->
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>${jeromq.version}</version>
</dependency>
<!-- Protocol Buffers -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- Gson for JSON serialization -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!-- YAML configuration -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Protocol Buffers Compiler -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<!-- Proto files are in ../protobuf directory -->
<protoSourceRoot>${project.basedir}/../protobuf</protoSourceRoot>
<!-- Generated Java classes go to target/generated-sources/protobuf/java -->
<outputDirectory>${project.build.directory}/generated-sources/protobuf/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Add generated sources to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.4.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/protobuf/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!-- Shade Plugin for fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.dexorder.flink.TradingFlinkApp</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- OS detector for protobuf -->
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>detect</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,8 @@
# Example secrets file
# This should be mounted at /etc/secrets/secrets.yaml in the Flink container
# Iceberg catalog credentials
iceberg_catalog_username: "admin"
iceberg_catalog_password: "changeme"
# Additional secrets as needed

View File

@@ -0,0 +1,253 @@
package com.dexorder.flink;
import com.dexorder.flink.config.AppConfig;
import com.dexorder.flink.iceberg.SchemaInitializer;
import com.dexorder.flink.ingestor.IngestorControlChannel;
import com.dexorder.flink.ingestor.IngestorWorkQueue;
import com.dexorder.flink.kafka.TopicManager;
import com.dexorder.flink.publisher.HistoryNotificationForwarder;
import com.dexorder.flink.publisher.HistoryNotificationFunction;
import com.dexorder.flink.publisher.OHLCBatchWrapper;
import com.dexorder.flink.publisher.OHLCBatchDeserializer;
import com.dexorder.flink.sink.HistoricalBatchWriter;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Main entry point for the Trading Flink application.
 *
 * Responsibilities:
 * - Load configuration and secrets
 * - Initialize ZMQ channels for ingestor communication, market data pub/sub, and client requests
 * - Set up Kafka connectors for data ingestion
 * - Bootstrap the Flink streaming job
 */
public class TradingFlinkApp {

    private static final Logger LOG = LoggerFactory.getLogger(TradingFlinkApp.class);

    // Default mount points provided by the Kubernetes config/secret volumes
    // (overridable via CONFIG_PATH / SECRETS_PATH env vars for local runs).
    private static final String DEFAULT_CONFIG_PATH = "/etc/config/config.yaml";
    private static final String DEFAULT_SECRETS_PATH = "/etc/secrets/secrets.yaml";

    public static void main(String[] args) throws Exception {
        LOG.info("Starting Trading Flink Application");

        // Load configuration
        String configPath = System.getenv().getOrDefault("CONFIG_PATH", DEFAULT_CONFIG_PATH);
        String secretsPath = System.getenv().getOrDefault("SECRETS_PATH", DEFAULT_SECRETS_PATH);
        AppConfig config = loadConfig(configPath, secretsPath);
        LOG.info("Configuration loaded successfully");

        // Initialize Kafka topics before any source starts consuming
        try (TopicManager topicManager = new TopicManager(config.getKafkaBootstrapServers())) {
            topicManager.ensureTopicsExist();
            LOG.info("Kafka topics initialized: {}", topicManager.getTopicNames());
        } catch (Exception e) {
            LOG.error("Failed to initialize Kafka topics", e);
            throw e;
        }

        // Initialize Iceberg schemas using a short-lived catalog instance
        try {
            // Use CatalogLoader.rest() for REST catalog instead of custom loader
            CatalogLoader catalogLoader = CatalogLoader.rest(
                "trading",
                new Configuration(),
                buildCatalogProps(config)
            );
            org.apache.iceberg.catalog.Catalog catalog = catalogLoader.loadCatalog();
            try {
                SchemaInitializer schemaInitializer = new SchemaInitializer(
                    catalog,
                    config.getIcebergNamespace()
                );
                schemaInitializer.initializeSchemas();
            } finally {
                // REST catalog impls may hold an HTTP client; close when supported
                if (catalog instanceof java.io.Closeable) {
                    ((java.io.Closeable) catalog).close();
                }
            }
            LOG.info("Iceberg schemas initialized");
        } catch (Exception e) {
            LOG.error("Failed to initialize Iceberg schemas", e);
            throw e;
        }

        // Initialize ZeroMQ channels
        ZmqChannelManager zmqManager = new ZmqChannelManager(config);
        try {
            zmqManager.initializeChannels();
            LOG.info("ZeroMQ channels initialized");

            // Initialize history notification forwarder (runs in job manager).
            // Binds PULL socket to receive notifications from task managers,
            // forwards to MARKET_DATA_PUB.
            HistoryNotificationForwarder notificationForwarder = new HistoryNotificationForwarder(
                config.getNotificationPullPort(),
                zmqManager.getSocket(ZmqChannelManager.Channel.MARKET_DATA_PUB)
            );
            notificationForwarder.start();
            LOG.info("History notification forwarder started on port {}", config.getNotificationPullPort());

            // Initialize ingestor components
            IngestorWorkQueue workQueue = new IngestorWorkQueue(zmqManager);
            IngestorControlChannel controlChannel = new IngestorControlChannel(zmqManager);

            // Start the work queue processor
            workQueue.start();
            LOG.info("Ingestor work queue started");

            // Set up Flink streaming environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            configureFlinkEnvironment(env, config);

            // Set up Kafka source for OHLCBatch data
            KafkaSource<OHLCBatchWrapper> ohlcSource = KafkaSource.<OHLCBatchWrapper>builder()
                .setBootstrapServers(config.getKafkaBootstrapServers())
                .setTopics(config.getKafkaOhlcTopic())
                .setGroupId("flink-ohlc-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new OHLCBatchDeserializer())
                .build();

            // Create OHLCBatch data stream
            DataStream<OHLCBatchWrapper> ohlcStream = env
                .fromSource(ohlcSource, WatermarkStrategy.noWatermarks(), "OHLCBatch Kafka Source");
            LOG.info("OHLCBatch Kafka source configured");

            // Set up Iceberg table loader (same catalog properties as schema init above)
            CatalogLoader streamCatalogLoader = CatalogLoader.rest(
                "trading",
                new Configuration(),
                buildCatalogProps(config)
            );
            TableLoader tableLoader = TableLoader.fromCatalog(
                streamCatalogLoader,
                TableIdentifier.of(config.getIcebergNamespace(), "ohlc")
            );
            LOG.info("Iceberg table loader configured: {}.ohlc", config.getIcebergNamespace());

            // Historical pipeline: write to Iceberg first, then notify.
            // HistoricalBatchWriter uses direct catalog API (table.newAppend().commit()),
            // which commits synchronously — no checkpoint dependency. Batches are emitted
            // downstream only after commit returns, so notifications are guaranteed to fire
            // after data is visible to readers.
            // Parallelism MUST be 1: HistoryNotificationPublisher binds a ZMQ PUB socket,
            // and only one instance can bind the same port.
            DataStream<OHLCBatchWrapper> processedStream = ohlcStream
                .flatMap(new HistoricalBatchWriter(tableLoader))
                .setParallelism(1)
                .process(new HistoryNotificationFunction(
                    config.getNotificationPublishEndpoint(),
                    config.getIcebergNamespace(),
                    config.getIcebergTablePrefix()
                ))
                .setParallelism(1);

            // Add a discard sink to force Flink to include the pipeline in the execution graph.
            processedStream.addSink(new DiscardingSink<>()).setParallelism(1);
            LOG.info("Historical pipeline configured: HistoricalBatchWriter -> HistoryNotificationFunction");

            // TODO: Set up CEP patterns and triggers
            // TODO: Set up realtime tick processing

            LOG.info("Flink job configured, starting execution");

            // Register shutdown hook for cleanup
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                LOG.info("Shutting down Trading Flink Application");
                try {
                    // Send shutdown signal to ingestors
                    controlChannel.shutdown();
                    // Stop work queue
                    workQueue.stop();
                    // Stop notification forwarder
                    notificationForwarder.close();
                    // Close ZMQ channels
                    zmqManager.close();
                    LOG.info("Shutdown complete");
                } catch (Exception e) {
                    LOG.error("Error during shutdown", e);
                }
            }));

            // Execute the Flink job (blocks until the job terminates)
            env.execute("Trading Flink Application");
        } catch (Exception e) {
            LOG.error("Fatal error in Trading Flink Application", e);
            zmqManager.close();
            throw e;
        }
    }

    /**
     * Build the Iceberg REST catalog properties. Shared by schema initialization
     * and the streaming table loader so the two code paths cannot drift apart.
     */
    private static Map<String, String> buildCatalogProps(AppConfig config) {
        Map<String, String> props = new HashMap<>();
        props.put("type", "rest");
        props.put("uri", config.getString("iceberg_catalog_uri", "http://iceberg-catalog:8181"));
        props.put("warehouse", config.getString("iceberg_warehouse", "s3://warehouse/"));
        // Configure S3 for MinIO: path-style access required; region is mandated
        // by the AWS SDK even when the endpoint ignores it.
        props.put("s3.endpoint", config.getString("s3_endpoint", "http://minio:9000"));
        props.put("s3.path-style-access", "true");
        props.put("client.region", "us-east-1");
        return props;
    }

    /**
     * Load application configuration.
     *
     * @param configPath  main YAML config path
     * @param secretsPath secrets YAML path (AppConfig treats a missing file as optional)
     * @throws IOException if the main config cannot be loaded
     */
    private static AppConfig loadConfig(String configPath, String secretsPath) throws IOException {
        LOG.info("Loading config from: {}", configPath);
        LOG.info("Loading secrets from: {}", secretsPath);
        try {
            return new AppConfig(configPath, secretsPath);
        } catch (IOException e) {
            LOG.error("Failed to load configuration", e);
            throw e;
        }
    }

    /** Apply parallelism and checkpointing settings from config. */
    private static void configureFlinkEnvironment(StreamExecutionEnvironment env, AppConfig config) {
        // Set parallelism - defaults to 1; increase when more task slots are available
        env.setParallelism(config.getInt("flink_parallelism", 1));
        // Checkpoint interval from config (flink_checkpoint_interval_ms); default 60s
        env.enableCheckpointing(config.getInt("flink_checkpoint_interval_ms", 60000));
        // Set time characteristic
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // Deprecated in newer Flink versions
        LOG.info("Flink environment configured");
    }
}

View File

@@ -0,0 +1,154 @@
package com.dexorder.flink.config;
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
/**
 * Application configuration loader.
 * Loads config and secrets from YAML files mounted by Kubernetes.
 *
 * Lookup order for every getter: main config file first, then secrets file,
 * then the caller-supplied default.
 */
public class AppConfig {

    private final Map<String, Object> config;
    private final Map<String, Object> secrets;

    /**
     * @param configPath  path to the main YAML config (required)
     * @param secretsPath path to the secrets YAML (optional; a missing/unreadable
     *                    file results in an empty secrets map, not an error)
     * @throws IOException if the main config cannot be read
     */
    public AppConfig(String configPath, String secretsPath) throws IOException {
        this.config = loadYaml(configPath);
        Map<String, Object> loadedSecrets;
        try {
            loadedSecrets = loadYaml(secretsPath);
        } catch (IOException e) {
            // Secrets are optional - use empty map if not found
            loadedSecrets = new HashMap<>();
        }
        this.secrets = loadedSecrets;
    }

    /** Load a YAML file into a map; an empty YAML document yields an empty map. */
    private Map<String, Object> loadYaml(String path) throws IOException {
        Yaml yaml = new Yaml();
        try (InputStream inputStream = new FileInputStream(path)) {
            Map<String, Object> data = yaml.load(inputStream);
            return data != null ? data : new HashMap<>();
        } catch (IOException e) {
            throw new IOException("Failed to load YAML from: " + path, e);
        }
    }

    /** Single lookup rule shared by all typed getters: config first, then secrets. */
    private Object lookup(String key) {
        Object value = config.get(key);
        return value != null ? value : secrets.get(key);
    }

    public String getString(String key) {
        return getString(key, null);
    }

    public String getString(String key, String defaultValue) {
        Object value = lookup(key);
        return value != null ? value.toString() : defaultValue;
    }

    public int getInt(String key) {
        return getInt(key, 0);
    }

    public int getInt(String key, int defaultValue) {
        Object value = lookup(key);
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt((String) value);
            } catch (NumberFormatException e) {
                // Malformed number in config: fall back rather than crash at startup
                return defaultValue;
            }
        }
        return defaultValue;
    }

    public boolean getBoolean(String key) {
        return getBoolean(key, false);
    }

    public boolean getBoolean(String key, boolean defaultValue) {
        Object value = lookup(key);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    // ---- ZMQ port getters (defaults match config.example.yaml) ----

    public int getIngestorWorkQueuePort() {
        return getInt("zmq_ingestor_work_queue_port", 5555);
    }

    public int getIngestorResponsePort() {
        return getInt("zmq_ingestor_response_port", 5556);
    }

    public int getIngestorControlPort() {
        return getInt("zmq_ingestor_control_port", 5557);
    }

    public int getMarketDataPubPort() {
        return getInt("zmq_market_data_pub_port", 5558);
    }

    public int getClientRequestPort() {
        return getInt("zmq_client_request_port", 5559);
    }

    public int getCepWebhookPort() {
        return getInt("zmq_cep_webhook_port", 5560);
    }

    public String getBindAddress() {
        return getString("zmq_bind_address", "tcp://*");
    }

    // ---- Kafka config ----

    public String getKafkaBootstrapServers() {
        return getString("kafka_bootstrap_servers", "localhost:9092");
    }

    public String getKafkaTickTopic() {
        return getString("kafka_tick_topic", "market-tick");
    }

    public String getKafkaOhlcTopic() {
        return getString("kafka_ohlc_topic", "market-ohlc");
    }

    // ---- Notification config ----

    // Task managers PUSH notifications to this endpoint (job manager PULL address)
    public String getNotificationPublishEndpoint() {
        return getString("notification_publish_endpoint", "tcp://flink-jobmanager:5561");
    }

    // Job manager binds PULL on this port to receive notifications from task managers
    public int getNotificationPullPort() {
        return getInt("notification_pull_port", 5561);
    }

    // ---- Iceberg config ----

    public String getIcebergNamespace() {
        return getString("iceberg_namespace", "trading");
    }

    public String getIcebergTablePrefix() {
        return getString("iceberg_table_prefix", "market");
    }
}

View File

@@ -0,0 +1,155 @@
package com.dexorder.flink.iceberg;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
/**
 * Initializes Iceberg tables directly via the Iceberg Catalog API.
 *
 * Creates tables in Iceberg if they don't already exist. This ensures all tables
 * are properly initialized before Flink starts writing data.
 */
public class SchemaInitializer {
private static final Logger LOG = LoggerFactory.getLogger(SchemaInitializer.class);
// Target catalog (a REST catalog in this deployment) and the namespace all tables live in.
private final Catalog catalog;
private final String namespace;
/**
 * @param catalog   Iceberg catalog used for namespace and table creation
 * @param namespace namespace (database) that the tables are created under
 */
public SchemaInitializer(Catalog catalog, String namespace) {
this.catalog = catalog;
this.namespace = namespace;
}
/**
 * Initialize all schemas by creating tables from definitions.
 *
 * @throws IOException if initialization fails
 * (note: the failures below actually surface as RuntimeException; the
 * IOException in the signature is kept for callers that already handle it)
 */
public void initializeSchemas() throws IOException {
LOG.info("Initializing Iceberg schemas in namespace: {}", namespace);
// Ensure namespace exists
ensureNamespaceExists();
// Initialize each table
initializeOhlcTable();
// Add more table initializations here as needed
LOG.info("Schema initialization completed successfully");
}
/**
 * Ensure the namespace exists in the catalog.
 * A no-op (with a warning) when the catalog impl does not support namespaces.
 */
private void ensureNamespaceExists() {
Namespace ns = Namespace.of(namespace);
try {
if (catalog instanceof SupportsNamespaces) {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
if (!nsCatalog.namespaceExists(ns)) {
nsCatalog.createNamespace(ns);
LOG.info("Created namespace: {}", namespace);
} else {
LOG.info("Namespace already exists: {}", namespace);
}
} else {
LOG.warn("Catalog does not support namespaces, skipping namespace creation");
}
} catch (Exception e) {
LOG.error("Failed to create namespace: {}", namespace, e);
throw new RuntimeException("Namespace creation failed", e);
}
}
/**
 * Initialize the OHLC table if it doesn't exist.
 */
// Bump this when the schema changes; increment by 1 for each incompatible change.
// NOTE: a table with a different (or missing) version is only logged with a warning
// below — it is NOT dropped or recreated automatically; migration is manual.
private static final String OHLC_SCHEMA_VERSION = "1";
private static final String SCHEMA_VERSION_PROP = "app.schema.version";
private void initializeOhlcTable() {
TableIdentifier tableId = TableIdentifier.of(namespace, "ohlc");
try {
if (catalog.tableExists(tableId)) {
Table existing = catalog.loadTable(tableId);
String existingVersion = existing.properties().get(SCHEMA_VERSION_PROP);
if (!OHLC_SCHEMA_VERSION.equals(existingVersion)) {
LOG.warn("Table {} has schema version '{}', expected '{}' — manual migration required",
tableId, existingVersion, OHLC_SCHEMA_VERSION);
}
LOG.info("Table {} already exists at schema version {} — skipping creation", tableId, existingVersion);
return;
}
LOG.info("Creating OHLC table: {}", tableId);
// Define the OHLC schema.
// timestamp is stored as BIGINT (microseconds since epoch), not a TIMESTAMP type,
// so that GenericRowData.setField() accepts a plain Long value.
// NOTE(review): prices and volumes are typed as longs — presumably fixed-point
// scaled integers; confirm the scaling convention against the ingestor side.
Schema schema = new Schema(
// Primary key fields
required(1, "ticker", Types.StringType.get(), "Market identifier (e.g., BINANCE:BTC/USDT)"),
required(2, "period_seconds", Types.IntegerType.get(), "OHLC period in seconds"),
required(3, "timestamp", Types.LongType.get(), "Candle timestamp in microseconds since epoch"),
// OHLC price data
required(4, "open", Types.LongType.get(), "Opening price"),
required(5, "high", Types.LongType.get(), "Highest price"),
required(6, "low", Types.LongType.get(), "Lowest price"),
required(7, "close", Types.LongType.get(), "Closing price"),
// Volume data
optional(8, "volume", Types.LongType.get(), "Total volume"),
optional(9, "buy_vol", Types.LongType.get(), "Buy volume"),
optional(10, "sell_vol", Types.LongType.get(), "Sell volume"),
// Timing data
optional(11, "open_time", Types.LongType.get(), "Timestamp when open price occurred"),
optional(12, "high_time", Types.LongType.get(), "Timestamp when high price occurred"),
optional(13, "low_time", Types.LongType.get(), "Timestamp when low price occurred"),
optional(14, "close_time", Types.LongType.get(), "Timestamp when close price occurred"),
// Additional fields
optional(15, "open_interest", Types.LongType.get(), "Open interest for futures"),
// Metadata fields
optional(16, "request_id", Types.StringType.get(), "Request ID that generated this data"),
required(17, "ingested_at", Types.LongType.get(), "Timestamp when data was ingested by Flink")
);
// Create the table with partitioning and properties.
// Partitioned by ticker only; time-range pruning relies on Parquet/Iceberg file stats.
Table table = catalog.buildTable(tableId, schema)
.withPartitionSpec(org.apache.iceberg.PartitionSpec.builderFor(schema)
.identity("ticker")
.build())
.withProperty("write.format.default", "parquet")
.withProperty("write.parquet.compression-codec", "snappy")
.withProperty("write.metadata.compression-codec", "gzip")
.withProperty("format-version", "2")
.withProperty(SCHEMA_VERSION_PROP, OHLC_SCHEMA_VERSION)
.create();
LOG.info("Successfully created OHLC table: {}", tableId);
} catch (Exception e) {
LOG.error("Failed to initialize OHLC table: {}", tableId, e);
throw new RuntimeException("OHLC table initialization failed", e);
}
}
}

View File

@@ -0,0 +1,111 @@
package com.dexorder.flink.ingestor;
/**
 * A data request destined for an ingestor.
 *
 * Hand-written stand-in until the generated protobuf classes are wired in; the
 * constructor and getters mirror the eventual proto message so call sites won't change.
 */
public class DataRequestMessage {

    /** Kind of data the ingestor should produce. */
    public enum RequestType {
        HISTORICAL_OHLC,
        REALTIME_TICKS
    }

    private final String requestId;
    private final RequestType requestType;
    private final String ticker;
    private final HistoricalParams historicalParams;
    private final RealtimeParams realtimeParams;

    public DataRequestMessage(String id, RequestType type, String symbol,
                              HistoricalParams historical, RealtimeParams realtime) {
        this.requestId = id;
        this.requestType = type;
        this.ticker = symbol;
        this.historicalParams = historical;
        this.realtimeParams = realtime;
    }

    public String getRequestId() { return requestId; }

    public RequestType getRequestType() { return requestType; }

    public String getTicker() { return ticker; }

    public HistoricalParams getHistoricalParams() { return historicalParams; }

    public RealtimeParams getRealtimeParams() { return realtimeParams; }

    /**
     * Serialize to protobuf bytes.
     * TODO: Replace with actual generated protobuf serialization
     */
    public byte[] toProtobuf() {
        // Placeholder until the .proto files are compiled into Java classes.
        return new byte[0];
    }

    /** Parameters describing a historical OHLC request. */
    public static class HistoricalParams {
        private final long startTime;
        private final long endTime;
        private final int periodSeconds;
        private final Integer limit;

        public HistoricalParams(long start, long end, int period, Integer maxRecords) {
            this.startTime = start;
            this.endTime = end;
            this.periodSeconds = period;
            this.limit = maxRecords;
        }

        public long getStartTime() { return startTime; }

        public long getEndTime() { return endTime; }

        public int getPeriodSeconds() { return periodSeconds; }

        /** Record limit, or null when no limit was specified. */
        public Integer getLimit() { return limit; }
    }

    /** Parameters describing a realtime subscription request. */
    public static class RealtimeParams {
        private final boolean includeTicks;
        private final boolean includeOhlc;
        private final int[] ohlcPeriodSeconds;

        public RealtimeParams(boolean ticks, boolean ohlc, int[] periods) {
            this.includeTicks = ticks;
            this.includeOhlc = ohlc;
            this.ohlcPeriodSeconds = periods;
        }

        public boolean isIncludeTicks() { return includeTicks; }

        public boolean isIncludeOhlc() { return includeOhlc; }

        public int[] getOhlcPeriodSeconds() { return ohlcPeriodSeconds; }
    }
}

View File

@@ -0,0 +1,91 @@
package com.dexorder.flink.ingestor;
import java.util.ArrayList;
import java.util.List;
/**
 * Represents a DataResponse message from an ingestor.
 * Contains the results of a historical data request.
 *
 * Instances are immutable; use the {@link #success}, {@link #error} and
 * {@link #notFound} factories rather than the constructor where possible.
 */
public class DataResponseMessage {
    private final String requestId;
    private final ResponseStatus status;
    private final String errorMessage;          // null unless status is ERROR/NOT_FOUND
    private final List<byte[]> ohlcData;        // serialized OHLC rows; never null
    private final int totalRecords;

    /** Outcome of the request the ingestor processed. */
    public enum ResponseStatus {
        OK,
        NOT_FOUND,
        ERROR
    }

    /**
     * @param requestId    id of the originating request
     * @param status       outcome of the request
     * @param errorMessage human-readable failure reason, or null
     * @param ohlcData     serialized rows; null is normalized to an empty list
     * @param totalRecords number of records reported by the ingestor
     */
    public DataResponseMessage(String requestId, ResponseStatus status, String errorMessage,
                               List<byte[]> ohlcData, int totalRecords) {
        this.requestId = requestId;
        this.status = status;
        this.errorMessage = errorMessage;
        // Normalize null so getOhlcData() never returns null to callers.
        this.ohlcData = ohlcData != null ? ohlcData : new ArrayList<>();
        this.totalRecords = totalRecords;
    }

    public String getRequestId() {
        return requestId;
    }

    public ResponseStatus getStatus() {
        return status;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    /** @return serialized OHLC rows; never null (empty on error/not-found) */
    public List<byte[]> getOhlcData() {
        return ohlcData;
    }

    public int getTotalRecords() {
        return totalRecords;
    }

    /**
     * Deserialize from protobuf bytes.
     * TODO: Replace with actual generated protobuf deserialization
     */
    public static DataResponseMessage fromProtobuf(byte[] protobufData) {
        // Placeholder - will be replaced with actual protobuf deserialization
        // For now, return a dummy response
        return new DataResponseMessage("", ResponseStatus.ERROR, "Not implemented", null, 0);
    }

    /**
     * Serialize to protobuf bytes.
     * TODO: Replace with actual generated protobuf serialization
     */
    public byte[] toProtobuf() {
        // Placeholder - will be replaced with actual protobuf serialization
        return new byte[0];
    }

    /**
     * Create a successful response.
     *
     * @param ohlcData rows to attach; null is treated as an empty result
     *                 (fix: previously dereferenced null and threw NPE, even
     *                 though the constructor explicitly tolerates a null list)
     */
    public static DataResponseMessage success(String requestId, List<byte[]> ohlcData) {
        int count = ohlcData != null ? ohlcData.size() : 0;
        return new DataResponseMessage(requestId, ResponseStatus.OK, null, ohlcData, count);
    }

    /**
     * Create an error response.
     */
    public static DataResponseMessage error(String requestId, String errorMessage) {
        return new DataResponseMessage(requestId, ResponseStatus.ERROR, errorMessage, null, 0);
    }

    /**
     * Create a not found response.
     */
    public static DataResponseMessage notFound(String requestId) {
        return new DataResponseMessage(requestId, ResponseStatus.NOT_FOUND, "Data not found", null, 0);
    }
}

View File

@@ -0,0 +1,165 @@
package com.dexorder.flink.ingestor;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Broadcast side of the ingestor control channel.
 * Publishes control messages (cancel / shutdown / config-update / heartbeat)
 * to every ingestor worker over the ZMQ PUB socket owned by {@link ZmqChannelManager}.
 */
public class IngestorControlChannel {
    private static final Logger LOG = LoggerFactory.getLogger(IngestorControlChannel.class);
    // One-byte wire framing: protocol version and message-type tag.
    private static final byte PROTOCOL_VERSION = 0x01;
    private static final byte MSG_TYPE_INGESTOR_CONTROL = 0x02;

    private final ZmqChannelManager zmqManager;

    public IngestorControlChannel(ZmqChannelManager zmqManager) {
        this.zmqManager = zmqManager;
    }

    /**
     * Cancel a specific data request.
     */
    public void cancelRequest(String requestId) {
        broadcastControlMessage(IngestorControlMessage.cancel(requestId));
        LOG.info("Sent CANCEL control message for request: {}", requestId);
    }

    /**
     * Send shutdown signal to all ingestors.
     */
    public void shutdown() {
        broadcastControlMessage(IngestorControlMessage.shutdown());
        LOG.info("Sent SHUTDOWN control message to all ingestors");
    }

    /**
     * Update ingestor configuration.
     */
    public void updateConfig(IngestorConfig config) {
        broadcastControlMessage(IngestorControlMessage.configUpdate(config));
        LOG.info("Sent CONFIG_UPDATE control message to all ingestors");
    }

    /**
     * Send heartbeat to ingestors.
     */
    public void sendHeartbeat() {
        broadcastControlMessage(IngestorControlMessage.heartbeat());
        LOG.debug("Sent HEARTBEAT control message to all ingestors");
    }

    /**
     * Broadcast a control message to all ingestors.
     * Failures are logged, never thrown, so a flaky channel cannot take
     * down the caller.
     */
    private void broadcastControlMessage(IngestorControlMessage message) {
        try {
            byte[] payload = message.toProtobuf();
            if (!zmqManager.sendMessage(
                    ZmqChannelManager.Channel.INGESTOR_CONTROL,
                    PROTOCOL_VERSION,
                    MSG_TYPE_INGESTOR_CONTROL,
                    payload)) {
                LOG.error("Failed to send control message: action={}", message.getAction());
            }
        } catch (Exception e) {
            LOG.error("Error broadcasting control message: action={}", message.getAction(), e);
        }
    }

    /**
     * Control message wrapper. Built via the static factories; the payload
     * fields not relevant to an action are left null.
     */
    public static class IngestorControlMessage {
        /** The operation being broadcast to the ingestor fleet. */
        public enum ControlAction {
            CANCEL,
            SHUTDOWN,
            CONFIG_UPDATE,
            HEARTBEAT
        }

        private final ControlAction action;
        private final String requestId;      // set only for CANCEL
        private final IngestorConfig config; // set only for CONFIG_UPDATE

        private IngestorControlMessage(ControlAction action, String requestId, IngestorConfig config) {
            this.action = action;
            this.requestId = requestId;
            this.config = config;
        }

        public static IngestorControlMessage cancel(String requestId) {
            return new IngestorControlMessage(ControlAction.CANCEL, requestId, null);
        }

        public static IngestorControlMessage shutdown() {
            return new IngestorControlMessage(ControlAction.SHUTDOWN, null, null);
        }

        public static IngestorControlMessage configUpdate(IngestorConfig config) {
            return new IngestorControlMessage(ControlAction.CONFIG_UPDATE, null, config);
        }

        public static IngestorControlMessage heartbeat() {
            return new IngestorControlMessage(ControlAction.HEARTBEAT, null, null);
        }

        public ControlAction getAction() {
            return action;
        }

        public String getRequestId() {
            return requestId;
        }

        public IngestorConfig getConfig() {
            return config;
        }

        /**
         * Serialize to protobuf bytes.
         * TODO: Replace with actual generated protobuf serialization
         */
        public byte[] toProtobuf() {
            // Placeholder - will be replaced with actual protobuf serialization
            return new byte[0];
        }
    }

    /**
     * Ingestor configuration payload for CONFIG_UPDATE messages.
     * All fields are nullable; null means "leave unchanged" — TODO confirm
     * with the ingestor-side consumer.
     */
    public static class IngestorConfig {
        private final Integer maxConcurrent;
        private final Integer timeoutSeconds;
        private final String kafkaTopic;

        public IngestorConfig(Integer maxConcurrent, Integer timeoutSeconds, String kafkaTopic) {
            this.maxConcurrent = maxConcurrent;
            this.timeoutSeconds = timeoutSeconds;
            this.kafkaTopic = kafkaTopic;
        }

        public Integer getMaxConcurrent() {
            return maxConcurrent;
        }

        public Integer getTimeoutSeconds() {
            return timeoutSeconds;
        }

        public String getKafkaTopic() {
            return kafkaTopic;
        }
    }
}

View File

@@ -0,0 +1,172 @@
package com.dexorder.flink.ingestor;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
 * Listens for DataResponse messages from ingestors on the ROUTER socket.
 * Matches responses to pending requests and delivers them to waiting handlers.
 *
 * Lifecycle: {@link #start()} spawns a dedicated non-daemon thread running
 * {@link #listenLoop()}; {@link #stop()} flags it down, interrupts it, and
 * fails any still-pending futures so callers do not hang. Pending requests
 * are keyed by request id in a ConcurrentHashMap because registration and
 * completion happen on different threads.
 */
public class IngestorResponseListener {
    private static final Logger LOG = LoggerFactory.getLogger(IngestorResponseListener.class);
    // One-byte wire framing: protocol version and the message-type tag this
    // listener accepts; anything else is logged and dropped.
    private static final byte PROTOCOL_VERSION = 0x01;
    private static final byte MSG_TYPE_DATA_RESPONSE = 0x02;
    private final ZmqChannelManager zmqManager;
    // request id -> future completed when the matching DataResponse arrives
    private final Map<String, CompletableFuture<DataResponseMessage>> pendingRequests;
    // volatile: written by start()/stop() callers, polled by the listener thread
    private volatile boolean running;
    private Thread listenerThread;
    public IngestorResponseListener(ZmqChannelManager zmqManager) {
        this.zmqManager = zmqManager;
        this.pendingRequests = new ConcurrentHashMap<>();
        this.running = false;
    }
    /**
     * Start the response listener thread.
     * No-op (with a warning) if already running. The thread is deliberately
     * non-daemon so the JVM will not exit before a clean stop().
     */
    public void start() {
        if (running) {
            LOG.warn("IngestorResponseListener already running");
            return;
        }
        running = true;
        listenerThread = new Thread(this::listenLoop, "IngestorResponseListener-Thread");
        listenerThread.setDaemon(false);
        listenerThread.start();
        LOG.info("IngestorResponseListener started");
    }
    /**
     * Stop the response listener.
     * Interrupts the listener thread, waits up to 5s for it to exit, then
     * fails and clears every still-pending future.
     */
    public void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (listenerThread != null) {
            listenerThread.interrupt();
            try {
                listenerThread.join(5000);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for listener thread to stop", e);
                // Preserve the caller's interrupt status.
                Thread.currentThread().interrupt();
            }
        }
        // Cancel all pending requests
        pendingRequests.values().forEach(future ->
            future.completeExceptionally(new Exception("Listener stopped"))
        );
        pendingRequests.clear();
        LOG.info("IngestorResponseListener stopped");
    }
    /**
     * Register a request and return a CompletableFuture that will be completed
     * when the response arrives.
     *
     * NOTE(review): registering the same requestId twice silently replaces the
     * earlier future, which would then never complete — confirm ids are unique.
     */
    public CompletableFuture<DataResponseMessage> registerRequest(String requestId) {
        CompletableFuture<DataResponseMessage> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        LOG.debug("Registered pending request: {}", requestId);
        return future;
    }
    /**
     * Cancel a pending request.
     * The associated future is failed exceptionally; a later response for this
     * id will be treated as unknown by {@link #processResponse}.
     */
    public void cancelRequest(String requestId) {
        CompletableFuture<DataResponseMessage> future = pendingRequests.remove(requestId);
        if (future != null) {
            future.completeExceptionally(new Exception("Request cancelled"));
            LOG.debug("Cancelled pending request: {}", requestId);
        }
    }
    /**
     * Main listener loop - receives and processes DataResponse messages.
     * Uses a 1s receive timeout so the running flag is re-checked at least
     * once per second even when no traffic arrives.
     */
    private void listenLoop() {
        LOG.info("IngestorResponseListener loop started");
        while (running) {
            try {
                // Receive message from ROUTER socket with 1 second timeout
                ZmqChannelManager.ReceivedMessage receivedMsg = zmqManager.receiveRouterMessage(
                    ZmqChannelManager.Channel.INGESTOR_RESPONSE,
                    1000
                );
                if (receivedMsg == null) {
                    continue;
                }
                // Verify protocol version and message type
                if (receivedMsg.getVersion() != PROTOCOL_VERSION) {
                    LOG.warn("Received message with unsupported protocol version: {}",
                        receivedMsg.getVersion());
                    continue;
                }
                if (receivedMsg.getMessageType() != MSG_TYPE_DATA_RESPONSE) {
                    LOG.warn("Received unexpected message type: {}",
                        receivedMsg.getMessageType());
                    continue;
                }
                // Parse the DataResponse
                DataResponseMessage response = DataResponseMessage.fromProtobuf(
                    receivedMsg.getProtobufData()
                );
                processResponse(response);
            } catch (Exception e) {
                // Suppress noise during shutdown: errors caused by stop() are expected.
                if (running) {
                    LOG.error("Error in listener loop", e);
                }
            }
        }
        LOG.info("IngestorResponseListener loop stopped");
    }
    /**
     * Process a received DataResponse message.
     * Atomically removes the pending entry so each response completes its
     * future exactly once; responses with no matching entry are logged and dropped.
     */
    private void processResponse(DataResponseMessage response) {
        String requestId = response.getRequestId();
        CompletableFuture<DataResponseMessage> future = pendingRequests.remove(requestId);
        if (future == null) {
            LOG.warn("Received response for unknown request: {}", requestId);
            return;
        }
        LOG.info("Received response for request: {}, status={}, records={}",
            requestId, response.getStatus(), response.getTotalRecords());
        // Complete the future with the response
        future.complete(response);
    }
    /** @return whether the listener thread has been started and not yet stopped */
    public boolean isRunning() {
        return running;
    }
    /** @return number of requests still awaiting a response (monitoring aid) */
    public int getPendingRequestCount() {
        return pendingRequests.size();
    }
}

View File

@@ -0,0 +1,164 @@
package com.dexorder.flink.ingestor;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Manages the ingestor work queue.
 * Sends DataRequest messages to ingestor workers via ZMQ PUB socket with exchange prefix filtering.
 *
 * Requests are buffered in an unbounded in-memory LinkedBlockingQueue and
 * drained by a single dedicated thread, so callers of submitRequest never
 * block on the ZMQ send.
 */
public class IngestorWorkQueue {
    private static final Logger LOG = LoggerFactory.getLogger(IngestorWorkQueue.class);
    // One-byte wire framing: protocol version and message-type tag.
    private static final byte PROTOCOL_VERSION = 0x01;
    private static final byte MSG_TYPE_DATA_REQUEST = 0x01;
    private final ZmqChannelManager zmqManager;
    // Unbounded buffer between submitters and the single sender thread.
    private final BlockingQueue<DataRequestMessage> requestQueue;
    // volatile: written by start()/stop() callers, polled by the worker thread
    private volatile boolean running;
    private Thread workerThread;
    public IngestorWorkQueue(ZmqChannelManager zmqManager) {
        this.zmqManager = zmqManager;
        this.requestQueue = new LinkedBlockingQueue<>();
        this.running = false;
    }
    /**
     * Start the work queue processor thread.
     * No-op (with a warning) if already running; the thread is non-daemon so
     * the JVM waits for a clean stop().
     */
    public void start() {
        if (running) {
            LOG.warn("IngestorWorkQueue already running");
            return;
        }
        running = true;
        workerThread = new Thread(this::processQueue, "IngestorWorkQueue-Thread");
        workerThread.setDaemon(false);
        workerThread.start();
        LOG.info("IngestorWorkQueue started");
    }
    /**
     * Stop the work queue processor.
     * Interrupts the worker (which is usually blocked in queue.take()) and
     * waits up to 5s for it to exit. Requests still in the queue are dropped.
     */
    public void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (workerThread != null) {
            workerThread.interrupt();
            try {
                workerThread.join(5000);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for worker thread to stop", e);
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("IngestorWorkQueue stopped");
    }
    /**
     * Submit a data request to the queue.
     * Requests submitted while the queue is not running are logged and
     * silently dropped — callers get no error back.
     */
    public void submitRequest(DataRequestMessage request) {
        if (!running) {
            LOG.warn("Cannot submit request - work queue not running");
            return;
        }
        try {
            requestQueue.put(request);
            LOG.debug("Submitted data request: {}", request.getRequestId());
        } catch (InterruptedException e) {
            LOG.error("Interrupted while submitting request", e);
            Thread.currentThread().interrupt();
        }
    }
    /**
     * Process the request queue and send to ingestors.
     * Runs on the dedicated worker thread until stop() interrupts it; any
     * non-interrupt failure is logged and the loop continues.
     */
    private void processQueue() {
        LOG.info("IngestorWorkQueue processor started");
        while (running) {
            try {
                DataRequestMessage request = requestQueue.take();
                sendToIngestors(request);
            } catch (InterruptedException e) {
                // Interrupt during shutdown is expected; only log if unexpected.
                if (running) {
                    LOG.error("Queue processing interrupted", e);
                }
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                LOG.error("Error processing request", e);
            }
        }
        LOG.info("IngestorWorkQueue processor stopped");
    }
    /**
     * Send a data request to ingestors via PUB socket with exchange prefix.
     * The topic prefix is extracted from the ticker (e.g., "BINANCE:BTC/USDT" -> "BINANCE:")
     *
     * NOTE(review): failed sends are re-queued immediately with no backoff and
     * no retry cap, so a persistently failing channel produces a tight
     * take/offer retry loop — confirm this is acceptable or add a delay.
     */
    private void sendToIngestors(DataRequestMessage request) {
        try {
            byte[] protobufData = request.toProtobuf();
            // Extract exchange prefix from ticker (e.g., "BINANCE:BTC/USDT" -> "BINANCE:")
            String ticker = request.getTicker();
            String exchangePrefix = extractExchangePrefix(ticker);
            boolean sent = zmqManager.sendTopicMessage(
                ZmqChannelManager.Channel.INGESTOR_WORK_QUEUE,
                exchangePrefix,
                PROTOCOL_VERSION,
                MSG_TYPE_DATA_REQUEST,
                protobufData
            );
            if (sent) {
                LOG.info("Sent DataRequest to ingestors: requestId={}, type={}, ticker={}, prefix={}",
                    request.getRequestId(), request.getRequestType(), request.getTicker(), exchangePrefix);
            } else {
                LOG.error("Failed to send DataRequest: {}", request.getRequestId());
                // Re-queue the request (offer always succeeds on an unbounded queue)
                requestQueue.offer(request);
            }
        } catch (Exception e) {
            LOG.error("Error sending request to ingestors: {}", request.getRequestId(), e);
            // Re-queue the request
            requestQueue.offer(request);
        }
    }
    /**
     * Extract exchange prefix from ticker string.
     * E.g., "BINANCE:BTC/USDT" -> "BINANCE:"
     * A ticker without a colon (or starting with one) yields the empty prefix,
     * which matches every subscriber.
     */
    private String extractExchangePrefix(String ticker) {
        int colonIndex = ticker.indexOf(':');
        if (colonIndex > 0) {
            // Keep the trailing ':' so "BINANCE:" cannot match "BINANCEX:".
            return ticker.substring(0, colonIndex + 1);
        }
        LOG.warn("Ticker '{}' does not contain exchange prefix, using empty prefix", ticker);
        return "";
    }
    /** @return number of requests waiting to be sent (monitoring aid) */
    public int getQueueSize() {
        return requestQueue.size();
    }
    /** @return whether the processor thread has been started and not yet stopped */
    public boolean isRunning() {
        return running;
    }
}

View File

@@ -0,0 +1,60 @@
package com.dexorder.flink.kafka;
import java.util.HashMap;
import java.util.Map;
/**
 * Bean describing the desired state of one Kafka topic: name, partition
 * count, replication factor, and per-topic config overrides.
 * Populated by SnakeYAML from topics.yaml, hence the no-arg constructor
 * and mutable setters.
 */
public class TopicConfig {
    private String name;
    private int partitions;
    private int replication;
    private Map<String, String> config;

    public TopicConfig() {
        // Start with an empty override map so getConfig() never returns null
        // for topics that declare no config section in the YAML.
        this.config = new HashMap<>();
    }

    /** @return topic name as it should appear in Kafka */
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** @return desired number of partitions */
    public int getPartitions() {
        return partitions;
    }

    public void setPartitions(int partitions) {
        this.partitions = partitions;
    }

    /** @return desired replication factor */
    public int getReplication() {
        return replication;
    }

    public void setReplication(int replication) {
        this.replication = replication;
    }

    /** @return per-topic broker config overrides (e.g. retention.ms) */
    public Map<String, String> getConfig() {
        return config;
    }

    public void setConfig(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public String toString() {
        return String.format("TopicConfig{name='%s', partitions=%d, replication=%d, config=%s}",
                name, partitions, replication, config);
    }
}

View File

@@ -0,0 +1,224 @@
package com.dexorder.flink.kafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
 * Manages Kafka topics based on a YAML configuration file.
 * Creates topics if they don't exist, updates configuration if topics exist.
 *
 * Note: the project's own TopicConfig bean is referenced fully-qualified
 * throughout because this file also imports org.apache.kafka.common.config.TopicConfig,
 * which would otherwise shadow it.
 */
public class TopicManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TopicManager.class);
    private final AdminClient adminClient;
    // Desired topic state, loaded once at construction; empty on load failure.
    private final List<com.dexorder.flink.kafka.TopicConfig> topicConfigs;
    /**
     * Creates a TopicManager with the specified Kafka bootstrap servers.
     * Loads topic configuration from the classpath resource "topics.yaml".
     *
     * @param bootstrapServers Kafka bootstrap servers
     */
    public TopicManager(String bootstrapServers) {
        this(bootstrapServers, "/topics.yaml");
    }
    /**
     * Creates a TopicManager with the specified Kafka bootstrap servers and config file.
     *
     * @param bootstrapServers Kafka bootstrap servers
     * @param configResourcePath Path to the topics YAML file in classpath
     */
    public TopicManager(String bootstrapServers, String configResourcePath) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Fail fast (10s) rather than hanging on an unreachable broker.
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000");
        this.adminClient = AdminClient.create(props);
        this.topicConfigs = loadTopicConfigs(configResourcePath);
    }
    /**
     * Loads topic configurations from a YAML file.
     * Never throws: a missing or malformed resource is logged and yields an
     * empty list, so ensureTopicsExist() becomes a no-op.
     */
    private List<com.dexorder.flink.kafka.TopicConfig> loadTopicConfigs(String resourcePath) {
        try (InputStream inputStream = getClass().getResourceAsStream(resourcePath)) {
            if (inputStream == null) {
                LOG.error("Topic configuration file not found: {}", resourcePath);
                return Collections.emptyList();
            }
            // Typed constructor lets SnakeYAML bind directly into TopicsWrapper.
            Yaml yaml = new Yaml(new Constructor(TopicsWrapper.class, new org.yaml.snakeyaml.LoaderOptions()));
            TopicsWrapper wrapper = yaml.load(inputStream);
            if (wrapper == null || wrapper.getTopics() == null) {
                LOG.warn("No topics defined in configuration file");
                return Collections.emptyList();
            }
            LOG.info("Loaded {} topic configurations from {}", wrapper.getTopics().size(), resourcePath);
            return wrapper.getTopics();
        } catch (Exception e) {
            LOG.error("Failed to load topic configurations from {}", resourcePath, e);
            return Collections.emptyList();
        }
    }
    /**
     * Ensures all configured topics exist with the correct configuration.
     * Creates topics that don't exist, logs warnings for topics that exist with different config.
     *
     * @throws ExecutionException   if a Kafka admin operation fails
     * @throws InterruptedException if interrupted while waiting on Kafka
     */
    public void ensureTopicsExist() throws ExecutionException, InterruptedException {
        if (topicConfigs.isEmpty()) {
            LOG.warn("No topics to create");
            return;
        }
        // Get existing topics
        Set<String> existingTopics = adminClient.listTopics().names().get();
        LOG.info("Found {} existing topics in Kafka", existingTopics.size());
        // Separate topics to create vs topics to check
        List<com.dexorder.flink.kafka.TopicConfig> topicsToCreate = topicConfigs.stream()
            .filter(tc -> !existingTopics.contains(tc.getName()))
            .collect(Collectors.toList());
        List<com.dexorder.flink.kafka.TopicConfig> existingConfiguredTopics = topicConfigs.stream()
            .filter(tc -> existingTopics.contains(tc.getName()))
            .collect(Collectors.toList());
        // Create new topics
        if (!topicsToCreate.isEmpty()) {
            createTopics(topicsToCreate);
        }
        // Verify existing topics
        if (!existingConfiguredTopics.isEmpty()) {
            verifyTopicConfigurations(existingConfiguredTopics);
        }
        LOG.info("Topic management complete");
    }
    /**
     * Creates the specified topics.
     * Waits for every creation future; the first failure is logged and
     * rethrown, aborting the remaining checks.
     */
    private void createTopics(List<com.dexorder.flink.kafka.TopicConfig> topics)
            throws ExecutionException, InterruptedException {
        List<NewTopic> newTopics = topics.stream()
            .map(tc -> {
                NewTopic newTopic = new NewTopic(
                    tc.getName(),
                    tc.getPartitions(),
                    (short) tc.getReplication()
                );
                if (tc.getConfig() != null && !tc.getConfig().isEmpty()) {
                    newTopic.configs(tc.getConfig());
                }
                return newTopic;
            })
            .collect(Collectors.toList());
        LOG.info("Creating {} topics", newTopics.size());
        CreateTopicsResult result = adminClient.createTopics(newTopics);
        // Wait for all topics to be created and log results
        for (Map.Entry<String, org.apache.kafka.common.KafkaFuture<Void>> entry : result.values().entrySet()) {
            try {
                entry.getValue().get();
                LOG.info("Successfully created topic: {}", entry.getKey());
            } catch (ExecutionException e) {
                LOG.error("Failed to create topic: {}", entry.getKey(), e);
                throw e;
            }
        }
    }
    /**
     * Verifies that existing topics have the expected configuration.
     * Logs warnings if configuration differs.
     * Only partition count and replication factor are compared — per-topic
     * config overrides are not checked here.
     */
    private void verifyTopicConfigurations(List<com.dexorder.flink.kafka.TopicConfig> topics)
            throws ExecutionException, InterruptedException {
        List<String> topicNames = topics.stream()
            .map(com.dexorder.flink.kafka.TopicConfig::getName)
            .collect(Collectors.toList());
        // Describe topics to get their configurations
        DescribeTopicsResult describeResult = adminClient.describeTopics(topicNames);
        Map<String, TopicDescription> descriptions = describeResult.all().get();
        for (com.dexorder.flink.kafka.TopicConfig tc : topics) {
            TopicDescription desc = descriptions.get(tc.getName());
            if (desc == null) {
                continue;
            }
            // Check partition count
            if (desc.partitions().size() != tc.getPartitions()) {
                LOG.warn("Topic {} has {} partitions, expected {}. " +
                        "Partition count cannot be changed automatically.",
                    tc.getName(), desc.partitions().size(), tc.getPartitions());
            }
            // Check replication factor (sampled from the first partition)
            if (!desc.partitions().isEmpty()) {
                int actualReplication = desc.partitions().get(0).replicas().size();
                if (actualReplication != tc.getReplication()) {
                    LOG.warn("Topic {} has replication factor {}, expected {}. " +
                            "Replication factor cannot be changed automatically.",
                        tc.getName(), actualReplication, tc.getReplication());
                }
            }
            LOG.info("Verified existing topic: {}", tc.getName());
        }
    }
    /**
     * Gets the list of configured topic names.
     */
    public List<String> getTopicNames() {
        return topicConfigs.stream()
            .map(com.dexorder.flink.kafka.TopicConfig::getName)
            .collect(Collectors.toList());
    }
    /** Releases the underlying AdminClient. */
    @Override
    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
    /**
     * Wrapper class for YAML deserialization.
     * Matches the top-level "topics:" key in topics.yaml.
     */
    public static class TopicsWrapper {
        private List<com.dexorder.flink.kafka.TopicConfig> topics;
        public List<com.dexorder.flink.kafka.TopicConfig> getTopics() {
            return topics;
        }
        public void setTopics(List<com.dexorder.flink.kafka.TopicConfig> topics) {
            this.topics = topics;
        }
    }
}

View File

@@ -0,0 +1,100 @@
package com.dexorder.flink.publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
/**
 * Runs in the job manager. Pulls notifications from task managers (via PUSH/PULL)
 * and republishes them on the MARKET_DATA_PUB socket that the relay subscribes to.
 *
 * Flow:
 *   Task manager HistoryNotificationPublisher → PUSH
 *        ↓
 *   Job manager HistoryNotificationForwarder PULL → MARKET_DATA_PUB
 *        ↓
 *   Relay (XSUB) → Relay (XPUB) → Clients
 *
 * Ownership: this class creates and owns its ZContext and PULL socket; the
 * PUB socket is borrowed from ZmqChannelManager and is NOT closed here.
 */
public class HistoryNotificationForwarder implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryNotificationForwarder.class);
    private final ZMQ.Socket pullSocket;
    // Borrowed MARKET_DATA_PUB socket; lifecycle managed by its owner.
    private final ZMQ.Socket pubSocket;
    private final ZContext context;
    // volatile: flipped by close() on another thread, polled by forwardLoop().
    private volatile boolean running = true;
    private Thread thread;
    /**
     * @param pullPort Port to bind PULL socket on (task managers connect PUSH here)
     * @param pubSocket Existing MARKET_DATA_PUB socket from ZmqChannelManager
     */
    public HistoryNotificationForwarder(int pullPort, ZMQ.Socket pubSocket) {
        this.pubSocket = pubSocket;
        this.context = new ZContext();
        this.pullSocket = context.createSocket(SocketType.PULL);
        // Cap inbound buffering at 10k messages before ZMQ backpressure kicks in.
        this.pullSocket.setRcvHWM(10000);
        String endpoint = "tcp://*:" + pullPort;
        this.pullSocket.bind(endpoint);
        LOG.info("HistoryNotificationForwarder PULL socket bound to {}", endpoint);
    }
    /** Starts the forwarding loop on a daemon thread. */
    public void start() {
        thread = new Thread(this::forwardLoop, "notification-forwarder");
        thread.setDaemon(true);
        thread.start();
        LOG.info("HistoryNotificationForwarder started");
    }
    /**
     * Main loop: receive each (possibly multi-part) message from the PULL
     * socket and re-send it frame-for-frame on the PUB socket.
     *
     * NOTE(review): the inner loop assumes that once the first frame of a
     * multipart message arrives, the remaining frames are available — this
     * relies on ZMQ's atomic multipart delivery; confirm for this transport.
     */
    private void forwardLoop() {
        LOG.info("Notification forwarder loop running");
        pullSocket.setReceiveTimeOut(200); // ms, so we can check running flag
        while (running) {
            // Receive all frames of a multi-part message and forward to PUB
            byte[] frame = pullSocket.recv(0);
            if (frame == null) {
                continue; // timeout, check running flag
            }
            boolean more = pullSocket.hasReceiveMore();
            if (more) {
                pubSocket.sendMore(frame);
            } else {
                // Single-frame message: send and go back to waiting.
                pubSocket.send(frame, 0);
                continue;
            }
            // Receive remaining frames
            while (more) {
                frame = pullSocket.recv(0);
                more = pullSocket.hasReceiveMore();
                if (more) {
                    pubSocket.sendMore(frame);
                } else {
                    // Last frame of the multipart message.
                    pubSocket.send(frame, 0);
                }
            }
            LOG.debug("Forwarded notification to MARKET_DATA_PUB");
        }
        LOG.info("Notification forwarder loop stopped");
    }
    /**
     * Signals the loop to stop, waits up to 2s for the thread, then closes
     * the owned PULL socket and context (the borrowed PUB socket is untouched).
     *
     * NOTE(review): if join(2000) times out, the socket is closed while the
     * forwarder thread may still be using it — ZMQ sockets are not
     * thread-safe; confirm the 200ms receive timeout makes this window safe.
     */
    @Override
    public void close() {
        running = false;
        if (thread != null) {
            try {
                thread.join(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        pullSocket.close();
        context.close();
    }
}

View File

@@ -0,0 +1,137 @@
package com.dexorder.flink.publisher;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Flink function that processes OHLCBatch messages and publishes notifications
 * after data is written to Iceberg.
 *
 * This function:
 * 1. Receives OHLCBatch messages with metadata and rows
 * 2. Writes rows to Iceberg (pass through to sink)
 * 3. Publishes HistoryReadyNotification immediately after batch completes
 *
 * Note: Each OHLCBatch is a complete unit - one batch = one notification
 *
 * NOTE(review): the notification is published when the batch passes through
 * THIS operator; out.collect only forwards the batch downstream, so the
 * Iceberg sink's write/commit happens later — confirm consumers tolerate a
 * notification that may precede the committed data.
 */
public class HistoryNotificationFunction extends ProcessFunction<OHLCBatchWrapper, OHLCBatchWrapper> {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryNotificationFunction.class);
    private final String notificationEndpoint;
    private final String icebergNamespace;
    private final String icebergTablePrefix;
    // transient: re-created in open(); ZMQ sockets are not serializable state.
    private transient HistoryNotificationPublisher publisher;

    /**
     * @param notificationEndpoint ZMQ endpoint of the job manager's PULL socket
     * @param icebergNamespace     Iceberg namespace reported in notifications
     * @param icebergTablePrefix   prefix for generated Iceberg table names
     */
    public HistoryNotificationFunction(
            String notificationEndpoint,
            String icebergNamespace,
            String icebergTablePrefix
    ) {
        this.notificationEndpoint = notificationEndpoint;
        this.icebergNamespace = icebergNamespace;
        this.icebergTablePrefix = icebergTablePrefix;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Initialize ZMQ publisher once per task instance
        publisher = new HistoryNotificationPublisher(notificationEndpoint);
        LOG.info("Initialized HistoryNotificationPublisher on {}", notificationEndpoint);
    }

    /**
     * Forwards the batch to the downstream Iceberg sink and publishes the
     * matching notification (ready / not-found / error) based on batch status.
     */
    @Override
    public void processElement(
            OHLCBatchWrapper batch,
            Context context,
            Collector<OHLCBatchWrapper> out
    ) throws Exception {
        // Pass through the batch for Iceberg sink
        out.collect(batch);
        String requestId = batch.getRequestId();
        String clientId = batch.getClientId();
        String ticker = batch.getTicker();
        int periodSeconds = batch.getPeriodSeconds();
        long startTime = batch.getStartTime();
        long endTime = batch.getEndTime();
        String status = batch.getStatus();
        int rowCount = batch.getRowCount();
        LOG.info("Processing OHLCBatch: request_id={}, status={}, rows={}",
                requestId, status, rowCount);
        // Determine Iceberg table name based on period
        String tableName = getIcebergTableName(ticker, periodSeconds);
        // Publish notification based on status
        if ("ERROR".equals(status)) {
            // Error during fetch
            publisher.publishError(
                    requestId,
                    clientId,
                    ticker,
                    periodSeconds,
                    startTime,
                    endTime,
                    batch.getErrorMessage()
            );
        } else if ("NOT_FOUND".equals(status) || rowCount == 0) {
            // No data available
            publisher.publishNotFound(
                    requestId,
                    clientId,
                    ticker,
                    periodSeconds,
                    startTime,
                    endTime
            );
        } else {
            // Success - data available
            publisher.publishHistoryReady(
                    requestId,
                    clientId,
                    ticker,
                    periodSeconds,
                    startTime,
                    endTime,
                    icebergNamespace,
                    tableName,
                    rowCount
            );
        }
        LOG.info("Published notification for request_id={}", requestId);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (publisher != null) {
            publisher.close();
        }
    }

    /**
     * Builds the Iceberg table name for a ticker's exchange and bar period,
     * e.g. ("BINANCE:BTC/USDT", 3600) -> "{prefix}_ohlc_binance_1h".
     *
     * @param ticker        ticker with exchange prefix; text before the first
     *                      ':' (or the whole string if none) is the exchange
     * @param periodSeconds bar period in seconds
     */
    private String getIcebergTableName(String ticker, int periodSeconds) {
        // Extract exchange from ticker (e.g., "BINANCE:BTC/USDT" -> "binance")
        String exchange = ticker.split(":")[0].toLowerCase();
        // Convert period to human-readable format.
        String period;
        if (periodSeconds < 60) {
            // Fix: sub-minute periods previously truncated to "0m" via integer
            // division; render them in seconds instead (e.g. 30 -> "30s").
            period = periodSeconds + "s";
        } else if (periodSeconds < 3600) {
            period = (periodSeconds / 60) + "m";
        } else if (periodSeconds < 86400) {
            period = (periodSeconds / 3600) + "h";
        } else {
            period = (periodSeconds / 86400) + "d";
        }
        return String.format("%s_ohlc_%s_%s", icebergTablePrefix, exchange, period);
    }
}

View File

@@ -0,0 +1,130 @@
package com.dexorder.flink.publisher;
import com.dexorder.proto.HistoryReadyNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
/**
 * Publishes HistoryReadyNotification protobuf messages to the job manager's
 * notification forwarder over a ZMQ PUSH socket.
 *
 * The job manager side owns the matching PULL socket; its
 * HistoryNotificationForwarder republishes received notifications via MARKET_DATA_PUB.
 * Topic format: "RESPONSE:{client_id}" or "HISTORY_READY:{request_id}"
 */
public class HistoryNotificationPublisher implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryNotificationPublisher.class);

    // Wire format: [topic frame][version byte frame][type byte + protobuf payload frame]
    private static final byte PROTOCOL_VERSION = 0x01;
    private static final byte MSG_TYPE_HISTORY_READY = 0x12;

    private final ZContext context;
    private final ZMQ.Socket publishSocket;

    /**
     * Opens a PUSH socket and connects it to the job manager's PULL endpoint.
     *
     * @param jobManagerPullEndpoint ZMQ endpoint of the job manager's PULL socket
     */
    public HistoryNotificationPublisher(String jobManagerPullEndpoint) {
        context = new ZContext();
        publishSocket = context.createSocket(SocketType.PUSH);
        publishSocket.setLinger(1000);
        publishSocket.setSndHWM(10000);
        publishSocket.connect(jobManagerPullEndpoint);
        LOG.info("HistoryNotificationPublisher connected PUSH to {}", jobManagerPullEndpoint);
    }

    /** Route to the client when a client id is known, otherwise to the request-id topic. */
    private static String resolveTopic(String clientId, String requestId) {
        return clientId != null ? "RESPONSE:" + clientId : "HISTORY_READY:" + requestId;
    }

    /**
     * Publishes a success notification after a historical batch has been committed.
     * completed_at is expressed in microseconds since epoch.
     */
    public void publishHistoryReady(
        String requestId,
        String clientId,
        String ticker,
        int periodSeconds,
        long startTime,
        long endTime,
        String icebergNamespace,
        String icebergTable,
        int rowCount
    ) {
        String topic = resolveTopic(clientId, requestId);
        HistoryReadyNotification notification = HistoryReadyNotification.newBuilder()
            .setRequestId(requestId)
            .setTicker(ticker)
            .setPeriodSeconds(periodSeconds)
            .setStartTime(startTime)
            .setEndTime(endTime)
            .setStatus(HistoryReadyNotification.NotificationStatus.OK)
            .setIcebergNamespace(icebergNamespace)
            .setIcebergTable(icebergTable)
            .setRowCount(rowCount)
            .setCompletedAt(System.currentTimeMillis() * 1000)
            .build();
        publish(topic, notification.toByteArray());
        LOG.info("Published HistoryReadyNotification: topic={}, request_id={}, rows={}", topic, requestId, rowCount);
    }

    /** Publishes an ERROR-status notification; the error message is optional. */
    public void publishError(
        String requestId,
        String clientId,
        String ticker,
        int periodSeconds,
        long startTime,
        long endTime,
        String errorMessage
    ) {
        String topic = resolveTopic(clientId, requestId);
        HistoryReadyNotification.Builder builder = HistoryReadyNotification.newBuilder()
            .setRequestId(requestId)
            .setTicker(ticker)
            .setPeriodSeconds(periodSeconds)
            .setStartTime(startTime)
            .setEndTime(endTime)
            .setStatus(HistoryReadyNotification.NotificationStatus.ERROR);
        if (errorMessage != null) {
            builder.setErrorMessage(errorMessage);
        }
        publish(topic, builder.build().toByteArray());
        LOG.error("Published error notification: topic={}, request_id={}, error={}", topic, requestId, errorMessage);
    }

    /** Publishes a NOT_FOUND-status notification (no data exists for the request). */
    public void publishNotFound(
        String requestId,
        String clientId,
        String ticker,
        int periodSeconds,
        long startTime,
        long endTime
    ) {
        String topic = resolveTopic(clientId, requestId);
        HistoryReadyNotification notification = HistoryReadyNotification.newBuilder()
            .setRequestId(requestId)
            .setTicker(ticker)
            .setPeriodSeconds(periodSeconds)
            .setStartTime(startTime)
            .setEndTime(endTime)
            .setStatus(HistoryReadyNotification.NotificationStatus.NOT_FOUND)
            .build();
        publish(topic, notification.toByteArray());
        LOG.info("Published not-found notification: topic={}, request_id={}", topic, requestId);
    }

    /** Sends one multipart message: [topic][version][type byte + protobuf payload]. */
    private void publish(String topic, byte[] protoPayload) {
        byte[] frame = new byte[protoPayload.length + 1];
        frame[0] = MSG_TYPE_HISTORY_READY;
        System.arraycopy(protoPayload, 0, frame, 1, protoPayload.length);
        publishSocket.sendMore(topic);
        publishSocket.sendMore(new byte[]{PROTOCOL_VERSION});
        publishSocket.send(frame, 0);
    }

    /** Closes the socket first, then the context. */
    @Override
    public void close() {
        if (publishSocket != null) publishSocket.close();
        if (context != null) context.close();
    }
}

View File

@@ -0,0 +1,103 @@
package com.dexorder.flink.publisher;
import com.dexorder.proto.OHLC;
import com.dexorder.proto.OHLCBatch;
import com.dexorder.proto.OHLCBatchMetadata;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Kafka deserializer for OHLCBatch protobuf messages.
 * Handles messages from ingestors with metadata and OHLC rows.
 *
 * Wire format: [version byte][type byte][protobuf payload].
 */
public class OHLCBatchDeserializer implements DeserializationSchema<OHLCBatchWrapper> {
    private static final Logger LOG = LoggerFactory.getLogger(OHLCBatchDeserializer.class);
    private static final long serialVersionUID = 1L;

    private static final byte PROTOCOL_VERSION = 0x01;
    private static final byte MSG_TYPE_OHLC_BATCH = 0x0B;

    /**
     * Deserializes one Kafka record value.
     *
     * @param message raw record value; may be null (Kafka tombstone)
     * @return the parsed batch, or null to skip a tombstone record
     * @throws IOException if the envelope or the protobuf payload is malformed
     */
    @Override
    public OHLCBatchWrapper deserialize(byte[] message) throws IOException {
        // Kafka tombstones have a null value. Per the DeserializationSchema
        // contract, returning null causes the record to be skipped instead of
        // failing the job with an NPE.
        if (message == null) {
            return null;
        }
        try {
            if (message.length < 2) {
                throw new IOException("Message too short: " + message.length + " bytes");
            }
            byte version = message[0];
            if (version != PROTOCOL_VERSION) {
                throw new IOException("Unsupported protocol version: " + version);
            }
            byte messageType = message[1];
            if (messageType != MSG_TYPE_OHLC_BATCH) {
                throw new IOException("Unexpected message type: 0x" + Integer.toHexString(messageType));
            }
            byte[] protoPayload = new byte[message.length - 2];
            System.arraycopy(message, 2, protoPayload, 0, protoPayload.length);
            OHLCBatchWrapper wrapper = parseOHLCBatch(protoPayload);
            LOG.debug("Deserialized OHLCBatch: request_id={}, rows={}",
                wrapper.getRequestId(), wrapper.getRowCount());
            return wrapper;
        } catch (IOException e) {
            // Rethrow as-is: don't bury the specific envelope error under a
            // generic wrapper message.
            LOG.error("Failed to deserialize OHLCBatch", e);
            throw e;
        } catch (Exception e) {
            LOG.error("Failed to deserialize OHLCBatch", e);
            throw new IOException("Failed to deserialize OHLCBatch", e);
        }
    }

    /**
     * Parses the protobuf payload into an OHLCBatchWrapper.
     * Absent optional volume is encoded as 0; empty status defaults to "OK".
     */
    private OHLCBatchWrapper parseOHLCBatch(byte[] payload) throws IOException {
        OHLCBatch batch = OHLCBatch.parseFrom(payload);
        OHLCBatchMetadata meta = batch.getMetadata();
        List<OHLCBatchWrapper.OHLCRow> rows = new ArrayList<>(batch.getRowsCount());
        for (OHLC row : batch.getRowsList()) {
            rows.add(new OHLCBatchWrapper.OHLCRow(
                row.getTimestamp(),
                row.getTicker(),
                row.getOpen(),
                row.getHigh(),
                row.getLow(),
                row.getClose(),
                row.hasVolume() ? row.getVolume() : 0
            ));
        }
        String status = meta.getStatus();
        if (status == null || status.isEmpty()) {
            status = "OK";
        }
        return new OHLCBatchWrapper(
            meta.getRequestId(),
            meta.hasClientId() ? meta.getClientId() : null,
            meta.getTicker(),
            meta.getPeriodSeconds(),
            meta.getStartTime(),
            meta.getEndTime(),
            status,
            meta.hasErrorMessage() ? meta.getErrorMessage() : null,
            rows
        );
    }

    /** Unbounded stream: never signals end-of-stream. */
    @Override
    public boolean isEndOfStream(OHLCBatchWrapper nextElement) {
        return false;
    }

    @Override
    public TypeInformation<OHLCBatchWrapper> getProducedType() {
        return TypeInformation.of(OHLCBatchWrapper.class);
    }
}

View File

@@ -0,0 +1,175 @@
package com.dexorder.flink.publisher;
import java.io.Serializable;
import java.util.List;
/**
* Wrapper for OHLCBatch protobuf message.
* Contains metadata and OHLC rows from ingestor.
*/
public class OHLCBatchWrapper implements Serializable {
private static final long serialVersionUID = 1L;
private final String requestId;
private final String clientId;
private final String ticker;
private final int periodSeconds;
private final long startTime;
private final long endTime;
private final String status; // OK, NOT_FOUND, ERROR
private final String errorMessage;
private final List<OHLCRow> rows;
public OHLCBatchWrapper(
String requestId,
String clientId,
String ticker,
int periodSeconds,
long startTime,
long endTime,
String status,
String errorMessage,
List<OHLCRow> rows
) {
this.requestId = requestId;
this.clientId = clientId;
this.ticker = ticker;
this.periodSeconds = periodSeconds;
this.startTime = startTime;
this.endTime = endTime;
this.status = status;
this.errorMessage = errorMessage;
this.rows = rows;
}
public String getRequestId() {
return requestId;
}
public String getClientId() {
return clientId;
}
public String getTicker() {
return ticker;
}
public int getPeriodSeconds() {
return periodSeconds;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
public String getStatus() {
return status;
}
public String getErrorMessage() {
return errorMessage;
}
public List<OHLCRow> getRows() {
return rows;
}
public int getRowCount() {
return rows != null ? rows.size() : 0;
}
public boolean hasError() {
return "ERROR".equals(status);
}
public boolean isNotFound() {
return "NOT_FOUND".equals(status);
}
public boolean isOk() {
return "OK".equals(status);
}
@Override
public String toString() {
return "OHLCBatchWrapper{" +
"requestId='" + requestId + '\'' +
", clientId='" + clientId + '\'' +
", ticker='" + ticker + '\'' +
", periodSeconds=" + periodSeconds +
", status='" + status + '\'' +
", rowCount=" + getRowCount() +
'}';
}
/**
* Single OHLC row
*/
public static class OHLCRow implements Serializable {
private static final long serialVersionUID = 1L;
private final long timestamp;
private final String ticker;
private final long open;
private final long high;
private final long low;
private final long close;
private final long volume;
public OHLCRow(long timestamp, String ticker, long open, long high,
long low, long close, long volume) {
this.timestamp = timestamp;
this.ticker = ticker;
this.open = open;
this.high = high;
this.low = low;
this.close = close;
this.volume = volume;
}
public long getTimestamp() {
return timestamp;
}
public String getTicker() {
return ticker;
}
public long getOpen() {
return open;
}
public long getHigh() {
return high;
}
public long getLow() {
return low;
}
public long getClose() {
return close;
}
public long getVolume() {
return volume;
}
@Override
public String toString() {
return "OHLCRow{" +
"timestamp=" + timestamp +
", ticker='" + ticker + '\'' +
", open=" + open +
", high=" + high +
", low=" + low +
", close=" + close +
", volume=" + volume +
'}';
}
}
}

View File

@@ -0,0 +1,117 @@
package com.dexorder.flink.sink;
import com.dexorder.flink.publisher.OHLCBatchWrapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Writes historical OHLC batches directly to Iceberg using the catalog API.
 *
 * Unlike the streaming sink (FlinkSink), this uses table.newAppend().commit() which
 * commits synchronously and immediately — no checkpoint dependency. Batches are emitted
 * downstream only after the commit returns, guaranteeing HistoryNotificationFunction
 * fires after data is visible to readers.
 */
public class HistoricalBatchWriter extends RichFlatMapFunction<OHLCBatchWrapper, OHLCBatchWrapper> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HistoricalBatchWriter.class);
    // Serializable loader shipped to the task; the Table handle itself is not
    // serializable, so it is re-loaded per task in open().
    private final TableLoader tableLoader;
    private transient Table table;
    public HistoricalBatchWriter(TableLoader tableLoader) {
        this.tableLoader = tableLoader;
    }
    /** Loads the Iceberg table once per task instance. */
    @Override
    public void open(Configuration parameters) throws Exception {
        tableLoader.open();
        table = tableLoader.loadTable();
        LOG.info("HistoricalBatchWriter opened, table loaded: {}", table.name());
    }
    /**
     * Writes one batch to a single Parquet data file, commits it synchronously,
     * and only then emits the batch downstream so notifications fire after the
     * data is visible to readers.
     */
    @Override
    public void flatMap(OHLCBatchWrapper batch, Collector<OHLCBatchWrapper> out) throws Exception {
        // Empty batches (NOT_FOUND/ERROR markers): emit immediately without writing
        if (batch.getRows() == null || batch.getRows().isEmpty()) {
            LOG.debug("Passing through empty batch (marker): request_id={}, status={}",
                batch.getRequestId(), batch.getStatus());
            out.collect(batch);
            return;
        }
        GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec());
        // System.nanoTime() serves as a pseudo-unique attempt id so repeated
        // invocations of the same subtask do not produce colliding file names.
        OutputFileFactory fileFactory = OutputFileFactory
            .builderFor(table, getRuntimeContext().getIndexOfThisSubtask(), System.nanoTime())
            .format(FileFormat.PARQUET)
            .build();
        // Compute partition key from ticker (all rows in a batch share one ticker)
        GenericRecord partitionRecord = GenericRecord.create(table.schema());
        partitionRecord.setField("ticker", batch.getTicker());
        PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
        partitionKey.partition(partitionRecord);
        // Write all rows to one data file
        EncryptedOutputFile encryptedFile = fileFactory.newOutputFile(partitionKey);
        DataWriter<Record> writer = appenderFactory.newDataWriter(
            encryptedFile, FileFormat.PARQUET, partitionKey);
        // Microseconds since epoch, matching the pipeline's timestamp convention.
        long ingestedAt = System.currentTimeMillis() * 1000;
        try {
            for (OHLCBatchWrapper.OHLCRow row : batch.getRows()) {
                GenericRecord record = GenericRecord.create(table.schema());
                record.setField("ticker", batch.getTicker());
                record.setField("period_seconds", batch.getPeriodSeconds());
                record.setField("timestamp", row.getTimestamp());
                record.setField("open", row.getOpen());
                record.setField("high", row.getHigh());
                record.setField("low", row.getLow());
                record.setField("close", row.getClose());
                // volume == 0 encodes "absent" upstream (deserializer default) — stored as NULL
                record.setField("volume", row.getVolume() != 0 ? row.getVolume() : null);
                record.setField("buy_vol", null);
                record.setField("sell_vol", null);
                record.setField("open_time", null);
                record.setField("high_time", null);
                record.setField("low_time", null);
                record.setField("close_time", null);
                record.setField("open_interest", null);
                record.setField("request_id", batch.getRequestId());
                record.setField("ingested_at", ingestedAt);
                writer.write(record);
            }
        } finally {
            // Close before commit: toDataFile() requires a closed writer.
            writer.close();
        }
        // Immediate commit — no checkpoint needed
        table.newAppend()
            .appendFile(writer.toDataFile())
            .commit();
        LOG.info("Committed {} rows to Iceberg for request_id={}", batch.getRowCount(), batch.getRequestId());
        // Emit batch downstream only after successful commit
        out.collect(batch);
    }
    /** Releases the table loader (and its catalog resources) when the task ends. */
    @Override
    public void close() throws Exception {
        if (tableLoader != null) {
            tableLoader.close();
        }
    }
}

View File

@@ -0,0 +1,121 @@
package com.dexorder.flink.sink;
import com.dexorder.flink.publisher.OHLCBatchWrapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Iceberg sink for OHLC data (Iceberg 1.10.1).
 * Converts OHLCBatchWrapper to Flink RowData and writes to single Iceberg table.
 *
 * Deduplication Strategy:
 * - Uses Flink's upsert mode with equality delete files
 * - Natural key: (ticker, period_seconds, timestamp)
 * - Last-write-wins semantics for duplicates
 */
public class IcebergOHLCSink {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergOHLCSink.class);

    /**
     * Create an Iceberg sink for OHLC data with upsert behavior.
     *
     * @param stream Input stream of OHLCBatchWrapper
     * @param tableLoader Iceberg table loader for trading.ohlc
     * @return the intermediate RowData stream (the sink is attached via append())
     */
    public static DataStream<RowData> createSink(
        DataStream<OHLCBatchWrapper> stream,
        TableLoader tableLoader
    ) {
        // Flatten batches into individual RowData records
        DataStream<RowData> rowStream = stream
            .flatMap(new OHLCBatchToRowDataMapper())
            .name("OHLCBatch to RowData");
        // Upsert mode generates equality delete files keyed on the natural key
        // (ticker, period_seconds, timestamp), giving last-write-wins semantics.
        FlinkSink.forRowData(rowStream)
            .tableLoader(tableLoader)
            .upsert(true) // Enables equality delete file generation
            .equalityFieldColumns(java.util.Arrays.asList("ticker", "period_seconds", "timestamp"))
            .append();
        LOG.info("Iceberg OHLC sink configured with upsert mode (equality deletes on ticker, period_seconds, timestamp)");
        return rowStream;
    }

    /**
     * Mapper that converts OHLCBatchWrapper to Flink RowData.
     * Flattens the batch into individual rows; field positions must match the
     * 17-column trading.ohlc schema.
     */
    private static class OHLCBatchToRowDataMapper implements FlatMapFunction<OHLCBatchWrapper, RowData> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(OHLCBatchWrapper batch, Collector<RowData> out) throws Exception {
            // Skip empty batches (NOT_FOUND / ERROR marker messages carry no rows)
            if (batch.getRows() == null || batch.getRows().isEmpty()) {
                LOG.debug("Skipping empty batch (marker): request_id={}, status={}",
                    batch.getRequestId(), batch.getStatus());
                return;
            }
            String requestId = batch.getRequestId();
            String ticker = batch.getTicker();
            int periodSeconds = batch.getPeriodSeconds();
            // Microseconds since epoch, matching the pipeline's timestamp convention
            long ingestedAt = System.currentTimeMillis() * 1000;
            // Emit one RowData for each OHLC row in the batch
            for (OHLCBatchWrapper.OHLCRow row : batch.getRows()) {
                GenericRowData rowData = new GenericRowData(RowKind.INSERT, 17);
                // Natural key fields (ticker, period_seconds, timestamp)
                // Used by equality delete files for deduplication
                rowData.setField(0, StringData.fromString(ticker));
                rowData.setField(1, periodSeconds);
                rowData.setField(2, row.getTimestamp());
                // OHLC price data
                rowData.setField(3, row.getOpen());
                rowData.setField(4, row.getHigh());
                rowData.setField(5, row.getLow());
                rowData.setField(6, row.getClose());
                // Volume: upstream encodes "absent" as 0, so store NULL for 0 —
                // consistent with HistoricalBatchWriter's handling of this column.
                rowData.setField(7, row.getVolume() != 0 ? (Long) row.getVolume() : null);
                rowData.setField(8, null); // buy_vol (TODO: extract from protobuf)
                rowData.setField(9, null); // sell_vol
                // Timing data
                rowData.setField(10, null); // open_time
                rowData.setField(11, null); // high_time
                rowData.setField(12, null); // low_time
                rowData.setField(13, null); // close_time
                // Additional fields
                rowData.setField(14, null); // open_interest
                // Metadata fields
                rowData.setField(15, StringData.fromString(requestId));
                rowData.setField(16, ingestedAt);
                out.collect(rowData);
            }
            LOG.debug("Converted batch to {} RowData records: request_id={}",
                batch.getRowCount(), requestId);
        }
    }
}

View File

@@ -0,0 +1,286 @@
package com.dexorder.flink.zmq;
import com.dexorder.flink.config.AppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
/**
 * Manages all ZeroMQ channels for the Flink application.
 * Each channel is bound to a specific port and socket type.
 *
 * Wire format on every channel:
 * [optional topic frame] [version byte frame] [type byte + protobuf payload frame]
 */
public class ZmqChannelManager implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ZmqChannelManager.class);

    private final ZContext context;
    private final Map<String, ZMQ.Socket> sockets;
    private final AppConfig config;

    /** Logical channel identifiers; one bound socket per entry. */
    public enum Channel {
        INGESTOR_WORK_QUEUE,
        INGESTOR_RESPONSE,
        INGESTOR_CONTROL,
        MARKET_DATA_PUB,
        CLIENT_REQUEST,
        CEP_WEBHOOK
    }

    public ZmqChannelManager(AppConfig config) {
        this.config = config;
        this.context = new ZContext();
        this.sockets = new HashMap<>();
    }

    /**
     * Initialize and bind all ZMQ channels.
     * Fails fast (RuntimeException) if any bind fails.
     */
    public void initializeChannels() {
        String bindAddress = config.getBindAddress();
        LOG.info("Initializing ZeroMQ channels on {}", bindAddress);
        // 1. Ingestor Work Queue - PUB socket for topic-based work distribution (exchange prefix filtering)
        createAndBind(
            Channel.INGESTOR_WORK_QUEUE,
            SocketType.PUB,
            bindAddress + ":" + config.getIngestorWorkQueuePort(),
            "Ingestor Work Queue (PUB)"
        );
        // 2. Ingestor Response - ROUTER socket for receiving historical data responses
        createAndBind(
            Channel.INGESTOR_RESPONSE,
            SocketType.ROUTER,
            bindAddress + ":" + config.getIngestorResponsePort(),
            "Ingestor Response (ROUTER)"
        );
        // 3. Ingestor Control - PUB socket for broadcast control messages
        createAndBind(
            Channel.INGESTOR_CONTROL,
            SocketType.PUB,
            bindAddress + ":" + config.getIngestorControlPort(),
            "Ingestor Control (PUB)"
        );
        // 4. Market Data Publication - PUB socket for market data streaming
        createAndBind(
            Channel.MARKET_DATA_PUB,
            SocketType.PUB,
            bindAddress + ":" + config.getMarketDataPubPort(),
            "Market Data Publication (PUB)"
        );
        // 5. Client Request - REP socket for request-response
        createAndBind(
            Channel.CLIENT_REQUEST,
            SocketType.REP,
            bindAddress + ":" + config.getClientRequestPort(),
            "Client Request (REP)"
        );
        // 6. CEP Webhook - ROUTER socket for async callbacks
        createAndBind(
            Channel.CEP_WEBHOOK,
            SocketType.ROUTER,
            bindAddress + ":" + config.getCepWebhookPort(),
            "CEP Webhook (ROUTER)"
        );
        LOG.info("All ZeroMQ channels initialized successfully");
    }

    /** Creates a socket of the given type, applies common options, binds it, and registers it. */
    private void createAndBind(Channel channel, SocketType socketType, String endpoint, String description) {
        try {
            ZMQ.Socket socket = context.createSocket(socketType);
            // Set socket options
            socket.setLinger(1000); // 1 second linger on close
            socket.setSndHWM(10000); // High water mark for outbound messages
            socket.setRcvHWM(10000); // High water mark for inbound messages
            // Bind the socket
            socket.bind(endpoint);
            sockets.put(channel.name(), socket);
            LOG.info("Bound {} to {}", description, endpoint);
        } catch (Exception e) {
            LOG.error("Failed to bind {} to {}", description, endpoint, e);
            throw new RuntimeException("Failed to initialize ZMQ channel: " + channel, e);
        }
    }

    /**
     * Get a socket by channel type.
     *
     * @throws IllegalStateException if initializeChannels() has not bound this channel
     */
    public ZMQ.Socket getSocket(Channel channel) {
        ZMQ.Socket socket = sockets.get(channel.name());
        if (socket == null) {
            throw new IllegalStateException("Socket not initialized: " + channel);
        }
        return socket;
    }

    /**
     * Send a message on the specified channel.
     *
     * @param channel The channel to send on
     * @param versionByte Protocol version byte
     * @param messageTypeByte Message type ID byte
     * @param protobufData Serialized protobuf message
     * @return true if sent successfully
     */
    public boolean sendMessage(Channel channel, byte versionByte, byte messageTypeByte, byte[] protobufData) {
        ZMQ.Socket socket = getSocket(channel);
        // Send as two frames: [version byte] [type byte + protobuf data]
        boolean sentFrame1 = socket.send(new byte[]{versionByte}, ZMQ.SNDMORE);
        if (!sentFrame1) {
            LOG.error("Failed to send version frame on channel {}", channel);
            return false;
        }
        byte[] frame2 = new byte[1 + protobufData.length];
        frame2[0] = messageTypeByte;
        System.arraycopy(protobufData, 0, frame2, 1, protobufData.length);
        boolean sentFrame2 = socket.send(frame2, 0);
        if (!sentFrame2) {
            LOG.error("Failed to send message frame on channel {}", channel);
            return false;
        }
        return true;
    }

    /**
     * Send a message with a topic prefix (for PUB sockets).
     *
     * @param channel The channel to send on
     * @param topic Topic string for subscription filtering
     * @param versionByte Protocol version byte
     * @param messageTypeByte Message type ID byte
     * @param protobufData Serialized protobuf message
     * @return true if sent successfully
     */
    public boolean sendTopicMessage(Channel channel, String topic, byte versionByte, byte messageTypeByte, byte[] protobufData) {
        ZMQ.Socket socket = getSocket(channel);
        // Send as three frames: [topic] [version byte] [type byte + protobuf data]
        boolean sentTopic = socket.send(topic.getBytes(ZMQ.CHARSET), ZMQ.SNDMORE);
        if (!sentTopic) {
            LOG.error("Failed to send topic frame on channel {}", channel);
            return false;
        }
        boolean sentFrame1 = socket.send(new byte[]{versionByte}, ZMQ.SNDMORE);
        if (!sentFrame1) {
            LOG.error("Failed to send version frame on channel {}", channel);
            return false;
        }
        byte[] frame2 = new byte[1 + protobufData.length];
        frame2[0] = messageTypeByte;
        System.arraycopy(protobufData, 0, frame2, 1, protobufData.length);
        boolean sentFrame2 = socket.send(frame2, 0);
        if (!sentFrame2) {
            LOG.error("Failed to send message frame on channel {}", channel);
            return false;
        }
        return true;
    }

    /**
     * Receive a message from a ROUTER socket.
     * Returns a ReceivedMessage containing the identity, version, type, and payload.
     *
     * @param channel The channel to receive from (must be ROUTER)
     * @param timeout Timeout in milliseconds (0 for non-blocking, -1 for blocking)
     * @return ReceivedMessage or null if no message available
     */
    public ReceivedMessage receiveRouterMessage(Channel channel, int timeout) {
        ZMQ.Socket socket = getSocket(channel);
        // Always (re)set the timeout. Previously this was only done for
        // timeout >= 0, so a bounded timeout from an earlier call leaked into
        // later "blocking" (-1) calls and made them return early.
        socket.setReceiveTimeOut(timeout < 0 ? -1 : timeout);
        // Receive identity frame
        byte[] identity = socket.recv(0);
        if (identity == null) {
            return null;
        }
        // Receive version frame
        byte[] versionFrame = socket.recv(0);
        if (versionFrame == null || versionFrame.length != 1) {
            LOG.error("Invalid version frame received on channel {}", channel);
            // Discard the rest of the malformed multipart message so the next
            // receive starts aligned on an identity frame.
            drainStrayFrames(socket);
            return null;
        }
        // Receive message frame (type byte + protobuf data)
        byte[] messageFrame = socket.recv(0);
        if (messageFrame == null || messageFrame.length < 1) {
            LOG.error("Invalid message frame received on channel {}", channel);
            drainStrayFrames(socket);
            return null;
        }
        byte versionByte = versionFrame[0];
        byte messageTypeByte = messageFrame[0];
        byte[] protobufData = new byte[messageFrame.length - 1];
        System.arraycopy(messageFrame, 1, protobufData, 0, protobufData.length);
        // If the sender included extra frames, drop them to stay aligned.
        drainStrayFrames(socket);
        return new ReceivedMessage(identity, versionByte, messageTypeByte, protobufData);
    }

    /** Discards any frames remaining in the current multipart message. */
    private static void drainStrayFrames(ZMQ.Socket socket) {
        while (socket.hasReceiveMore()) {
            socket.recv(0);
        }
    }

    /**
     * Represents a received message from a ROUTER socket.
     */
    public static class ReceivedMessage {
        private final byte[] identity;
        private final byte version;
        private final byte messageType;
        private final byte[] protobufData;

        public ReceivedMessage(byte[] identity, byte version, byte messageType, byte[] protobufData) {
            this.identity = identity;
            this.version = version;
            this.messageType = messageType;
            this.protobufData = protobufData;
        }

        public byte[] getIdentity() {
            return identity;
        }

        public byte getVersion() {
            return version;
        }

        public byte getMessageType() {
            return messageType;
        }

        public byte[] getProtobufData() {
            return protobufData;
        }
    }

    /** Closes all sockets, then terminates the ZMQ context. */
    @Override
    public void close() {
        LOG.info("Closing ZeroMQ channels");
        sockets.values().forEach(ZMQ.Socket::close);
        sockets.clear();
        context.close();
        LOG.info("ZeroMQ context closed");
    }
}

View File

@@ -0,0 +1,54 @@
-- Iceberg OHLC Table Schema (Documentation)
--
-- NOTE: This file is kept for documentation purposes only.
-- The actual table is created by SchemaInitializer.java using the Iceberg API.
--
-- Single table for all periods with Iceberg v2 primary key enforcement
-- Primary key: (ticker, period_seconds, timestamp)
-- Partition by: (ticker, days(timestamp))
CREATE TABLE IF NOT EXISTS trading.ohlc (
-- Primary key fields
ticker STRING NOT NULL COMMENT 'Market identifier (e.g., BINANCE:BTC/USDT)',
period_seconds INT NOT NULL COMMENT 'OHLC period in seconds (60, 300, 900, 3600, 14400, 86400, 604800, etc.)',
timestamp BIGINT NOT NULL COMMENT 'Candle timestamp in microseconds since epoch',
-- OHLC price data (stored as integers, divide by rational denominator from market metadata)
open BIGINT NOT NULL COMMENT 'Opening price',
high BIGINT NOT NULL COMMENT 'Highest price',
low BIGINT NOT NULL COMMENT 'Lowest price',
close BIGINT NOT NULL COMMENT 'Closing price',
-- Volume data
volume BIGINT COMMENT 'Total volume',
buy_vol BIGINT COMMENT 'Buy volume',
sell_vol BIGINT COMMENT 'Sell volume',
-- Timing data
open_time BIGINT COMMENT 'Timestamp when open price occurred',
high_time BIGINT COMMENT 'Timestamp when high price occurred',
low_time BIGINT COMMENT 'Timestamp when low price occurred',
close_time BIGINT COMMENT 'Timestamp when close price occurred',
-- Additional fields
open_interest BIGINT COMMENT 'Open interest for futures',
-- Metadata fields for tracking
request_id STRING COMMENT 'Request ID that generated this data (for historical requests)',
ingested_at BIGINT NOT NULL COMMENT 'Timestamp when data was ingested by Flink'
)
USING iceberg
PARTITIONED BY (ticker, days(timestamp))
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
'write.metadata.compression-codec' = 'gzip',
'format-version' = '2',
'write.upsert.enabled' = 'true'
);
-- Primary key constraint (enforced by Iceberg v2)
-- Uniqueness enforced on (ticker, period_seconds, timestamp)
-- Upserts will replace existing rows with same primary key
COMMENT ON TABLE trading.ohlc IS 'Historical OHLC candle data from exchanges. Single table for all periods with primary key enforcement.';

View File

@@ -0,0 +1,29 @@
topics:
# Realtime and historical OHLC data (protobuf encoded)
# Individual OHLC messages for realtime data
# OHLCBatch messages for historical data (with metadata)
- name: market-ohlc
partitions: 3
replication: 1
config:
retention.ms: 86400000 # 24 hours
compression.type: snappy
cleanup.policy: delete
# Realtime tick data (protobuf encoded)
- name: market-tick
partitions: 3
replication: 1
config:
retention.ms: 3600000 # 1 hour
compression.type: snappy
cleanup.policy: delete
# Order execution events
- name: order-event
partitions: 2
replication: 1
config:
retention.ms: 2592000000 # 30 days
compression.type: snappy
cleanup.policy: delete

View File

@@ -0,0 +1,29 @@
topics:
# Realtime and historical OHLC data (protobuf encoded)
# Individual OHLC messages for realtime data
# OHLCBatch messages for historical data (with metadata)
- name: market-ohlc
partitions: 6
replication: 2
config:
retention.ms: 86400000 # 24 hours
compression.type: snappy
cleanup.policy: delete
# Realtime tick data (protobuf encoded)
- name: market-tick
partitions: 6
replication: 2
config:
retention.ms: 3600000 # 1 hour
compression.type: snappy
cleanup.policy: delete
# Order execution events
- name: order-event
partitions: 3
replication: 2
config:
retention.ms: 2592000000 # 30 days
compression.type: snappy
cleanup.policy: delete

8
flink/values.yaml Normal file
View File

@@ -0,0 +1,8 @@
# Flink Kubernetes Operator Helm Values
# Install with: helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -f values.yaml
# This values file is for the operator installation
#watchNamespaces: [] # Empty = watch all namespaces
watchNamespaces:
- default