use crate::config::Config; use crate::proto; use anyhow::{Context, Result}; use prost::Message; use tracing::{debug, error, info, warn}; const PROTOCOL_VERSION: u8 = 0x01; const MSG_TYPE_SUBMIT_REQUEST: u8 = 0x10; const MSG_TYPE_SUBMIT_RESPONSE: u8 = 0x11; pub struct Relay { config: Config, context: zmq::Context, } impl Relay { pub fn new(config: Config) -> Result { let context = zmq::Context::new(); Ok(Self { config, context, }) } pub async fn run(self) -> Result<()> { info!("Initializing ZMQ Relay"); let client_request_socket = self.create_client_request_socket()?; let market_data_frontend = self.create_market_data_frontend()?; let market_data_backend = self.create_market_data_backend()?; let flink_request_socket = self.create_flink_request_socket()?; info!("All sockets initialized — relay forwards requests to Flink"); tokio::task::spawn_blocking(move || { Self::proxy_loop( client_request_socket, market_data_frontend, market_data_backend, flink_request_socket, ) }) .await? } fn create_client_request_socket(&self) -> Result { let socket = self.context.socket(zmq::ROUTER)?; socket.set_sndhwm(self.config.high_water_mark)?; socket.set_rcvhwm(self.config.high_water_mark)?; socket.set_linger(1000)?; let endpoint = format!("{}:{}", self.config.bind_address, self.config.client_request_port); socket.bind(&endpoint)?; info!("Client request socket (ROUTER) bound to {}", endpoint); Ok(socket) } fn create_market_data_frontend(&self) -> Result { let socket = self.context.socket(zmq::XPUB)?; socket.set_sndhwm(self.config.high_water_mark)?; socket.set_xpub_verbose(true)?; let endpoint = format!("{}:{}", self.config.bind_address, self.config.market_data_pub_port); socket.bind(&endpoint)?; info!("Market data frontend (XPUB) bound to {}", endpoint); info!(" → Clients subscribe here; subscription events forwarded to Flink for realtime activation"); Ok(socket) } fn create_market_data_backend(&self) -> Result { let socket = self.context.socket(zmq::XSUB)?; socket.set_rcvhwm(self.config.high_water_mark)?; socket.connect(&self.config.flink_market_data_endpoint)?; info!("Market data backend (XSUB) connected to {}", self.config.flink_market_data_endpoint); info!(" → Receives market data and notifications from Flink"); Ok(socket) } fn create_flink_request_socket(&self) -> Result { let socket = self.context.socket(zmq::PUSH)?; socket.set_sndhwm(self.config.high_water_mark)?; socket.set_linger(1000)?; socket.connect(&self.config.flink_request_endpoint)?; info!("Flink request socket (PUSH) connected to {}", self.config.flink_request_endpoint); info!(" → Forwards SubmitHistoricalRequest to Flink for dispatch to ingestors"); Ok(socket) } fn proxy_loop( client_request_socket: zmq::Socket, market_data_frontend: zmq::Socket, market_data_backend: zmq::Socket, flink_request_socket: zmq::Socket, ) -> Result<()> { let mut items = [ client_request_socket.as_poll_item(zmq::POLLIN), market_data_frontend.as_poll_item(zmq::POLLIN), market_data_backend.as_poll_item(zmq::POLLIN), ]; info!("Entering relay proxy loop"); loop { zmq::poll(&mut items, 100) .context("Failed to poll sockets")?; // Handle client request submissions if items[0].is_readable() { if let Err(e) = Self::handle_client_submission( &client_request_socket, &flink_request_socket, ) { error!("Error handling client submission: {}", e); } } // Proxy client subscription events → Flink (XPUB → XSUB) if items[1].is_readable() { if let Err(e) = Self::proxy_subscription(&market_data_frontend, &market_data_backend) { error!("Error proxying subscription: {}", e); } } // Proxy market data from Flink → clients (XSUB → XPUB) if items[2].is_readable() { if let Err(e) = Self::proxy_market_data(&market_data_backend, &market_data_frontend) { error!("Error proxying market data: {}", e); } } } } fn handle_client_submission( client_socket: &zmq::Socket, flink_socket: &zmq::Socket, ) -> Result<()> { // Receive from client: [identity][empty][version][message] let identity = client_socket.recv_bytes(0)?; let _empty = client_socket.recv_bytes(0)?; let version_frame = client_socket.recv_bytes(0)?; let message_frame = client_socket.recv_bytes(0)?; if version_frame.len() != 1 || version_frame[0] != PROTOCOL_VERSION { warn!("Invalid protocol version from client"); return Ok(()); } if message_frame.is_empty() { warn!("Empty message frame from client"); return Ok(()); } let msg_type = message_frame[0]; let payload = &message_frame[1..]; debug!("Received client submission: type=0x{:02x}, payload_len={}", msg_type, payload.len()); match msg_type { MSG_TYPE_SUBMIT_REQUEST => { Self::handle_submit_request( identity, payload, client_socket, flink_socket, )?; } _ => { warn!("Unknown message type from client: 0x{:02x}", msg_type); } } Ok(()) } fn handle_submit_request( client_identity: Vec, payload: &[u8], client_socket: &zmq::Socket, flink_socket: &zmq::Socket, ) -> Result<()> { // Parse just enough to build the SubmitResponse — relay stays thin let request = proto::SubmitHistoricalRequest::decode(payload) .context("Failed to parse SubmitHistoricalRequest")?; let request_id = request.request_id.clone(); let client_id = request.client_id.clone(); info!("Forwarding request to Flink: request_id={}, ticker={}", request_id, request.ticker); // Forward the raw request to Flink via PUSH // Flink builds DataRequest and dispatches to ingestors via IngestorBroker let version_frame = vec![PROTOCOL_VERSION]; let mut message_frame = vec![MSG_TYPE_SUBMIT_REQUEST]; message_frame.extend_from_slice(payload); flink_socket.send(&version_frame, zmq::SNDMORE)?; flink_socket.send(&message_frame, 0)?; // Build SubmitResponse — relay still acks the client immediately let notification_topic = if let Some(cid) = &client_id { format!("RESPONSE:{}", cid) } else { format!("HISTORY_READY:{}", request_id) }; let response = proto::SubmitResponse { request_id: request_id.clone(), status: proto::submit_response::SubmitStatus::Queued as i32, error_message: None, notification_topic: notification_topic.clone(), }; let mut response_bytes = Vec::new(); response.encode(&mut response_bytes)?; let version_frame = vec![PROTOCOL_VERSION]; let mut resp_message_frame = vec![MSG_TYPE_SUBMIT_RESPONSE]; resp_message_frame.extend_from_slice(&response_bytes); client_socket.send(&client_identity, zmq::SNDMORE)?; client_socket.send(&[] as &[u8], zmq::SNDMORE)?; client_socket.send(&version_frame, zmq::SNDMORE)?; client_socket.send(&resp_message_frame, 0)?; info!("Acked client and forwarded to Flink: request_id={}, notification_topic={}", request_id, notification_topic); Ok(()) } fn proxy_subscription( frontend: &zmq::Socket, backend: &zmq::Socket, ) -> Result<()> { // Forward subscription event from XPUB to XSUB so Flink can detect realtime interest let msg = frontend.recv_bytes(0)?; backend.send(&msg, 0)?; if !msg.is_empty() { let action = if msg[0] == 1 { "subscribe" } else { "unsubscribe" }; let topic = String::from_utf8_lossy(&msg[1..]); debug!("Client {} to topic: {}", action, topic); } Ok(()) } fn proxy_market_data( backend: &zmq::Socket, frontend: &zmq::Socket, ) -> Result<()> { // Zero-copy proxy: XSUB (Flink) → XPUB (clients) loop { let msg = backend.recv_bytes(0)?; let more = backend.get_rcvmore()?; if more { frontend.send(&msg, zmq::SNDMORE)?; } else { frontend.send(&msg, 0)?; break; } } Ok(()) } }