From ba6bd5e0c2184df89c5fadc1638f22cb4fad08a7 Mon Sep 17 00:00:00 2001 From: Tim Olson Date: Fri, 17 Apr 2026 17:35:37 -0400 Subject: [PATCH] bugfixes --- .../dexorder/flink/zmq/ZmqChannelManager.java | 22 +++++++++++++------ ingestor/src/zmq-client.js | 8 +++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java b/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java index a1e879e3..3db705f8 100644 --- a/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java +++ b/flink/src/main/java/com/dexorder/flink/zmq/ZmqChannelManager.java @@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; +import org.zeromq.ZMQException; import java.io.Closeable; import java.util.HashMap; @@ -180,15 +181,22 @@ public class ZmqChannelManager implements Closeable { public boolean sendToWorker(byte[] identity, byte versionByte, byte messageTypeByte, byte[] protobufData) { ZMQ.Socket socket = getSocket(Channel.INGESTOR_BROKER); - if (!socket.send(identity, ZMQ.SNDMORE)) return false; - if (!socket.send(new byte[0], ZMQ.SNDMORE)) return false; - if (!socket.send(new byte[]{versionByte}, ZMQ.SNDMORE)) return false; + try { + if (!socket.send(identity, ZMQ.SNDMORE)) return false; + if (!socket.send(new byte[0], ZMQ.SNDMORE)) return false; + if (!socket.send(new byte[]{versionByte}, ZMQ.SNDMORE)) return false; - byte[] frame = new byte[1 + protobufData.length]; - frame[0] = messageTypeByte; - System.arraycopy(protobufData, 0, frame, 1, protobufData.length); + byte[] frame = new byte[1 + protobufData.length]; + frame[0] = messageTypeByte; + System.arraycopy(protobufData, 0, frame, 1, protobufData.length); - return socket.send(frame, 0); + return socket.send(frame, 0); + } catch (ZMQException e) { + // With ROUTER_MANDATORY set, send() throws ZMQException (instead of returning false) when the peer identity is unreachable.
+ // Returning false lets the caller detect and purge the stale slot. NOTE(review): this catch covers every ZMQException, not only unroutable-peer errors, and logs at debug level - confirm that is intended. + LOG.debug("sendToWorker failed (errno={}): {}", e.getErrorCode(), e.getMessage()); + return false; + } } @Override diff --git a/ingestor/src/zmq-client.js b/ingestor/src/zmq-client.js index b589cb8b..3c860ed0 100644 --- a/ingestor/src/zmq-client.js +++ b/ingestor/src/zmq-client.js @@ -1,5 +1,6 @@ // ZeroMQ DEALER client connecting to Flink IngestorBroker (ROUTER, port 5567) import * as zmq from 'zeromq'; +import os from 'os'; import { DataRequest, WorkerReady, WorkComplete, WorkHeartbeat, WorkReject, WorkStop, @@ -44,6 +45,13 @@ export class ZmqClient { this.dealerSocket = new zmq.Dealer(); + // Set a stable routing ID so the ROUTER always recognises this peer, even + // after a TCP reconnect. Without this, ZMQ assigns a new random identity + // on every reconnect, leaving dead slots in the broker's queue → Errno 65 (presumably EHOSTUNREACH - confirm). NOTE(review): assumes os.hostname() is unique per DEALER instance connecting to the broker - confirm. + const stableId = os.hostname(); + this.dealerSocket.routingId = stableId; + this.logger.info({ routingId: stableId }, 'Set stable DEALER routing ID'); + // Subscribe to connection events BEFORE calling connect() so we catch the // initial establishment. The 'connect' event fires on initial TCP handshake // and again after every ZMQ reconnect (e.g. Flink restart).