zeromq: how to prevent infinite wait?

I just got started with ZMQ. I am designing an app whose workflow is:

  1. One of many clients (each with its own random PULL address) PUSHes a request to a server on port 5555.
  2. The server waits forever for client PUSHes. When one arrives, a worker process is spawned for that particular request. Yes, worker processes can exist concurrently.
  3. When that process completes its task, it PUSHes the result to the client.
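The workflow above can be sketched roughly as follows. This is a minimal illustration, not the asker's code: it assumes the client sends its own PULL address as the first frame of a multipart request (a hypothetical message format), uses threads instead of worker processes so everything fits in one script, and binds to random ports rather than 5555.

```python
import threading

import zmq

ctx = zmq.Context()


def worker(reply_addr, payload):
    # One worker per request: PUSH the result back to the client's PULL address
    push = ctx.socket(zmq.PUSH)
    push.connect(reply_addr)
    push.send(b"result for " + payload)
    push.close()  # delivery of the queued message continues in the background


def serve(pull, n_requests):
    # Wait for client PUSHes and spawn a worker thread for each request
    workers = []
    for _ in range(n_requests):
        reply_addr, payload = pull.recv_multipart()
        t = threading.Thread(target=worker, args=(reply_addr.decode(), payload))
        t.start()
        workers.append(t)
    for t in workers:
        t.join()


server_pull = ctx.socket(zmq.PULL)
server_port = server_pull.bind_to_random_port("tcp://127.0.0.1")  # stands in for 5555
server = threading.Thread(target=serve, args=(server_pull, 1))
server.start()

# Client: bind a PULL socket on a random port and send that address with the request
client_receiver = ctx.socket(zmq.PULL)
client_port = client_receiver.bind_to_random_port("tcp://127.0.0.1")
client_sender = ctx.socket(zmq.PUSH)
client_sender.connect(f"tcp://127.0.0.1:{server_port}")
client_sender.send_multipart([f"tcp://127.0.0.1:{client_port}".encode(), b"job-1"])

reply = client_receiver.recv()  # with no timeout, this is the call that can wait forever
print(reply)

server.join()
ctx.destroy(linger=0)
```

The final recv() has no deadline, which is exactly the first scenario below: if the server or worker dies, the client hangs.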

I assume that the PUSH/PULL architecture is suited for this. Please correct me if I'm wrong.


But how do I handle these scenarios?

  1. client_receiver.recv() will wait forever if the server fails to respond.
  2. The client may send a request and then fail immediately afterwards, leaving a worker process stuck at server_sender.send() forever.

So how do I set up something like a timeout in the PUSH/PULL model?


EDIT: Thanks to user938949's suggestions, I got a working answer, and I am sharing it for posterity.

Miter asked on 24/9/2011 at 12:25. Comments (2):
I'm not a 0mq expert, but in a lot of situations like this it is better to create your worker pool at start-up rather than creating workers in response to messages. Maybe I'm misunderstanding you. – Silverware
Good point. I actually plan to pre-fork the workers. I just realized that this can be trivial with 0mq. – Miter

If you are using zeromq >= 3.0, then you can set the RCVTIMEO socket option:

client_receiver.RCVTIMEO = 1000 # in milliseconds
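A minimal sketch of what happens when the timeout expires, assuming a pyzmq version that exposes zmq.Again (the ZMQError subclass raised for EAGAIN): recv() raises instead of blocking, so the caller decides what to do next. The 200 ms value and the random port are illustrative.

```python
import zmq

ctx = zmq.Context()
client_receiver = ctx.socket(zmq.PULL)
client_receiver.bind_to_random_port("tcp://127.0.0.1")  # nobody ever sends here
client_receiver.RCVTIMEO = 200  # milliseconds; same as setsockopt(zmq.RCVTIMEO, 200)

try:
    msg = client_receiver.recv()
except zmq.Again:
    # recv() gave up after RCVTIMEO instead of blocking forever
    msg = None

client_receiver.close(linger=0)
ctx.term()
print(msg)
```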

But in general, you can use pollers:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

And poller.poll() takes a timeout:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts will be an empty list if there is nothing to receive.
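A short sketch of the poller approach in pyzmq, where recv_or_none is a hypothetical helper name and the timeouts are illustrative. It shows both the timeout case (poll() returns an empty list) and the case where a message is waiting.

```python
import zmq

ctx = zmq.Context()
client_receiver = ctx.socket(zmq.PULL)
port = client_receiver.bind_to_random_port("tcp://127.0.0.1")

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN)


def recv_or_none(timeout_ms):
    # Hypothetical helper: wait up to timeout_ms, return None if nothing arrived
    evts = dict(poller.poll(timeout_ms))
    if evts.get(client_receiver, 0) & zmq.POLLIN:
        return client_receiver.recv()
    return None


first = recv_or_none(200)  # nobody has sent anything yet

sender = ctx.socket(zmq.PUSH)
sender.connect(f"tcp://127.0.0.1:{port}")
sender.send(b"hello")
second = recv_or_none(2000)  # the message should arrive well within 2 s

print(first, second)
ctx.destroy(linger=0)
```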

You can also poll with zmq.POLLOUT to check whether a send will succeed.
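A sketch of the POLLOUT check, assuming a bound PUSH socket that no PULL peer ever connects to: there is nowhere to queue the message, so the socket never reports itself writable and the sender can give up instead of blocking. (A connected, rather than bound, PUSH socket normally queues messages immediately, so it would usually report writable even if the peer is down.)

```python
import zmq

ctx = zmq.Context()
sender = ctx.socket(zmq.PUSH)
sender.bind_to_random_port("tcp://127.0.0.1")  # no PULL peer connects in this sketch

poller = zmq.Poller()
poller.register(sender, zmq.POLLOUT)

ready = dict(poller.poll(200))  # wait up to 200 ms for the socket to become writable
writable = bool(ready.get(sender, 0) & zmq.POLLOUT)
if writable:
    sender.send(b"result")
else:
    print("no peer ready; not sending")

ctx.destroy(linger=0)
```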

Or, to handle a peer that might have failed, a non-blocking send such as:

worker.send(msg, zmq.NOBLOCK)

might suffice. It always returns immediately, raising ZMQError(zmq.EAGAIN) if the send could not complete.
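A minimal pyzmq sketch of the non-blocking send, assuming a bound PUSH socket whose client never connects. In pyzmq the EAGAIN error surfaces as zmq.Again, a subclass of zmq.ZMQError, and zmq.NOBLOCK is an alias of zmq.DONTWAIT.

```python
import zmq

ctx = zmq.Context()
worker = ctx.socket(zmq.PUSH)
worker.bind_to_random_port("tcp://127.0.0.1")  # the client that should connect never does

errno_seen = None
try:
    # Returns immediately: either the message is queued or zmq.Again is raised
    worker.send(b"result", zmq.NOBLOCK)
    sent = True
except zmq.Again as e:
    errno_seen = e.errno  # zmq.EAGAIN
    sent = False

print("sent" if sent else "peer unavailable, dropping result")
ctx.destroy(linger=0)
```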

Immune answered on 24/9/2011 at 16:33. Comments (6):
Could you elaborate on zmq.NOBLOCK? – Miter
Hi, do we have to re-register a socket with a poller every time we disconnect and reconnect it? – Lights
No. You only need to re-register if you close the socket and open a new one. – Immune
As @Marder and @Ite said below: if you use zmq.RCVTIMEO, you also need to set zmq.LINGER, or else the socket still won't close even after the timeout. In Python: socket.setsockopt(zmq.RCVTIMEO, 1000); socket.setsockopt(zmq.LINGER, 0); message = socket.recv() – Lubet
Both forms work in Python: results_receiver.RCVTIMEO = 1000 and results_receiver.setsockopt(zmq.RCVTIMEO, 1000) – Pedantry
You also have to catch zmq.ZMQError if you are using a try/except block. – Pedantry

This is a quick hack I made after reading user938949's answer and http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ . If you can do better, please post your answer and I will recommend it.

For more lasting solutions to reliability, see http://zguide.zeromq.org/page:all#toc64

Version 3.0 of zeromq (in beta at the time of writing) supports timeouts via ZMQ_RCVTIMEO and ZMQ_SNDTIMEO: http://api.zeromq.org/3-0:zmq-setsockopt
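With zeromq >= 3.0 the server could use a bounded wait instead of zmq.NOBLOCK. A minimal pyzmq sketch, assuming a bound PUSH socket with no connected client and an illustrative 300 ms SNDTIMEO:

```python
import time

import zmq

ctx = zmq.Context()
ventilator_send = ctx.socket(zmq.PUSH)
ventilator_send.bind_to_random_port("tcp://127.0.0.1")  # no client connected
ventilator_send.SNDTIMEO = 300  # ms; equivalent to setsockopt(zmq.SNDTIMEO, 300)

start = time.monotonic()
try:
    ventilator_send.send(b"1")
    ok = True
except zmq.Again:
    ok = False  # gave up after roughly SNDTIMEO instead of blocking forever
waited = time.monotonic() - start

ctx.destroy(linger=0)
```

Unlike NOBLOCK, this gives a slow client a short grace period before the send is abandoned.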

Server

The zmq.NOBLOCK flag ensures that send() does not block when no client is connected.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i = 0

while True:
    i += 1
    time.sleep(0.5)
    print(">>sending message", i)
    try:
        # NOBLOCK: raise zmq.Again instead of waiting when no client is connected
        ventilator_send.send_string(str(i), zmq.NOBLOCK)
        print("  succeed")
    except zmq.Again:
        print("  failed")

Client

The poller object can listen on many receiving sockets (see the "Python Multiprocessing with ZeroMQ" post linked above); here I registered only work_receiver. In the infinite loop, the client polls with a 1000 ms timeout, and socks is empty if no message has been received in that time.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop forever, reporting a timeout whenever nothing arrives within 1000 ms
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print("got message", work_receiver.recv_string(zmq.NOBLOCK))
    else:
        print("error: message timeout")
Miter answered on 26/9/2011 at 8:47. Comments (1):
Does Python have select? Ruby's library has one, like IO.select. You can do: if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end – Crippen
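pyzmq does provide a select-style helper, zmq.select, which mirrors Python's select.select(); note that its timeout is in seconds, unlike Poller.poll(). A minimal sketch with an illustrative 0.2 s timeout and a PULL socket that never receives anything:

```python
import zmq

ctx = zmq.Context()
read_socket = ctx.socket(zmq.PULL)
read_socket.bind_to_random_port("tcp://127.0.0.1")

# Same shape as select.select(rlist, wlist, xlist, timeout); timeout in seconds
readable, _, _ = zmq.select([read_socket], [], [], 0.2)
if read_socket in readable:
    print(read_socket.recv())
else:
    print("timeout 0.2 secs")

ctx.destroy(linger=0)
```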

If you're only waiting for one socket, rather than create a Poller, you can do this:

if work_receiver.poll(1000, zmq.POLLIN):
    print("got message", work_receiver.recv(zmq.NOBLOCK))
else:
    print("error: message timeout")

You can use this if your timeout changes depending on the situation, instead of setting work_receiver.RCVTIMEO.
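Because the timeout is an argument to each poll() call, it can differ from call to call. A sketch, where recv_within is a hypothetical helper name and the timeouts are illustrative:

```python
import zmq


def recv_within(sock, timeout_ms):
    """Return the next message from sock, or None if none arrives within timeout_ms."""
    # Socket.poll returns the ready-event mask, or 0 when the timeout expires
    if sock.poll(timeout_ms, zmq.POLLIN):
        return sock.recv(zmq.NOBLOCK)
    return None


ctx = zmq.Context()
work_receiver = ctx.socket(zmq.PULL)
port = work_receiver.bind_to_random_port("tcp://127.0.0.1")

quick = recv_within(work_receiver, 100)  # short deadline, nothing sent yet

sender = ctx.socket(zmq.PUSH)
sender.connect(f"tcp://127.0.0.1:{port}")
sender.send(b"work item")
patient = recv_within(work_receiver, 2000)  # longer deadline for this call only

ctx.destroy(linger=0)
```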

Lemuelah answered on 2/5/2019 at 13:41. Comments (1):
I like this answer best for simple cases where just one socket has to time out: no poller to create and no need to register the socket. – Overmodest

The send won't block if you use ZMQ_NOBLOCK, but closing the socket and context can still block the program from exiting.

The reason is that the socket keeps waiting for a peer so that queued outgoing messages can still be delivered. To close the socket immediately and discard the outgoing messages left in the buffer, set ZMQ_LINGER to 0.
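A sketch of the LINGER fix, assuming pyzmq and a PUSH socket connected to a local port where nothing is listening (found with Python's socket module; the port choice is an assumption of the sketch). Because connect() sets up the outgoing queue immediately, send() succeeds and the message sits unsent; with libzmq's default LINGER of -1, terminating the context could then wait indefinitely, whereas LINGER = 0 lets it return at once.

```python
import socket
import time

import zmq

# Pick a local port, then release it so (most likely) nothing is listening there
with socket.socket() as probe:
    probe.bind(("127.0.0.1", 0))
    port = probe.getsockname()[1]

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.connect(f"tcp://127.0.0.1:{port}")  # the peer never shows up
push.send(b"never delivered")  # queued locally: connect() already created the queue
push.setsockopt(zmq.LINGER, 0)  # discard unsent messages when the socket closes

start = time.monotonic()
push.close()
ctx.term()  # returns promptly; with LINGER = -1 it would keep waiting for the peer
elapsed = time.monotonic() - start
print(f"closed in {elapsed:.3f} s")
```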

Marder answered on 1/6/2012 at 7:50. Comments (1):
zmq.RCVTIMEO won't help you if you don't use zmq.LINGER, because after the timeout the socket still won't close. This should be added to the chosen answer. – Ite
