prod alpha deploy

2026-04-10 16:09:39 -04:00
parent 7d231169d9
commit 6418729b16
17 changed files with 375 additions and 967 deletions

View File

@@ -3,11 +3,7 @@
# ZeroMQ bind address and ports
zmq_bind_address: "tcp://*"
zmq_ingestor_work_queue_port: 5555
-zmq_ingestor_response_port: 5556
-zmq_ingestor_control_port: 5557
zmq_market_data_pub_port: 5558
-zmq_client_request_port: 5559
-zmq_cep_webhook_port: 5560
# Notification endpoints
# Task managers PUSH to job manager PULL socket at this address

View File

@@ -9,7 +9,6 @@ market_data_pub_port: 5558 # XPUB - Market data fanout to clients
# Ingestor-facing ports
ingestor_work_port: 5555 # PUB - Distribute work with exchange prefix
-ingestor_response_port: 5556 # ROUTER - Receive responses from ingestors
# Flink connection
flink_market_data_endpoint: "tcp://flink-jobmanager:5558" # XSUB - Subscribe to Flink market data (MARKET_DATA_PUB)
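For orientation, a minimal pyzmq sketch of the XSUB/XPUB proxy this config describes. It is illustrative only (the relay itself is not Python); the hostname and ports come from the values above:

```python
import zmq

ctx = zmq.Context()

# Downstream: clients connect SUB sockets here (market_data_pub_port).
xpub = ctx.socket(zmq.XPUB)
xpub.bind("tcp://*:5558")

# Upstream: relay subscribes to Flink's MARKET_DATA_PUB (flink_market_data_endpoint).
xsub = ctx.socket(zmq.XSUB)
xsub.connect("tcp://flink-jobmanager:5558")

# Forwards subscriptions upstream and published messages downstream.
zmq.proxy(xsub, xpub)
```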

View File

@@ -3,15 +3,13 @@
# ZeroMQ bind address and ports
zmq_bind_address: "tcp://*"
zmq_ingestor_work_queue_port: 5555
-zmq_ingestor_response_port: 5556
-zmq_ingestor_control_port: 5557
zmq_market_data_pub_port: 5558
-zmq_client_request_port: 5559
-zmq_cep_webhook_port: 5560
-# Notification publisher endpoint (Flink → Relay)
-# Relay connects XSUB to this endpoint and proxies to clients
-notification_publish_endpoint: "tcp://*:5557"
+# Notification endpoints (internal Flink task manager → job manager path)
+# Task managers PUSH to job manager PULL socket at this address
+notification_publish_endpoint: "tcp://flink-jobmanager:5561"
+# Job manager binds PULL socket on this port to receive from task managers
+notification_pull_port: 5561
# Kafka configuration
kafka_bootstrap_servers: "kafka:9092"
@@ -20,7 +18,7 @@ kafka_ohlc_topic: "market-ohlc"
# Iceberg catalog
iceberg_catalog_uri: "http://iceberg-catalog:8181"
-iceberg_warehouse: "s3://trading-warehouse/"
+iceberg_warehouse: "s3://warehouse/"
iceberg_namespace: "trading"
iceberg_table_prefix: "market"
hadoop_conf_dir: "/etc/hadoop/conf"
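A sketch of how a client might read back data under this catalog config, assuming pyiceberg; the `trading.market_ohlc` table name and `ticker` column are hypothetical, composed from the namespace and table prefix above:

```python
from pyiceberg.catalog import load_catalog

# REST catalog endpoint and warehouse taken from the config above.
catalog = load_catalog("default",
                       uri="http://iceberg-catalog:8181",
                       warehouse="s3://warehouse/")

# Hypothetical table name: iceberg_namespace + "." + iceberg_table_prefix + "_ohlc".
table = catalog.load_table("trading.market_ohlc")
df = table.scan(row_filter="ticker = 'BTC/USDT.BINANCE'").to_arrow()
```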

View File

@@ -9,7 +9,6 @@ market_data_pub_port: 5558 # XPUB - Market data fanout to clients
# Ingestor-facing ports
ingestor_work_port: 5555 # PUB - Distribute work with exchange prefix
-ingestor_response_port: 5556 # ROUTER - Receive responses from ingestors
# Flink connection
flink_market_data_endpoint: "tcp://flink-jobmanager:5558" # XSUB - Subscribe to Flink market data

View File

@@ -18,7 +18,8 @@ OHLC periods are represented as seconds.
* Ingestor receives request, fetches data from exchange
* Ingestor writes OHLC data to Kafka with __metadata in first record
* Flink reads from Kafka, processes data, writes to Iceberg
-* Flink publishes HistoryReadyNotification to ZMQ PUB socket (port 5557) with deterministic topic
+* Flink task manager sends HistoryReadyNotification via PUSH to job manager PULL (port 5561)
+* Job manager `HistoryNotificationForwarder` republishes on MARKET_DATA_PUB (port 5558)
* Relay proxies notification via XSUB → XPUB to clients
* Client receives notification (already subscribed) and queries Iceberg for data
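A client-side sketch of the tail of this flow, assuming pyzmq; the relay hostname and notification topic are placeholders (the real topic comes back in the SubmitResponse ack):

```python
import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://relay:5558")  # relay XPUB fanout; hostname illustrative

# Subscribe before submitting so the notification cannot be missed.
notification_topic = "<topic from SubmitResponse>"  # placeholder
sub.setsockopt_string(zmq.SUBSCRIBE, notification_topic)

# ... submit historical request, receive SubmitResponse (0x11) ...

if sub.poll(timeout=30_000):  # 30s client timeout per Error Handling
    frames = sub.recv_multipart()  # [topic, version, type byte + payload]
    # parse HistoryReadyNotification (0x12), then query Iceberg for the data
else:
    ...  # assume failure; retry with a new request_id
```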
@@ -33,7 +34,7 @@ OHLC periods are represented as seconds.
### Realtime Data Flow (Flink → Relay → Clients)
* Ingestors write realtime ticks to Kafka
* Flink reads from Kafka, processes OHLC aggregations, CEP triggers
-* Flink publishes market data via ZMQ PUB
+* Flink publishes market data via ZMQ PUB (port 5558)
* Relay subscribes to Flink (XSUB) and fans out to clients (XPUB)
* Clients subscribe to specific tickers
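For example, a client subscription to one ticker's tick stream (topic format per the Topic Formats list below) might look like this sketch:

```python
import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://relay:5558")  # relay XPUB; hostname illustrative
sub.setsockopt_string(zmq.SUBSCRIBE, "BTC/USDT.BINANCE|tick")  # "{ticker}|{data_type}"
```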
@@ -97,7 +98,7 @@ All sockets bind on **Relay** (well-known endpoint). Components connect to relay
**Pattern**: XPUB/XSUB proxy
- **Socket Type**:
  - Relay XPUB (bind) ← Clients SUB (connect) - Port 5558
-  - Relay XSUB (connect) → Flink PUB (bind) - Port 5557
+  - Relay XSUB (connect) → Flink MARKET_DATA_PUB (bind) - Port 5558
- **Message Types**: `Tick`, `OHLC`, `HistoryReadyNotification`, `SymbolMetadataUpdated`
- **Topic Formats**:
  - Market data: `{ticker}|{data_type}` (e.g., `BTC/USDT.BINANCE|tick`)
@@ -110,15 +111,13 @@ All sockets bind on **Relay** (well-known endpoint). Components connect to relay
- Relay proxies data to subscribed clients (stateless forwarding)
- Dynamic subscription management (no pre-registration)
-### 4. Ingestor Control Channel (Optional - Future Use)
-**Pattern**: PUB/SUB (Broadcast control)
-- **Socket Type**: Relay uses PUB, Ingestors use SUB
-- **Endpoint**: `tcp://*:5557` (Relay binds)
-- **Message Types**: `IngestorControl` (cancel, config updates)
-- **Behavior**:
-  - Broadcast control messages to all ingestors
-  - Used for realtime subscription cancellation
-  - Configuration updates
+**Internal Flink notification path (port 5561)**:
+- Flink task managers send `HistoryReadyNotification` via PUSH to job manager PULL (port 5561)
+- `HistoryNotificationForwarder` (job manager) receives and republishes on MARKET_DATA_PUB (port 5558)
+- This decouples task manager instances from direct pub/sub and handles multi-task-manager setups
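A sketch of the forwarder's socket topology as just described — the actual `HistoryNotificationForwarder` is Java; the topic frame and payload handling here are illustrative:

```python
import zmq

ctx = zmq.Context()

pull = ctx.socket(zmq.PULL)   # task managers PUSH-connect here (port 5561)
pull.bind("tcp://*:5561")

pub = ctx.socket(zmq.PUB)     # MARKET_DATA_PUB; relay XSUB-connects (port 5558)
pub.bind("tcp://*:5558")

while True:
    frames = pull.recv_multipart()  # [version][type byte + payload] from a task manager
    pub.send_multipart([b"<notification topic>"] + frames)  # topic frame illustrative
```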
+### 4. User Event Channels (User Containers → Gateway)
+See [user-events.md](user-events.md) for the full spec including ZMQ patterns, protobuf schemas, and delivery semantics for ports 5570 and 5571.

## Message Envelope Format
@@ -160,315 +159,8 @@ The two-frame envelope is the **logical protocol format**, but physical transmis
| 0x11 | SubmitResponse | Immediate ack with notification topic |
| 0x12 | HistoryReadyNotification | Notification that data is ready in Iceberg |
| 0x13 | SymbolMetadataUpdated | Notification that symbol metadata refreshed |
+| 0x20 | UserEvent | Container → Gateway event (see user-events.md) |
+| 0x21 | EventAck | Gateway → Container acknowledgment |

## User Container Event System
User containers emit events (order executions, alerts, workspace changes) that must be delivered to users via their active session or external channels (Telegram, email, push). This requires two ZMQ patterns with different delivery guarantees.
### Event Flow Overview
```
┌─────────────────────────────────────────────────────────────┐
│ User Container │
│ │
│ Strategy/Indicator Engine │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Event Publisher │ │
│ │ │ │
│ │ 1. Check delivery spec │ │
│ │ 2. If INFORMATIONAL or has_active_subscriber(): │ │
│ │ → XPUB (fast path) │ │
│ │ 3. Else (CRITICAL or no active session): │ │
│ │ → DEALER (guaranteed delivery) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ XPUB socket DEALER socket │
│ (port 5570) (port 5571) │
└─────────┼───────────────────────────┼───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Gateway Pool │
│ │
│ ┌──────────────────┐ ┌──────────────────────────┐ │
│ │ SUB socket │ │ ROUTER socket │ │
│ │ (per-session) │ │ (shared, any gateway) │ │
│ │ │ │ │ │
│ │ Subscribe to │ │ Pull event, deliver, │ │
│ │ USER:{user_id} │ │ send EventAck back │ │
│ │ on connect │ │ │ │
│ └────────┬─────────┘ └─────────────┬────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Active WS/ │ │ Telegram API / Email / │ │
│ │ Telegram │ │ Push Notification │ │
│ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### 5. User Event Channel - Informational (Container → Gateway)
**Pattern**: XPUB/SUB with subscription tracking
- **Socket Type**: Container uses XPUB (bind), Gateway uses SUB (connect)
- **Endpoint**: `tcp://*:5570` (Container binds)
- **Message Types**: `UserEvent`
- **Topic Format**: `USER:{user_id}` (e.g., `USER:user-abc123`)
- **Behavior**:
- Gateway subscribes to `USER:{user_id}` when user's WebSocket/Telegram session connects
- Gateway unsubscribes when session disconnects
- Container uses XPUB with `ZMQ_XPUB_VERBOSE` to track active subscriptions
- Container checks subscription set before publishing
- If no subscriber, message is either dropped (INFORMATIONAL) or routed to critical channel
- Zero coordination, fire-and-forget for active sessions
### 6. User Event Channel - Critical (Container → Gateway)
**Pattern**: DEALER/ROUTER with acknowledgment
- **Socket Type**: Container uses DEALER (connect), Gateway uses ROUTER (bind)
- **Endpoint**: `tcp://gateway:5571` (Gateway binds, containers connect)
- **Message Types**: `UserEvent`, `EventAck`
- **Behavior**:
- Container sends `UserEvent` with `event_id` via DEALER
- DEALER round-robins to available gateway ROUTER sockets
- Gateway processes event (sends to Telegram, email, etc.)
- Gateway sends `EventAck` back to container
- Container tracks pending events with timeout (30s default)
- On timeout without ack: resend (DEALER routes to next gateway)
- On container shutdown: persist pending to disk, reload on startup
- Provides at-least-once delivery guarantee
### Subscription Tracking (Container Side)
Container uses XPUB to detect active sessions:
```python
# Container event publisher initialization
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) # Receive all sub/unsub
xpub_socket.bind("tcp://*:5570")
active_subscriptions: set[str] = set()
# In event loop, handle subscription messages
def process_subscriptions():
while xpub_socket.poll(0):
msg = xpub_socket.recv()
topic = msg[1:].decode() # Skip first byte (sub/unsub flag)
if msg[0] == 1: # Subscribe
active_subscriptions.add(topic)
elif msg[0] == 0: # Unsubscribe
active_subscriptions.discard(topic)
def has_active_subscriber(user_id: str) -> bool:
return f"USER:{user_id}" in active_subscriptions
```
### Event Routing Logic (Container Side)
```python
def publish_event(event: UserEvent):
topic = f"USER:{event.user_id}"
if event.delivery.priority == Priority.INFORMATIONAL:
# Fire and forget - drop if nobody's listening
if has_active_subscriber(event.user_id):
xpub_socket.send_multipart([topic.encode(), serialize(event)])
# else: silently drop
elif has_active_subscriber(event.user_id):
# Active session exists - use fast path
xpub_socket.send_multipart([topic.encode(), serialize(event)])
else:
# No active session - use guaranteed delivery
send_via_dealer(event)
def send_via_dealer(event: UserEvent):
pending_events[event.event_id] = PendingEvent(
event=event,
sent_at=time.time(),
retries=0
)
dealer_socket.send(serialize(event))
```
### Message Type IDs (User Events)
| Type ID | Message Type | Description |
|---------|-----------------|------------------------------------------------|
| 0x20 | UserEvent | Container → Gateway event |
| 0x21 | EventAck | Gateway → Container acknowledgment |
### UserEvent Message
```protobuf
message UserEvent {
string user_id = 1;
string event_id = 2; // UUID for dedup/ack
int64 timestamp = 3; // Unix millis
EventType event_type = 4;
bytes payload = 5; // JSON or nested protobuf
DeliverySpec delivery = 6;
}
enum EventType {
ORDER_PLACED = 0;
ORDER_FILLED = 1;
ORDER_CANCELLED = 2;
ALERT_TRIGGERED = 3;
POSITION_UPDATED = 4;
WORKSPACE_CHANGED = 5;
STRATEGY_LOG = 6;
}
message DeliverySpec {
Priority priority = 1;
repeated ChannelPreference channels = 2; // Ordered preference list
}
enum Priority {
INFORMATIONAL = 0; // Drop if no active session
NORMAL = 1; // Best effort, short queue
CRITICAL = 2; // Must deliver, retry, escalate
}
message ChannelPreference {
ChannelType channel = 1;
bool only_if_active = 2; // true = skip if not connected
}
enum ChannelType {
ACTIVE_SESSION = 0; // Whatever's currently connected
WEB = 1;
TELEGRAM = 2;
EMAIL = 3;
PUSH = 4; // Mobile push notification
}
```
### EventAck Message
```protobuf
message EventAck {
string event_id = 1;
AckStatus status = 2;
string error_message = 3; // If status is ERROR
}
enum AckStatus {
DELIVERED = 0; // Successfully sent to at least one channel
QUEUED = 1; // Accepted, will retry (e.g., Telegram rate limit)
ERROR = 2; // Permanent failure
}
```
### Language Notes
- JavaScript protobufs will convert field names to camelCase.
- Python will retain snake_case.
### Delivery Examples
```python
# "Show on screen if they're watching, otherwise don't bother"
# → Uses XPUB path only, dropped if no subscriber
UserEvent(
delivery=DeliverySpec(
priority=Priority.INFORMATIONAL,
channels=[ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True)]
)
)
# "Active session preferred, fallback to Telegram"
# → Tries XPUB first (if subscribed), else DEALER for Telegram delivery
UserEvent(
delivery=DeliverySpec(
priority=Priority.NORMAL,
channels=[
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
]
)
)
# "Order executed - MUST get through"
# → Always uses DEALER path for guaranteed delivery
UserEvent(
delivery=DeliverySpec(
priority=Priority.CRITICAL,
channels=[
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
ChannelPreference(ChannelType.PUSH, only_if_active=False),
ChannelPreference(ChannelType.EMAIL, only_if_active=False),
]
)
)
```
### Gateway Event Processing
Gateway maintains:
1. **Session registry**: Maps user_id → active WebSocket/channel connections
2. **Channel credentials**: Telegram bot token, email service keys, push certificates
3. **SUB socket per user session**: Subscribes to `USER:{user_id}` on container's XPUB
4. **Shared ROUTER socket**: Receives critical events from any container
```typescript
// On user WebSocket connect
async onSessionConnect(userId: string, ws: WebSocket) {
// Subscribe to user's informational events
subSocket.subscribe(`USER:${userId}`);
sessions.set(userId, ws);
}
// On user WebSocket disconnect
async onSessionDisconnect(userId: string) {
subSocket.unsubscribe(`USER:${userId}`);
sessions.delete(userId);
}
// Handle informational events (from SUB socket)
subSocket.on('message', (topic, payload) => {
const event = deserialize(payload);
const ws = sessions.get(event.userId);
if (ws) {
ws.send(JSON.stringify({ type: 'event', ...event }));
}
});
// Handle critical events (from ROUTER socket)
routerSocket.on('message', (identity, payload) => {
const event = deserialize(payload);
deliverEvent(event).then(status => {
routerSocket.send([identity, serialize(EventAck(event.eventId, status))]);
});
});
async function deliverEvent(event: UserEvent): Promise<AckStatus> {
for (const pref of event.delivery.channels) {
if (pref.onlyIfActive && !sessions.has(event.userId)) continue;
switch (pref.channel) {
case ChannelType.ACTIVE_SESSION:
const ws = sessions.get(event.userId);
if (ws) { ws.send(...); return AckStatus.DELIVERED; }
break;
case ChannelType.TELEGRAM:
await telegramBot.sendMessage(event.userId, formatEvent(event));
return AckStatus.DELIVERED;
case ChannelType.EMAIL:
await emailService.send(event.userId, formatEvent(event));
return AckStatus.DELIVERED;
// ... etc
}
}
return AckStatus.ERROR;
}
```
## Error Handling
@@ -476,56 +168,11 @@ async function deliverEvent(event: UserEvent): Promise<AckStatus> {
- Failed historical requests: ingestor writes error marker to Kafka
- Flink reads error marker and publishes HistoryReadyNotification with ERROR status
- Client timeout: if no notification received within timeout, assume failure
-- Realtime requests cancelled via control channel if ingestor fails
- REQ/REP timeouts: 30 seconds default for client request submission
- PUB/SUB has no delivery guarantees (Kafka provides durability)
- No response routing needed - all notifications via topic-based pub/sub
-**User Event Error Handling**:
-- Informational events: dropped silently if no active session (by design)
-- Critical events: container retries on ack timeout (30s default)
-- Gateway tracks event_id for deduplication (5 minute window)
-- If all channels fail: return ERROR ack, container may escalate or log
-- Container persists pending critical events to disk on shutdown
**Durability**:
- All data flows through Kafka for durability
- Flink checkpointing ensures exactly-once processing
- Client can retry request with new request_id if notification not received
-- Critical user events use DEALER/ROUTER with ack for at-least-once delivery
## Scaling
### TODO: Flink-to-Relay ZMQ Discovery
Currently Relay connects to Flink via XSUB on a single endpoint. With multiple Flink instances behind a K8s service, we need many-to-many connectivity.
**Problem**: K8s service load balancing doesn't help ZMQ since connections are persistent. Relay needs to connect to ALL Flink instances to receive all published messages.
**Proposed Solution**: Use a K8s headless service for Flink workers:
```yaml
apiVersion: v1
kind: Service
metadata:
name: flink-workers
spec:
clusterIP: None
selector:
app: flink
```
Relay implementation:
1. On startup and periodically (every N seconds), resolve `flink-workers.namespace.svc.cluster.local`
2. DNS returns A records for all Flink pod IPs
3. Diff against current XSUB connections
4. Connect to new pods, disconnect from removed pods
**Alternative approaches considered**:
- XPUB/XSUB broker: Adds single point of failure and latency
- Service discovery (etcd/Redis): More complex, requires additional infrastructure
**Open questions**:
- Appropriate polling interval for DNS resolution (5-10 seconds?)
- Handling of brief disconnection during pod replacement
- Whether to use K8s Endpoints API watch instead of DNS polling for faster reaction

doc/scaling.md Normal file (+35 lines)
View File

@@ -0,0 +1,35 @@
# Scaling Notes
## TODO: Flink-to-Relay ZMQ Discovery
Currently Relay connects to Flink via XSUB on a single endpoint. With multiple Flink instances behind a K8s service, we need many-to-many connectivity.
**Problem**: K8s service load balancing doesn't help ZMQ since connections are persistent. Relay needs to connect to ALL Flink instances to receive all published messages.
**Proposed Solution**: Use a K8s headless service for Flink workers:
```yaml
apiVersion: v1
kind: Service
metadata:
name: flink-workers
spec:
clusterIP: None
selector:
app: flink
```
Relay implementation:
1. On startup and periodically (every N seconds), resolve `flink-workers.namespace.svc.cluster.local`
2. DNS returns A records for all Flink pod IPs
3. Diff against current XSUB connections
4. Connect to new pods, disconnect from removed pods
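A sketch of that loop, assuming pyzmq and a 5-second poll; the service name, namespace, and interval are illustrative:

```python
import socket
import time

import zmq

ctx = zmq.Context()
xsub = ctx.socket(zmq.XSUB)
connected: set[str] = set()

while True:
    # Headless service: DNS returns one A record per Flink pod.
    infos = socket.getaddrinfo("flink-workers.default.svc.cluster.local", 5558,
                               proto=socket.IPPROTO_TCP)
    current = {f"tcp://{info[4][0]}:5558" for info in infos}
    for endpoint in current - connected:
        xsub.connect(endpoint)       # new pod appeared
    for endpoint in connected - current:
        xsub.disconnect(endpoint)    # pod went away
    connected = current
    time.sleep(5)
```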
**Alternative approaches considered**:
- XPUB/XSUB broker: Adds single point of failure and latency
- Service discovery (etcd/Redis): More complex, requires additional infrastructure
**Open questions**:
- Appropriate polling interval for DNS resolution (5-10 seconds?)
- Handling of brief disconnection during pod replacement
- Whether to use K8s Endpoints API watch instead of DNS polling for faster reaction

doc/user-events.md Normal file (+310 lines)
View File

@@ -0,0 +1,310 @@
# User Event Protocol
User containers emit events (order executions, alerts, workspace changes) that must be delivered to users via their active session or external channels (Telegram, email, push). This requires two ZMQ patterns with different delivery guarantees.
## Event Flow Overview
```
┌─────────────────────────────────────────────────────────────┐
│ User Container │
│ │
│ Strategy/Indicator Engine │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Event Publisher │ │
│ │ │ │
│ │ 1. Check delivery spec │ │
│ │ 2. If INFORMATIONAL or has_active_subscriber(): │ │
│ │ → XPUB (fast path) │ │
│ │ 3. Else (CRITICAL or no active session): │ │
│ │ → DEALER (guaranteed delivery) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ XPUB socket DEALER socket │
│ (port 5570) (port 5571) │
└─────────┼───────────────────────────┼───────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Gateway Pool │
│ │
│ ┌──────────────────┐ ┌──────────────────────────┐ │
│ │ SUB socket │ │ ROUTER socket │ │
│ │ (per-session) │ │ (shared, any gateway) │ │
│ │ │ │ │ │
│ │ Subscribe to │ │ Pull event, deliver, │ │
│ │ USER:{user_id} │ │ send EventAck back │ │
│ │ on connect │ │ │ │
│ └────────┬─────────┘ └─────────────┬────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Active WS/ │ │ Telegram API / Email / │ │
│ │ Telegram │ │ Push Notification │ │
│ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## ZMQ Channels
### Channel 5 — Informational Events (Container → Gateway)
**Pattern**: XPUB/SUB with subscription tracking
- **Socket Type**: Container uses XPUB (bind), Gateway uses SUB (connect)
- **Endpoint**: `tcp://*:5570` (Container binds)
- **Message Types**: `UserEvent`
- **Topic Format**: `USER:{user_id}` (e.g., `USER:user-abc123`)
- **Behavior**:
- Gateway subscribes to `USER:{user_id}` when user's WebSocket/Telegram session connects
- Gateway unsubscribes when session disconnects
- Container uses XPUB with `ZMQ_XPUB_VERBOSE` to track active subscriptions
- Container checks subscription set before publishing
- If no subscriber, message is either dropped (INFORMATIONAL) or routed to critical channel
- Zero coordination, fire-and-forget for active sessions
### Channel 6 — Critical Events (Container → Gateway)
**Pattern**: DEALER/ROUTER with acknowledgment
- **Socket Type**: Container uses DEALER (connect), Gateway uses ROUTER (bind)
- **Endpoint**: `tcp://gateway:5571` (Gateway binds, containers connect)
- **Message Types**: `UserEvent`, `EventAck`
- **Behavior**:
- Container sends `UserEvent` with `event_id` via DEALER
- DEALER round-robins to available gateway ROUTER sockets
- Gateway processes event (sends to Telegram, email, etc.)
- Gateway sends `EventAck` back to container
- Container tracks pending events with timeout (30s default)
- On timeout without ack: resend (DEALER routes to next gateway)
- On container shutdown: persist pending to disk, reload on startup
- Provides at-least-once delivery guarantee
## Message Envelope
User event channels use the standard two-frame envelope (see `protocol.md`), with PUB/SUB adding a topic frame prefix:
```
Frame 1 (SUB only): [topic: "USER:{user_id}"]
Frame 2: [1 byte: protocol version]
Frame 3: [0x20 or 0x21][protobuf payload]
```
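A sketch of packing and unpacking these frames as pyzmq multipart messages (0x20 = UserEvent; the helper names are illustrative):

```python
def pack_user_event(user_id: str, payload: bytes) -> list[bytes]:
    return [
        f"USER:{user_id}".encode(),  # Frame 1: topic (PUB/SUB path only)
        bytes([1]),                  # Frame 2: protocol version
        bytes([0x20]) + payload,     # Frame 3: type byte + protobuf payload
    ]

def unpack_event(frames: list[bytes]) -> tuple[int, int, bytes]:
    version = frames[-2][0]
    body = frames[-1]
    return version, body[0], body[1:]  # (version, msg_type, protobuf payload)
```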
## Protobuf Schemas
### UserEvent (0x20)
```protobuf
message UserEvent {
string user_id = 1;
string event_id = 2; // UUID for dedup/ack
int64 timestamp = 3; // Unix millis
EventType event_type = 4;
bytes payload = 5; // JSON or nested protobuf
DeliverySpec delivery = 6;
}
enum EventType {
ORDER_PLACED = 0;
ORDER_FILLED = 1;
ORDER_CANCELLED = 2;
ALERT_TRIGGERED = 3;
POSITION_UPDATED = 4;
WORKSPACE_CHANGED = 5;
STRATEGY_LOG = 6;
}
message DeliverySpec {
Priority priority = 1;
repeated ChannelPreference channels = 2; // Ordered preference list
}
enum Priority {
INFORMATIONAL = 0; // Drop if no active session
NORMAL = 1; // Best effort, short queue
CRITICAL = 2; // Must deliver, retry, escalate
}
message ChannelPreference {
ChannelType channel = 1;
bool only_if_active = 2; // true = skip if not connected
}
enum ChannelType {
ACTIVE_SESSION = 0; // Whatever's currently connected
WEB = 1;
TELEGRAM = 2;
EMAIL = 3;
PUSH = 4; // Mobile push notification
}
```
### EventAck (0x21)
```protobuf
message EventAck {
string event_id = 1;
AckStatus status = 2;
string error_message = 3; // If status is ERROR
}
enum AckStatus {
DELIVERED = 0; // Successfully sent to at least one channel
QUEUED = 1; // Accepted, will retry (e.g., Telegram rate limit)
ERROR = 2; // Permanent failure
}
```
**Language note**: JavaScript protobufs convert field names to camelCase; Python retains snake_case.
## Subscription Tracking (Container Side)
Container uses XPUB to detect active sessions:
```python
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.setsockopt(zmq.XPUB_VERBOSE, 1) # Receive all sub/unsub
xpub_socket.bind("tcp://*:5570")
active_subscriptions: set[str] = set()
def process_subscriptions():
while xpub_socket.poll(0):
msg = xpub_socket.recv()
topic = msg[1:].decode() # Skip first byte (sub/unsub flag)
if msg[0] == 1: # Subscribe
active_subscriptions.add(topic)
elif msg[0] == 0: # Unsubscribe
active_subscriptions.discard(topic)
def has_active_subscriber(user_id: str) -> bool:
return f"USER:{user_id}" in active_subscriptions
```
## Event Routing Logic (Container Side)
```python
def publish_event(event: UserEvent):
topic = f"USER:{event.user_id}"
if event.delivery.priority == Priority.INFORMATIONAL:
if has_active_subscriber(event.user_id):
xpub_socket.send_multipart([topic.encode(), serialize(event)])
# else: silently drop
elif has_active_subscriber(event.user_id):
# Active session exists — use fast path
xpub_socket.send_multipart([topic.encode(), serialize(event)])
else:
# No active session — use guaranteed delivery
send_via_dealer(event)
def send_via_dealer(event: UserEvent):
pending_events[event.event_id] = PendingEvent(
event=event,
sent_at=time.time(),
retries=0
)
dealer_socket.send(serialize(event))
```
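A companion sketch to `send_via_dealer` above: draining acks and resending anything pending past the 30s timeout. The `deserialize_ack`/`serialize` helpers and `PendingEvent` fields follow the names already used in this document; ERROR escalation is omitted for brevity:

```python
ACK_TIMEOUT_SECS = 30  # default from the channel description above

def process_acks_and_retries():
    # Drain any EventAck frames the gateways have sent back.
    while dealer_socket.poll(0):
        ack = deserialize_ack(dealer_socket.recv())
        pending_events.pop(ack.event_id, None)  # simplified: treats all ack statuses as final
    # Resend anything that has waited too long; DEALER may route to another gateway.
    now = time.time()
    for event_id, pending in list(pending_events.items()):
        if now - pending.sent_at > ACK_TIMEOUT_SECS:
            pending.sent_at = now
            pending.retries += 1
            dealer_socket.send(serialize(pending.event))
```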
## Delivery Examples
```python
# "Show on screen if they're watching, otherwise don't bother"
UserEvent(
delivery=DeliverySpec(
priority=Priority.INFORMATIONAL,
channels=[ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True)]
)
)
# "Active session preferred, fallback to Telegram"
UserEvent(
delivery=DeliverySpec(
priority=Priority.NORMAL,
channels=[
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
]
)
)
# "Order executed - MUST get through"
UserEvent(
delivery=DeliverySpec(
priority=Priority.CRITICAL,
channels=[
ChannelPreference(ChannelType.ACTIVE_SESSION, only_if_active=True),
ChannelPreference(ChannelType.TELEGRAM, only_if_active=False),
ChannelPreference(ChannelType.PUSH, only_if_active=False),
ChannelPreference(ChannelType.EMAIL, only_if_active=False),
]
)
)
```
## Gateway Event Processing
Gateway maintains:
1. **Session registry**: Maps user_id → active WebSocket/channel connections
2. **Channel credentials**: Telegram bot token, email service keys, push certificates
3. **SUB socket per user session**: Subscribes to `USER:{user_id}` on container's XPUB
4. **Shared ROUTER socket**: Receives critical events from any container
```typescript
async onSessionConnect(userId: string, ws: WebSocket) {
subSocket.subscribe(`USER:${userId}`);
sessions.set(userId, ws);
}
async onSessionDisconnect(userId: string) {
subSocket.unsubscribe(`USER:${userId}`);
sessions.delete(userId);
}
// Handle informational events (from SUB socket)
subSocket.on('message', (topic, payload) => {
const event = deserialize(payload);
const ws = sessions.get(event.userId);
if (ws) {
ws.send(JSON.stringify({ type: 'event', ...event }));
}
});
// Handle critical events (from ROUTER socket)
routerSocket.on('message', (identity, payload) => {
const event = deserialize(payload);
deliverEvent(event).then(status => {
routerSocket.send([identity, serialize(EventAck(event.eventId, status))]);
});
});
async function deliverEvent(event: UserEvent): Promise<AckStatus> {
for (const pref of event.delivery.channels) {
if (pref.onlyIfActive && !sessions.has(event.userId)) continue;
switch (pref.channel) {
case ChannelType.ACTIVE_SESSION:
const ws = sessions.get(event.userId);
if (ws) { ws.send(...); return AckStatus.DELIVERED; }
break;
case ChannelType.TELEGRAM:
await telegramBot.sendMessage(event.userId, formatEvent(event));
return AckStatus.DELIVERED;
case ChannelType.EMAIL:
await emailService.send(event.userId, formatEvent(event));
return AckStatus.DELIVERED;
}
}
return AckStatus.ERROR;
}
```
## Error Handling
- **Informational events**: dropped silently if no active session (by design)
- **Critical events**: container retries on ack timeout (30s default)
- **Deduplication**: gateway tracks `event_id` for 5 minutes to suppress retransmits
- **All channels fail**: return ERROR ack; container may escalate or log
- **Container shutdown**: persist pending critical events to disk, reload on startup
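The persist-on-shutdown/reload-on-startup behavior might look like this sketch; the file path and JSON-of-hex format are assumptions, and `UserEvent` is the protobuf message defined above:

```python
import json

PENDING_PATH = "/var/lib/container/pending_events.json"  # assumed location

def persist_pending() -> None:
    # Called on shutdown: serialize each unacked critical event.
    with open(PENDING_PATH, "w") as f:
        json.dump({event_id: p.event.SerializeToString().hex()
                   for event_id, p in pending_events.items()}, f)

def reload_pending() -> None:
    # Called on startup: re-enter the guaranteed-delivery path.
    try:
        with open(PENDING_PATH) as f:
            stored = json.load(f)
    except FileNotFoundError:
        return
    for payload_hex in stored.values():
        send_via_dealer(UserEvent.FromString(bytes.fromhex(payload_hex)))
```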

View File

@@ -4,15 +4,13 @@
# ZeroMQ bind address and ports
zmq_bind_address: "tcp://*"
zmq_ingestor_work_queue_port: 5555
-zmq_ingestor_response_port: 5556
-zmq_ingestor_control_port: 5557
zmq_market_data_pub_port: 5558
-zmq_client_request_port: 5559
-zmq_cep_webhook_port: 5560
-# Notification publisher endpoint (Flink → Relay)
-# Relay connects XSUB to this endpoint and proxies to clients
-notification_publish_endpoint: "tcp://*:5557"
+# Notification endpoints (internal Flink task manager → job manager path)
+# Task managers PUSH to job manager PULL socket at this address
+notification_publish_endpoint: "tcp://flink-jobmanager:5561"
+# Job manager binds PULL socket on this port to receive from task managers
+notification_pull_port: 5561
# Kafka configuration
kafka_bootstrap_servers: "kafka:9092"

View File

@@ -2,7 +2,6 @@ package com.dexorder.flink;
import com.dexorder.flink.config.AppConfig;
import com.dexorder.flink.iceberg.SchemaInitializer;
-import com.dexorder.flink.ingestor.IngestorControlChannel;
import com.dexorder.flink.ingestor.IngestorWorkQueue;
import com.dexorder.flink.kafka.TopicManager;
import com.dexorder.flink.publisher.HistoryNotificationForwarder;
@@ -117,11 +116,8 @@ public class TradingFlinkApp {
notificationForwarder.start();
LOG.info("History notification forwarder started on port {}", config.getNotificationPullPort());
-// Initialize ingestor components
+// Initialize ingestor work queue
IngestorWorkQueue workQueue = new IngestorWorkQueue(zmqManager);
-IngestorControlChannel controlChannel = new IngestorControlChannel(zmqManager);
-// Start the work queue processor
workQueue.start();
LOG.info("Ingestor work queue started");
@@ -237,9 +233,6 @@ public class TradingFlinkApp {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    LOG.info("Shutting down Trading Flink Application");
    try {
-        // Send shutdown signal to ingestors
-        controlChannel.shutdown();
        // Stop work queue
        workQueue.stop();

View File

@@ -95,26 +95,10 @@ public class AppConfig {
    return getInt("zmq_ingestor_work_queue_port", 5555);
}
-public int getIngestorResponsePort() {
-    return getInt("zmq_ingestor_response_port", 5556);
-}
-public int getIngestorControlPort() {
-    return getInt("zmq_ingestor_control_port", 5557);
-}
public int getMarketDataPubPort() {
    return getInt("zmq_market_data_pub_port", 5558);
}
-public int getClientRequestPort() {
-    return getInt("zmq_client_request_port", 5559);
-}
-public int getCepWebhookPort() {
-    return getInt("zmq_cep_webhook_port", 5560);
-}
public String getBindAddress() {
    return getString("zmq_bind_address", "tcp://*");
}

View File

@@ -81,7 +81,7 @@ public class SchemaInitializer {
// Bump this when the schema changes. Tables with a different (or missing) version
// will be dropped and recreated. Increment by 1 for each incompatible change.
// v1: open/high/low/close required; ingestor forward-fills interior gaps with previous close
-private static final String OHLC_SCHEMA_VERSION = "5";
+private static final String OHLC_SCHEMA_VERSION = "1";
private static final String SCHEMA_VERSION_PROP = "app.schema.version";

private void initializeOhlcTable() {
@@ -179,7 +179,7 @@ public class SchemaInitializer {
// v2: removed tick_denom/base_denom/quote_denom; added Nautilus instrument fields
//     (price_precision, size_precision, tick_size, lot_size, min_notional,
//      margin_init, margin_maint, maker_fee, taker_fee, contract_multiplier)
-private static final String SYMBOL_METADATA_SCHEMA_VERSION = "5";
+private static final String SYMBOL_METADATA_SCHEMA_VERSION = "1";
private void initializeSymbolMetadataTable() {
    TableIdentifier tableId = TableIdentifier.of(namespace, "symbol_metadata");

View File

@@ -1,91 +0,0 @@
package com.dexorder.flink.ingestor;
import java.util.ArrayList;
import java.util.List;
/**
* Represents a DataResponse message from an ingestor.
* Contains the results of a historical data request.
*/
public class DataResponseMessage {
private final String requestId;
private final ResponseStatus status;
private final String errorMessage;
private final List<byte[]> ohlcData;
private final int totalRecords;
public enum ResponseStatus {
OK,
NOT_FOUND,
ERROR
}
public DataResponseMessage(String requestId, ResponseStatus status, String errorMessage,
List<byte[]> ohlcData, int totalRecords) {
this.requestId = requestId;
this.status = status;
this.errorMessage = errorMessage;
this.ohlcData = ohlcData != null ? ohlcData : new ArrayList<>();
this.totalRecords = totalRecords;
}
public String getRequestId() {
return requestId;
}
public ResponseStatus getStatus() {
return status;
}
public String getErrorMessage() {
return errorMessage;
}
public List<byte[]> getOhlcData() {
return ohlcData;
}
public int getTotalRecords() {
return totalRecords;
}
/**
* Deserialize from protobuf bytes.
* TODO: Replace with actual generated protobuf deserialization
*/
public static DataResponseMessage fromProtobuf(byte[] protobufData) {
// Placeholder - will be replaced with actual protobuf deserialization
// For now, return a dummy response
return new DataResponseMessage("", ResponseStatus.ERROR, "Not implemented", null, 0);
}
/**
* Serialize to protobuf bytes.
* TODO: Replace with actual generated protobuf serialization
*/
public byte[] toProtobuf() {
// Placeholder - will be replaced with actual protobuf serialization
return new byte[0];
}
/**
* Create a successful response.
*/
public static DataResponseMessage success(String requestId, List<byte[]> ohlcData) {
return new DataResponseMessage(requestId, ResponseStatus.OK, null, ohlcData, ohlcData.size());
}
/**
* Create an error response.
*/
public static DataResponseMessage error(String requestId, String errorMessage) {
return new DataResponseMessage(requestId, ResponseStatus.ERROR, errorMessage, null, 0);
}
/**
* Create a not found response.
*/
public static DataResponseMessage notFound(String requestId) {
return new DataResponseMessage(requestId, ResponseStatus.NOT_FOUND, "Data not found", null, 0);
}
}

View File

@@ -1,165 +0,0 @@
package com.dexorder.flink.ingestor;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages the ingestor control channel.
* Broadcasts control messages to all ingestor workers via ZMQ PUB socket.
*/
public class IngestorControlChannel {
private static final Logger LOG = LoggerFactory.getLogger(IngestorControlChannel.class);
private static final byte PROTOCOL_VERSION = 0x01;
private static final byte MSG_TYPE_INGESTOR_CONTROL = 0x02;
private final ZmqChannelManager zmqManager;
public IngestorControlChannel(ZmqChannelManager zmqManager) {
this.zmqManager = zmqManager;
}
/**
* Cancel a specific data request.
*/
public void cancelRequest(String requestId) {
IngestorControlMessage msg = IngestorControlMessage.cancel(requestId);
broadcastControlMessage(msg);
LOG.info("Sent CANCEL control message for request: {}", requestId);
}
/**
* Send shutdown signal to all ingestors.
*/
public void shutdown() {
IngestorControlMessage msg = IngestorControlMessage.shutdown();
broadcastControlMessage(msg);
LOG.info("Sent SHUTDOWN control message to all ingestors");
}
/**
* Update ingestor configuration.
*/
public void updateConfig(IngestorConfig config) {
IngestorControlMessage msg = IngestorControlMessage.configUpdate(config);
broadcastControlMessage(msg);
LOG.info("Sent CONFIG_UPDATE control message to all ingestors");
}
/**
* Send heartbeat to ingestors.
*/
public void sendHeartbeat() {
IngestorControlMessage msg = IngestorControlMessage.heartbeat();
broadcastControlMessage(msg);
LOG.debug("Sent HEARTBEAT control message to all ingestors");
}
/**
* Broadcast a control message to all ingestors.
*/
private void broadcastControlMessage(IngestorControlMessage message) {
try {
byte[] protobufData = message.toProtobuf();
boolean sent = zmqManager.sendMessage(
ZmqChannelManager.Channel.INGESTOR_CONTROL,
PROTOCOL_VERSION,
MSG_TYPE_INGESTOR_CONTROL,
protobufData
);
if (!sent) {
LOG.error("Failed to send control message: action={}", message.getAction());
}
} catch (Exception e) {
LOG.error("Error broadcasting control message: action={}", message.getAction(), e);
}
}
/**
* Control message wrapper.
*/
public static class IngestorControlMessage {
private final ControlAction action;
private final String requestId;
private final IngestorConfig config;
public enum ControlAction {
CANCEL,
SHUTDOWN,
CONFIG_UPDATE,
HEARTBEAT
}
private IngestorControlMessage(ControlAction action, String requestId, IngestorConfig config) {
this.action = action;
this.requestId = requestId;
this.config = config;
}
public static IngestorControlMessage cancel(String requestId) {
return new IngestorControlMessage(ControlAction.CANCEL, requestId, null);
}
public static IngestorControlMessage shutdown() {
return new IngestorControlMessage(ControlAction.SHUTDOWN, null, null);
}
public static IngestorControlMessage configUpdate(IngestorConfig config) {
return new IngestorControlMessage(ControlAction.CONFIG_UPDATE, null, config);
}
public static IngestorControlMessage heartbeat() {
return new IngestorControlMessage(ControlAction.HEARTBEAT, null, null);
}
public ControlAction getAction() {
return action;
}
public String getRequestId() {
return requestId;
}
public IngestorConfig getConfig() {
return config;
}
/**
* Serialize to protobuf bytes.
* TODO: Replace with actual generated protobuf serialization
*/
public byte[] toProtobuf() {
// Placeholder - will be replaced with actual protobuf serialization
return new byte[0];
}
}
/**
* Ingestor configuration.
*/
public static class IngestorConfig {
private final Integer maxConcurrent;
private final Integer timeoutSeconds;
private final String kafkaTopic;
public IngestorConfig(Integer maxConcurrent, Integer timeoutSeconds, String kafkaTopic) {
this.maxConcurrent = maxConcurrent;
this.timeoutSeconds = timeoutSeconds;
this.kafkaTopic = kafkaTopic;
}
public Integer getMaxConcurrent() {
return maxConcurrent;
}
public Integer getTimeoutSeconds() {
return timeoutSeconds;
}
public String getKafkaTopic() {
return kafkaTopic;
}
}
}

View File

@@ -1,172 +0,0 @@
package com.dexorder.flink.ingestor;
import com.dexorder.flink.zmq.ZmqChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* Listens for DataResponse messages from ingestors on the ROUTER socket.
* Matches responses to pending requests and delivers them to waiting handlers.
*/
public class IngestorResponseListener {
private static final Logger LOG = LoggerFactory.getLogger(IngestorResponseListener.class);
private static final byte PROTOCOL_VERSION = 0x01;
private static final byte MSG_TYPE_DATA_RESPONSE = 0x02;
private final ZmqChannelManager zmqManager;
private final Map<String, CompletableFuture<DataResponseMessage>> pendingRequests;
private volatile boolean running;
private Thread listenerThread;
public IngestorResponseListener(ZmqChannelManager zmqManager) {
this.zmqManager = zmqManager;
this.pendingRequests = new ConcurrentHashMap<>();
this.running = false;
}
/**
* Start the response listener thread.
*/
public void start() {
if (running) {
LOG.warn("IngestorResponseListener already running");
return;
}
running = true;
listenerThread = new Thread(this::listenLoop, "IngestorResponseListener-Thread");
listenerThread.setDaemon(false);
listenerThread.start();
LOG.info("IngestorResponseListener started");
}
/**
* Stop the response listener.
*/
public void stop() {
if (!running) {
return;
}
running = false;
if (listenerThread != null) {
listenerThread.interrupt();
try {
listenerThread.join(5000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for listener thread to stop", e);
Thread.currentThread().interrupt();
}
}
// Cancel all pending requests
pendingRequests.values().forEach(future ->
future.completeExceptionally(new Exception("Listener stopped"))
);
pendingRequests.clear();
LOG.info("IngestorResponseListener stopped");
}
/**
* Register a request and return a CompletableFuture that will be completed
* when the response arrives.
*/
public CompletableFuture<DataResponseMessage> registerRequest(String requestId) {
CompletableFuture<DataResponseMessage> future = new CompletableFuture<>();
pendingRequests.put(requestId, future);
LOG.debug("Registered pending request: {}", requestId);
return future;
}
/**
* Cancel a pending request.
*/
public void cancelRequest(String requestId) {
CompletableFuture<DataResponseMessage> future = pendingRequests.remove(requestId);
if (future != null) {
future.completeExceptionally(new Exception("Request cancelled"));
LOG.debug("Cancelled pending request: {}", requestId);
}
}
/**
* Main listener loop - receives and processes DataResponse messages.
*/
private void listenLoop() {
LOG.info("IngestorResponseListener loop started");
while (running) {
try {
// Receive message from ROUTER socket with 1 second timeout
ZmqChannelManager.ReceivedMessage receivedMsg = zmqManager.receiveRouterMessage(
ZmqChannelManager.Channel.INGESTOR_RESPONSE,
1000
);
if (receivedMsg == null) {
continue;
}
// Verify protocol version and message type
if (receivedMsg.getVersion() != PROTOCOL_VERSION) {
LOG.warn("Received message with unsupported protocol version: {}",
receivedMsg.getVersion());
continue;
}
if (receivedMsg.getMessageType() != MSG_TYPE_DATA_RESPONSE) {
LOG.warn("Received unexpected message type: {}",
receivedMsg.getMessageType());
continue;
}
// Parse the DataResponse
DataResponseMessage response = DataResponseMessage.fromProtobuf(
receivedMsg.getProtobufData()
);
processResponse(response);
} catch (Exception e) {
if (running) {
LOG.error("Error in listener loop", e);
}
}
}
LOG.info("IngestorResponseListener loop stopped");
}
/**
* Process a received DataResponse message.
*/
private void processResponse(DataResponseMessage response) {
String requestId = response.getRequestId();
CompletableFuture<DataResponseMessage> future = pendingRequests.remove(requestId);
if (future == null) {
LOG.warn("Received response for unknown request: {}", requestId);
return;
}
LOG.info("Received response for request: {}, status={}, records={}",
requestId, response.getStatus(), response.getTotalRecords());
// Complete the future with the response
future.complete(response);
}
public boolean isRunning() {
return running;
}
public int getPendingRequestCount() {
return pendingRequests.size();
}
}

View File

@@ -24,11 +24,7 @@ public class ZmqChannelManager implements Closeable {
public enum Channel {
    INGESTOR_WORK_QUEUE,
-    INGESTOR_RESPONSE,
-    INGESTOR_CONTROL,
    MARKET_DATA_PUB,
-    CLIENT_REQUEST,
-    CEP_WEBHOOK
}

public ZmqChannelManager(AppConfig config) {
@@ -53,23 +49,7 @@ public class ZmqChannelManager implements Closeable {
"Ingestor Work Queue (PUB)" "Ingestor Work Queue (PUB)"
); );
// 2. Ingestor Response - ROUTER socket for receiving historical data responses // 2. Market Data Publication - PUB socket for market data streaming and HistoryReadyNotification
createAndBind(
Channel.INGESTOR_RESPONSE,
SocketType.ROUTER,
bindAddress + ":" + config.getIngestorResponsePort(),
"Ingestor Response (ROUTER)"
);
// 3. Ingestor Control - PUB socket for broadcast control messages
createAndBind(
Channel.INGESTOR_CONTROL,
SocketType.PUB,
bindAddress + ":" + config.getIngestorControlPort(),
"Ingestor Control (PUB)"
);
// 4. Market Data Publication - PUB socket for market data streaming
createAndBind( createAndBind(
Channel.MARKET_DATA_PUB, Channel.MARKET_DATA_PUB,
SocketType.PUB, SocketType.PUB,
@@ -77,22 +57,6 @@ public class ZmqChannelManager implements Closeable {
"Market Data Publication (PUB)" "Market Data Publication (PUB)"
); );
// 5. Client Request - REP socket for request-response
createAndBind(
Channel.CLIENT_REQUEST,
SocketType.REP,
bindAddress + ":" + config.getClientRequestPort(),
"Client Request (REP)"
);
// 6. CEP Webhook - ROUTER socket for async callbacks
createAndBind(
Channel.CEP_WEBHOOK,
SocketType.ROUTER,
bindAddress + ":" + config.getCepWebhookPort(),
"CEP Webhook (ROUTER)"
);
LOG.info("All ZeroMQ channels initialized successfully"); LOG.info("All ZeroMQ channels initialized successfully");
} }
@@ -198,83 +162,6 @@ public class ZmqChannelManager implements Closeable {
    return true;
}
/**
* Receive a message from a ROUTER socket.
* Returns a ReceivedMessage containing the identity, version, type, and payload.
*
* @param channel The channel to receive from (must be ROUTER)
* @param timeout Timeout in milliseconds (0 for non-blocking, -1 for blocking)
* @return ReceivedMessage or null if no message available
*/
public ReceivedMessage receiveRouterMessage(Channel channel, int timeout) {
ZMQ.Socket socket = getSocket(channel);
// Set receive timeout
if (timeout >= 0) {
socket.setReceiveTimeOut(timeout);
}
// Receive identity frame
byte[] identity = socket.recv(0);
if (identity == null) {
return null;
}
// Receive version frame
byte[] versionFrame = socket.recv(0);
if (versionFrame == null || versionFrame.length != 1) {
LOG.error("Invalid version frame received on channel {}", channel);
return null;
}
// Receive message frame (type byte + protobuf data)
byte[] messageFrame = socket.recv(0);
if (messageFrame == null || messageFrame.length < 1) {
LOG.error("Invalid message frame received on channel {}", channel);
return null;
}
byte versionByte = versionFrame[0];
byte messageTypeByte = messageFrame[0];
byte[] protobufData = new byte[messageFrame.length - 1];
System.arraycopy(messageFrame, 1, protobufData, 0, protobufData.length);
return new ReceivedMessage(identity, versionByte, messageTypeByte, protobufData);
}
/**
* Represents a received message from a ROUTER socket.
*/
public static class ReceivedMessage {
private final byte[] identity;
private final byte version;
private final byte messageType;
private final byte[] protobufData;
public ReceivedMessage(byte[] identity, byte version, byte messageType, byte[] protobufData) {
this.identity = identity;
this.version = version;
this.messageType = messageType;
this.protobufData = protobufData;
}
public byte[] getIdentity() {
return identity;
}
public byte getVersion() {
return version;
}
public byte getMessageType() {
return messageType;
}
public byte[] getProtobufData() {
return protobufData;
}
}
@Override
public void close() {
    LOG.info("Closing ZeroMQ channels");

View File

@@ -9,10 +9,9 @@ market_data_pub_port: 5558 # XPUB - Market data fanout to clients
# Ingestor-facing ports
ingestor_work_port: 5555 # PUB - Distribute work with exchange prefix
-ingestor_response_port: 5556 # ROUTER - Receive responses from ingestors
# Flink connection
-flink_market_data_endpoint: "tcp://flink-jobmanager:5557" # XSUB - Subscribe to Flink market data
+flink_market_data_endpoint: "tcp://flink-jobmanager:5558" # XSUB - Subscribe to Flink MARKET_DATA_PUB
# Timeouts and limits
request_timeout_secs: 30 # Timeout for pending client requests

View File

@@ -20,10 +20,6 @@ pub struct Config {
#[serde(default = "default_ingestor_work_port")]
pub ingestor_work_port: u16,
-/// Ingestor response port (ROUTER - receives responses from ingestors)
-#[serde(default = "default_ingestor_response_port")]
-pub ingestor_response_port: u16,
/// Flink market data endpoint (XSUB - relay subscribes to Flink)
#[serde(default = "default_flink_market_data_endpoint")]
pub flink_market_data_endpoint: String,
@@ -53,12 +49,8 @@ fn default_ingestor_work_port() -> u16 {
    5555
}
-fn default_ingestor_response_port() -> u16 {
-    5556
-}
fn default_flink_market_data_endpoint() -> String {
-    "tcp://flink-jobmanager:5557".to_string()
+    "tcp://flink-jobmanager:5558".to_string()
}

fn default_request_timeout_secs() -> u64 {
@@ -76,7 +68,6 @@ impl Default for Config {
client_request_port: default_client_request_port(),
market_data_pub_port: default_market_data_pub_port(),
ingestor_work_port: default_ingestor_work_port(),
-ingestor_response_port: default_ingestor_response_port(),
flink_market_data_endpoint: default_flink_market_data_endpoint(),
request_timeout_secs: default_request_timeout_secs(),
high_water_mark: default_hwm(),