Abort zeromq recv() or poll() from another thread - instantly and without the need to wait for timeout

I'm using ZeroMQ in Python and C++ in many configurations and I wonder which is the most elegant way to abort a recv() or poll() from another thread (e.g. in case of controlled program termination but also if you want to stop listening without the need to kill the socket).

In contrast to this question, I don't just want to avoid an infinite wait; I want to return immediately from recv() or poll().

I know I can just provide a timeout and abort recv() like this:

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while _running:
    if poller.poll(timeout=100) == []:
        # maybe handle unwanted timeout here..
        continue

    handle_message(socket.recv())

This polls the socket repeatedly until _running is set to False from another thread; after at most 100 ms the loop exits.

But this is not nice: the loop wakes up every 100 ms even when nothing happens, and it becomes hard to handle real timeouts that might result from unwanted behavior. I also have to wait for the timeout to expire, which is not critical in most cases but is still an unnecessary delay.

Of course I can poll an extra socket for abortion:

abort_socket = context.socket(zmq.SUB)
abort_socket.setsockopt(zmq.SUBSCRIBE, b"")
abort_socket.connect(<abort-publisher-endpoint>)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
poller.register(abort_socket, zmq.POLLIN)

while _running:
    poll_result = dict(poller.poll(timeout=1000))  # poll() returns (socket, event) pairs
    if socket in poll_result:
        handle_message(socket.recv())
    elif abort_socket in poll_result:
        break
    else:
        # handle real timeout here
        pass

But this approach also has disadvantages:

  • it's a bit verbose: wherever I trigger the abort, I would have to create a publisher and use it to abort the receiver
  • the abort_socket can only be used from one thread, so I would have to make sure of that
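
Both points can be reduced with a small helper, sketched here under assumptions: the abort sender lives in one object that owns an inproc PAIR socket and guards it with a lock, so any thread can trigger the abort with a single call. The names AbortSignal, listen, run_demo and the inproc:// endpoints are hypothetical. Sharing the socket behind a lock relies on ZeroMQ's rule that a socket may be handed between threads only when a full memory barrier separates the accesses, so treat this as a sketch rather than a recommended pattern.

```python
import queue
import threading

import zmq

ABORT_ENDPOINT = "inproc://abort"  # hypothetical endpoint names
DATA_ENDPOINT = "inproc://data"


class AbortSignal:
    """Event-like trigger backed by an inproc PAIR socket (hypothetical helper)."""

    def __init__(self, context, endpoint=ABORT_ENDPOINT):
        self._lock = threading.Lock()
        self._sender = context.socket(zmq.PAIR)
        self._sender.bind(endpoint)  # bind first so the listener can connect

    def set(self):
        # The lock makes sure only one thread touches the socket at a time.
        with self._lock:
            self._sender.send(b"")

    def close(self):
        with self._lock:
            self._sender.close(linger=0)


def listen(context, handle_message):
    """Receive messages until the abort socket becomes readable."""
    data_socket = context.socket(zmq.PULL)
    data_socket.connect(DATA_ENDPOINT)
    abort_socket = context.socket(zmq.PAIR)
    abort_socket.connect(ABORT_ENDPOINT)

    poller = zmq.Poller()
    poller.register(data_socket, zmq.POLLIN)
    poller.register(abort_socket, zmq.POLLIN)
    try:
        while True:
            # No timeout: poll() blocks until one of the sockets is readable.
            events = dict(poller.poll())
            if data_socket in events:
                handle_message(data_socket.recv())
            if abort_socket in events:
                break
    finally:
        data_socket.close(linger=0)
        abort_socket.close(linger=0)


def run_demo():
    context = zmq.Context()
    abort = AbortSignal(context)
    sender = context.socket(zmq.PUSH)
    sender.bind(DATA_ENDPOINT)

    received = queue.Queue()
    thread = threading.Thread(target=listen, args=(context, received.put))
    thread.start()

    sender.send(b"hello")
    message = received.get(timeout=5)  # wait until the listener handled it
    abort.set()                        # wakes the blocking poll() in the listener
    thread.join(timeout=5)
    stopped = not thread.is_alive()

    sender.close(linger=0)
    abort.close()
    if stopped:
        context.term()
    return message, stopped
```

Calling run_demo() starts a listener, delivers one message, and then stops the listener through abort.set() without any poll timeout. The listener still only notices the abort when it is back in poll(), so a long-running handle_message delays the exit.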

So my question is: how is this done the nice way?

Can I somehow use something like Python's threading.Event (or something similar in other languages) instead of the abort socket, and pass it to the poller like this?

def listener_thread_fn(event):

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    poller.register(event, zmq.POLLIN)

    while _running:
        poll_result = dict(poller.poll(timeout=1000))
        if socket in poll_result:
            handle_message(socket.recv())
        elif event in poll_result:
            break
        else:
            # handle real timeout here
            pass

Then you would just have to create a threading.Event() up front, pass it to listener_thread_fn and call event.set() from any thread to abort.
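
A threading.Event has no file descriptor, so a poller cannot watch it directly. A possible workaround, sketched here with the standard library only, is an event-like object backed by a socket pair that exposes fileno(). The class name WakeupEvent and the helper wait_for_abort are hypothetical. pyzmq's Poller.register() is documented to accept native sockets and objects with a fileno() method in addition to ZeroMQ sockets, but registering this object there is an assumption to verify; the demo below uses select.select() as a stand-in for the poller.

```python
import select
import socket
import threading


class WakeupEvent:
    """threading.Event look-alike that also exposes a pollable file descriptor."""

    def __init__(self):
        self._reader, self._writer = socket.socketpair()
        self._reader.setblocking(False)
        self._writer.setblocking(False)
        self._flag = threading.Event()

    def fileno(self):
        # Pollers that accept file descriptors can watch this for readability.
        return self._reader.fileno()

    def set(self):
        # Callable from any thread; one byte is enough to wake a poller.
        self._flag.set()
        try:
            self._writer.send(b"\0")
        except BlockingIOError:
            pass  # the buffer is full, so a wake-up is already pending

    def is_set(self):
        return self._flag.is_set()

    def clear(self):
        self._flag.clear()
        try:
            while self._reader.recv(1024):
                pass  # drain pending wake-up bytes
        except BlockingIOError:
            pass  # nothing left to read

    def close(self):
        self._reader.close()
        self._writer.close()


def wait_for_abort(event, timeout):
    """Stand-in for the listener: block until the event is set or timeout passes."""
    # Assumption: a zmq.Poller with the event registered for POLLIN would
    # take the place of select.select() in a real listener.
    readable, _, _ = select.select([event], [], [], timeout)
    return bool(readable)
```

With this object, event.set() from any thread makes the descriptor readable and wakes the waiting call at once. How a registered non-ZeroMQ object shows up in the result of poller.poll() (as the object itself or as its descriptor) should be checked against the pyzmq documentation before relying on an "event in poll_result" test.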

Aeolotropic answered 26/6, 2015 at 14:51 Comment(6)
possible duplicate of zeromq: how to prevent infinite wait? - check out the last answer in particular which mentions ZMQ_LINGER, that should get you where you're going.Latinize
Doesn't look like a duplicate to me.Benildis
That's really not what I was asking. I want to abort a blocking recv() or poll(), which doesn't mean there are any lingering messages around. I don't want to disconnect() the socket from another thread because accessing a ZeroMQ socket from a thread other than the one that created it is discouraged. Both sending an abort signal to poll() through another socket and the timeout approach would accomplish this, but neither seems very elegant to me.Aeolotropic
@Jason: the accepted answer in the question you linked demonstrates the timeout method, which is not a nice way to abort because you have to wait until the timeout has expired.Aeolotropic
I don't know it for Python, but the doc states that zmq_poll returns with EINTR on the delivery of a signal.Salyers
That's true for Python, too, as you can see in Peque's answer. I don't like signals for controlling program flow since you never know who else has installed a signal handler (which results in unpredictable behavior).Aeolotropic

With Python and pyzmq, an error is raised on recv() or poll() interruption; so you can simply catch the exception when it occurs. An example with recv():

while True:
    try:
        request = server.recv()
    except zmq.ZMQError as e:
        if e.errno == errno.EINTR:
            print('Clean exit now!')
            break
        raise

You can easily modify that code to use poll() instead (it's the same procedure). Also, note that you'll need to:

import errno
Custumal answered 19/7, 2015 at 13:28 Comment(7)
The problem is that you can't send a signal to a thread - only to the whole process. In case you don't want to interrupt the whole process but only one poll it's difficult to make sure that nothing else happens when you send a signal. (also - when it comes to platform independence I would have a bad feeling).Aeolotropic
@frans: Sorry, I misunderstood your problem. I think if you removed the few references you have in this question about threads (the title and most paragraphs and first two code chunks could be unchanged), this question would still be very useful and will already have an answer. Then create a new question where you state your problem more precisely: i.e. note that you are using threads in the title, show a code chunk that uses threads and explain what you want to achieve (i.e.: from this process that has 3 threads, I would like to interrupt just one and make it break from the ZMQ polling).Custumal
@frans: if you don't like this, then I can delete my answer from here, but I would ask you to edit this question so that the fact that you need threads is more clear and to show what is each thread doing and how are you expecting to stop any of them. Will the threads be removed after the polling is aborted? etc. I think it's better to create a new one, as this one is a good question that might affect other users that don't use threads in their code. :-)Custumal
I think you can leave your answer since it at least shows one way to abort a poll() or recv() without the timeout, as requested in the title (this is why I +1ed your answer). But I'd like to be able to keep the process running, so I will just be more precise.Aeolotropic
@frans: I think my answer no longer makes sense in here now that the question has changed. I will probably delete it soon. If you want to create a new question to ask how to "Abort zeromq recv() or poll() instantly" (without threads), I'll be happy to answer in there if you share the link. ;-)Custumal
since you can send signals from another thread to the current process your answer still describes one way to do it. I think you should leave the answer - at least for completeness.Aeolotropic
@frans: ok. If you don't want to use signals, I would recommend using ZeroMQ patterns (i.e. add one socket to the poll). Note, though, that this way you'll "interrupt" the poll or recv, but you won't interrupt the thread if it's processing a received message (you'll need to wait until it gets back to poll/recv again). If you are using threads within a process, take advantage of ZeroMQ's inproc transport (if you are not already doing so). It is amazingly fast. :-)Custumal

This question is old but still relevant and without a real answer as far as I can tell.

My solution (though more of a workaround) is indeed to just close the socket from outside the thread, which instantly aborts poller.poll(), socket.recv(), etc.
This will however raise an exception in the ZMQ thread: ZMQError("not a socket", errno=128), which I decided to just catch and let pass. This directly conflicts with ZMQ's warning to only use a socket from a single thread, but I simply cannot find another method to cancel a query instantly.

The code looks like:


class ZMQClient:

    # ...

    def thread_loop(self):
        # Called in a dedicated thread somewhere
        try:
            updated_sockets = dict(self._poller.poll(timeout=100))
        except ZMQError as err:
            if str(err) != "not a socket":
                raise  # Re-raise anything other than the error caused by stop() closing the socket
        else:
            if self.socket in updated_sockets:
                msg = self.socket.recv()
                # ...

    # ...

    def stop(self):
        self.socket.close()

I am combining this with a PySide6 Qt app, where I stop the loop from a closeEvent():

class MyMainWindow(QMainWindow):

    def __init__(self):
        super().__init__()
        # Start ZMQ thread
        # self.client = ...
        # ...

    # ...

    def closeEvent(self, event):
        self.client.stop()
Fetich answered 17/9 at 13:22 Comment(0)
