JeroMQ shutdown correctly

3

9

I am wondering how to shut down JeroMQ properly. So far I know three methods, each with its pros and cons, and I have no clue which one is best.

The situation:

  • Thread A: owns context, shall provide start/stop methods
  • Thread B: actual listener thread

My current method:

Thread A

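static ZContext CONTEXT = new ZContext();
B listener;
Thread thread;

public void start() {
    listener = new B();
    thread = new Thread(listener);
    thread.start(); // start() returns void, so it cannot be assigned
}

public void stop() throws InterruptedException {
    listener.stopping = true; // the flag is a field of B, not of Thread
    thread.join();
}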

Thread B

volatile boolean stopping = false; // written by thread A, read by thread B
ZMQ.Socket socket;

public void run() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    socket.setReceiveTimeout(10);

    while (!stopping) {
        socket.recv();
    }

    if (NUM_SOCKETS >= 1) {
        CONTEXT.destroySocket(socket);
    } else {
        CONTEXT.destroy();
    }
}

This works just fine. Taking up to 10 ms to shut down is no problem for me, but the thread wakes up every 10 ms even when no messages arrive, which unnecessarily increases the CPU load. At the moment I prefer this one.


The second method shares the socket between the two threads:

Thread A

static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket;
B listener;
Thread thread;

public void start() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    listener = new B(socket);
    thread = new Thread(listener);
    thread.start(); // start() returns void, so it cannot be assigned
}

public void stop() {
    listener.stopping = true; // the flag is a field of B, not of Thread
    CONTEXT.destroySocket(socket);
}

Thread B

volatile boolean stopping = false; // written by thread A, read by thread B
ZMQ.Socket socket;

public void run() {
    try {
        while (!stopping) {
            socket.recv();
        }
    } catch (ClosedSelectorException e) { // presumably java.nio.channels.ClosedSelectorException
        // socket closed by A
        socket = null;
    }
    if (socket != null) {
        // close socket myself
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }
}

This works like a charm, too, but sometimes the exception is not thrown even though recv is already blocking. If I wait one millisecond after I started thread A, the exception is always thrown. I don't know whether this is a bug or just an effect of my misuse, since I share the socket.


"revite" asked this question before (https://github.com/zeromq/jeromq/issues/116) and got an answer which is the third solution: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java

Summary: They call ctx.term() and interrupt the thread blocking in socket.recv().

This works fine, but I do not want to terminate my whole context, just this single socket. I would have to use one context per socket, so I would not be able to use inproc.

Summary

At the moment I have no clue how to get thread B out of its blocking state other than using timeouts, sharing the socket, or terminating the whole context.

What is the correct way of doing this?

Driftwood answered 27/3, 2014 at 11:46 Comment(1)
Here is a different solution: create a special thread that cannot be interrupted (extend it) but has a special method to inform the run routine about a requested shutdown, and call this method from a shutdown hook. (Freaky)
5

It is often mentioned that you can just destroy the zmq context and anything sharing that context will exit. However, this creates a nightmare, because your exiting code has to do its best to avoid a minefield of accidental calls into dead socket objects.

Attempting to close the socket from another thread doesn't work either, because sockets are not thread-safe and you'll end up with crashes.

ANSWER: The best way is to do as the ZeroMQ guide suggests for any use across multiple threads: use zmq sockets, not thread mutexes/locks/etc. Set up an additional listener socket that you connect and send something to on shutdown, and have your run() use a JeroMQ Poller to check which of your two sockets has received anything. If the additional socket receives something, exit.
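
A minimal sketch of this control-socket approach, assuming a recent JeroMQ (0.5 or later, for `SocketType` and `ZContext.createPoller`). The class name `StoppableListener`, the two inproc endpoint names, and the choice of PAIR for the control socket are assumptions made for illustration and are not part of the answer above.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class StoppableListener {
    // Hypothetical endpoint names, chosen only for this sketch.
    private static final String CONTROL = "inproc://listener-control";
    private static final String DATA = "inproc://listener-data";

    private final ZContext context;
    private ZMQ.Socket controlTx; // created and used by the controlling thread (A) only
    private Thread thread;

    public StoppableListener(ZContext context) {
        this.context = context;
    }

    public void start() {
        // Bind before the listener connects, so the inproc endpoint already exists.
        controlTx = context.createSocket(SocketType.PAIR);
        controlTx.bind(CONTROL);
        thread = new Thread(this::listen, "listener");
        thread.start();
    }

    public void stop() {
        controlTx.send("STOP"); // any message on the control socket means "exit"
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        context.destroySocket(controlTx);
    }

    public boolean isRunning() {
        return thread != null && thread.isAlive();
    }

    // Runs in thread B; every socket used here is created and closed here.
    private void listen() {
        ZMQ.Socket data = context.createSocket(SocketType.ROUTER);
        ZMQ.Socket controlRx = context.createSocket(SocketType.PAIR);
        data.bind(DATA);
        controlRx.connect(CONTROL);

        ZMQ.Poller poller = context.createPoller(2);
        int dataIdx = poller.register(data, ZMQ.Poller.POLLIN);
        int controlIdx = poller.register(controlRx, ZMQ.Poller.POLLIN);

        while (true) {
            poller.poll(); // blocks until one of the two sockets is readable
            if (poller.pollin(controlIdx)) {
                break; // shutdown requested by thread A
            }
            if (poller.pollin(dataIdx)) {
                data.recv(); // handle the incoming frame here
            }
        }
        poller.close();
        context.destroySocket(controlRx);
        context.destroySocket(data);
    }
}
```

With this layout the listener blocks in `poll()` without a timeout, no socket is touched by more than one thread, and only the listener's own sockets are closed, so the shared context and other inproc users stay intact.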

Righthander answered 1/12, 2014 at 23:25
3

Old question, but just in case...

I'd recommend checking out the ZThread source. You should be able to create an instance of IAttachedRunnable and pass it to the fork method. The run method of your instance will execute in another thread and will be passed a PAIR socket, while fork returns the connected PAIR socket to use for communicating with the PAIR socket that your IAttachedRunnable got.
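
A rough sketch of how that could look, assuming JeroMQ 0.5 or later. `ZThread.fork` and `ZThread.IAttachedRunnable` are the library's own names; the class name `ForkedListener`, the endpoint name, and the "STOP"/"STOPPED" handshake are assumptions made up for this example.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZThread;

public class ForkedListener implements ZThread.IAttachedRunnable {
    @Override
    public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe) {
        // ctx is the context handed to this thread by ZThread;
        // sockets created from it belong to this thread.
        ZMQ.Socket data = ctx.createSocket(SocketType.ROUTER);
        data.bind("inproc://forked-listener-data"); // hypothetical endpoint

        ZMQ.Poller poller = ctx.createPoller(2);
        int pipeIdx = poller.register(pipe, ZMQ.Poller.POLLIN);
        int dataIdx = poller.register(data, ZMQ.Poller.POLLIN);

        while (true) {
            poller.poll(); // blocks until the pipe or the data socket is readable
            if (poller.pollin(pipeIdx)) {
                pipe.recv(); // consume the stop request from the parent
                break;
            }
            if (poller.pollin(dataIdx)) {
                data.recv(); // handle the incoming frame here
            }
        }
        poller.close();
        pipe.send("STOPPED"); // tell the parent that the loop has exited
    }

    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            // fork() starts the thread and returns the parent's end of the PAIR pipe.
            ZMQ.Socket pipe = ZThread.fork(context, new ForkedListener());
            pipe.send("STOP");
            String reply = pipe.recvStr(); // waits until the listener has left its loop
            System.out.println(reply);
        }
    }
}
```

The parent never touches the listener's sockets; it only talks to it through the pipe, so the main context can stay alive for other sockets.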

Faqir answered 19/7, 2014 at 12:12
2

Check out the jeromq source here: even when you're doing a "blocking" recv, you're still burning CPU the entire time (the thread never sleeps). If you're worried about that, have the second thread sleep between polls and let the parent thread interrupt it. Something like this (just the relevant portions):

Thread A

public void stop() {
    thread.interrupt();
    thread.join();
}

Thread B

while (!Thread.interrupted()) {
  socket.recv(); // do whatever
  try {
    Thread.sleep(10); //milliseconds
  } catch (InterruptedException e) {
    break;
  }
}

Also, with regard to your second solution, in general you should not share sockets between threads - the zeromq guide is pretty clear on this - "Don't share ØMQ sockets between threads. ØMQ sockets are not threadsafe." Remember that a major use for ZMQ is IPC - threads communicating through connected sockets, not sharing the same end of one socket. No need for things like shared boolean stop variables.

Stubstad answered 30/4, 2014 at 1:41 Comment(1)
Shouldn't the socket.recv() be inside the try? That's the part that blocks. (Bratcher)
