I am wondering how to shut down JeroMQ properly. So far I know three methods, each with its pros and cons, and I have no clue which one is best.
The situation:
- Thread A: owns context, shall provide start/stop methods
- Thread B: actual listener thread
My current method:
Thread A
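    static ZContext CONTEXT = new ZContext();
    B worker;
    Thread thread;

    public void start() {
        worker = new B();
        thread = new Thread(worker);
        thread.start(); // start() returns void, so keep the Thread reference separately
    }

    public void stop() throws InterruptedException {
        worker.stopping = true; // the flag lives on B, not on Thread
        thread.join();
    }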
Thread B
    volatile boolean stopping = false; // written by A, read by B
    ZMQ.Socket socket;

    public void run() {
        socket = CONTEXT.createSocket(ROUTER);
        ... // socket setup
        socket.setReceiveTimeOut(10); // recv() returns null after 10 ms without a message
        while (!stopping) {
            socket.recv();
        }
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }
This works just great. A 10 ms shutdown delay is no problem for me, but the loop wakes up every 10 ms even when no messages arrive, which unnecessarily increases the CPU load. At the moment I prefer this one.
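The pattern behind this first method (a stop flag checked between timed receives) can be sketched without JeroMQ on the classpath. This is only an illustration under stated assumptions: `PollingListener`, `inbox`, `isRunning()` and `receivedCount()` are hypothetical names, and a `BlockingQueue` with a 10 ms timed `poll` stands in for the socket and its receive timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for thread B: the timed poll plays the role of recv() with a 10 ms timeout.
class PollingListener implements Runnable {
    // Hypothetical replacement for the socket's incoming messages.
    final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    // volatile so that the write from thread A is visible to thread B.
    private volatile boolean stopping = false;
    // Written only by the listener thread, read by others.
    private volatile int received = 0;
    private Thread thread;

    public void start() {
        thread = new Thread(this, "listener");
        thread.start();
    }

    // Sets the flag and waits; the listener notices within one poll timeout.
    public void stop() {
        stopping = true;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean isRunning() {
        return thread != null && thread.isAlive();
    }

    public int receivedCount() {
        return received;
    }

    @Override
    public void run() {
        try {
            while (!stopping) {
                // Wakes up every 10 ms even when idle: the CPU cost mentioned above.
                String msg = inbox.poll(10, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    received++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // In the JeroMQ version the socket would be destroyed here,
        // on the same thread that created and used it.
    }

    public static void main(String[] args) {
        PollingListener listener = new PollingListener();
        listener.start();
        listener.inbox.add("hello");
        listener.stop();
        System.out.println("running after stop: " + listener.isRunning());
    }
}
```

The point of the sketch is that the socket stand-in is only ever touched by the listener thread; the only shared state is the `volatile` flag.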
The second method shares the socket between the two threads:
Thread A
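    static ZContext CONTEXT = new ZContext();
    ZMQ.Socket socket;
    B worker;
    Thread thread;

    public void start() {
        socket = CONTEXT.createSocket(ROUTER);
        ... // socket setup
        worker = new B(socket);
        thread = new Thread(worker);
        thread.start(); // start() returns void, so keep the Thread reference separately
    }

    public void stop() {
        worker.stopping = true; // the flag lives on B, not on Thread
        CONTEXT.destroySocket(socket); // closes the socket while B may be blocked in recv()
    }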
Thread B
    volatile boolean stopping = false; // written by A, read by B
    ZMQ.Socket socket;

    B(ZMQ.Socket socket) {
        this.socket = socket;
    }

    public void run() {
        try {
            while (!stopping) {
                socket.recv();
            }
        } catch (ClosedSelection e) {
            // socket closed by A
            socket = null;
        }
        if (socket != null) {
            // close socket myself
            if (NUM_SOCKETS >= 1) {
                CONTEXT.destroySocket(socket);
            } else {
                CONTEXT.destroy();
            }
        }
    }
This works like a charm, too, but sometimes the exception is not thrown even though recv is already blocking. If I wait one millisecond after I started thread A, the exception is always thrown. I don't know whether this is a bug or just an effect of my misuse, since I share the socket between threads.
"revite" asked this question before (https://github.com/zeromq/jeromq/issues/116) and got an answer which is the third solution: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java
Summary: they call ctx.term() and interrupt the thread blocking in socket.recv().
This works fine, but I do not want to terminate my whole context, just this single socket. I would have to use one context per socket, and then I could not use inproc, which only connects sockets within the same context.
Summary
At the moment I have no clue how to get thread B out of its blocking state other than using timeouts, sharing the socket, or terminating the whole context.
What is the correct way of doing this?