bugfixes
This commit is contained in:
@@ -6,6 +6,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
import org.zeromq.SocketType;
|
import org.zeromq.SocketType;
|
||||||
import org.zeromq.ZContext;
|
import org.zeromq.ZContext;
|
||||||
import org.zeromq.ZMQ;
|
import org.zeromq.ZMQ;
|
||||||
|
import org.zeromq.ZMQException;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -180,6 +181,7 @@ public class ZmqChannelManager implements Closeable {
|
|||||||
public boolean sendToWorker(byte[] identity, byte versionByte, byte messageTypeByte, byte[] protobufData) {
|
public boolean sendToWorker(byte[] identity, byte versionByte, byte messageTypeByte, byte[] protobufData) {
|
||||||
ZMQ.Socket socket = getSocket(Channel.INGESTOR_BROKER);
|
ZMQ.Socket socket = getSocket(Channel.INGESTOR_BROKER);
|
||||||
|
|
||||||
|
try {
|
||||||
if (!socket.send(identity, ZMQ.SNDMORE)) return false;
|
if (!socket.send(identity, ZMQ.SNDMORE)) return false;
|
||||||
if (!socket.send(new byte[0], ZMQ.SNDMORE)) return false;
|
if (!socket.send(new byte[0], ZMQ.SNDMORE)) return false;
|
||||||
if (!socket.send(new byte[]{versionByte}, ZMQ.SNDMORE)) return false;
|
if (!socket.send(new byte[]{versionByte}, ZMQ.SNDMORE)) return false;
|
||||||
@@ -189,6 +191,12 @@ public class ZmqChannelManager implements Closeable {
|
|||||||
System.arraycopy(protobufData, 0, frame, 1, protobufData.length);
|
System.arraycopy(protobufData, 0, frame, 1, protobufData.length);
|
||||||
|
|
||||||
return socket.send(frame, 0);
|
return socket.send(frame, 0);
|
||||||
|
} catch (ZMQException e) {
|
||||||
|
// ROUTER_MANDATORY throws (not returns false) for unreachable peer identities.
|
||||||
|
// Returning false lets the caller detect and purge the stale slot.
|
||||||
|
LOG.debug("sendToWorker failed (errno={}): {}", e.getErrorCode(), e.getMessage());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
// ZeroMQ DEALER client connecting to Flink IngestorBroker (ROUTER, port 5567)
|
// ZeroMQ DEALER client connecting to Flink IngestorBroker (ROUTER, port 5567)
|
||||||
import * as zmq from 'zeromq';
|
import * as zmq from 'zeromq';
|
||||||
|
import os from 'os';
|
||||||
import {
|
import {
|
||||||
DataRequest,
|
DataRequest,
|
||||||
WorkerReady, WorkComplete, WorkHeartbeat, WorkReject, WorkStop,
|
WorkerReady, WorkComplete, WorkHeartbeat, WorkReject, WorkStop,
|
||||||
@@ -44,6 +45,13 @@ export class ZmqClient {
|
|||||||
|
|
||||||
this.dealerSocket = new zmq.Dealer();
|
this.dealerSocket = new zmq.Dealer();
|
||||||
|
|
||||||
|
// Set a stable routing ID so the ROUTER always recognises this peer, even
|
||||||
|
// after a TCP reconnect. Without this, ZMQ assigns a new random identity
|
||||||
|
// on every reconnect, leaving dead slots in the broker's queue → Errno 65.
|
||||||
|
const stableId = os.hostname();
|
||||||
|
this.dealerSocket.routingId = stableId;
|
||||||
|
this.logger.info({ routingId: stableId }, 'Set stable DEALER routing ID');
|
||||||
|
|
||||||
// Subscribe to connection events BEFORE calling connect() so we catch the
|
// Subscribe to connection events BEFORE calling connect() so we catch the
|
||||||
// initial establishment. The 'connect' event fires on initial TCP handshake
|
// initial establishment. The 'connect' event fires on initial TCP handshake
|
||||||
// and again after every ZMQ reconnect (e.g. Flink restart).
|
// and again after every ZMQ reconnect (e.g. Flink restart).
|
||||||
|
|||||||
Reference in New Issue
Block a user