ZeroMQ: HWM on PUSH does not work
I am trying to write a server/client script with a server that vents the tasks and multiple workers that execute them. The problem is that my ventilator has so many tasks that it would fill up the memory in a heartbeat. I tried to set the HWM before binding, but with no success: it just keeps sending messages as soon as a worker connects, completely disregarding the HWM that was set. I also have a sink that keeps a record of the tasks that were done.

server.py

import zmq

def ventilate():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.setsockopt(zmq.SNDHWM, 30)  # Big messages, so I don't want to keep too many in queue
    sender.bind("tcp://*:5557")


    # Socket with direct access to the sink: used to synchronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print("Sending tasks to workers...")

    # The first message is "0" and signals start of batch
    sink.send(b'0')
    print("Sent starting signal")

    while True:
        sender.send(b"Message")



if __name__=="__main__":
    ventilate()

worker.py

import zmq
from multiprocessing import Process

def work():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process tasks forever
    while True:
        msg = receiver.recv()
        print("Doing something with msg %s" % msg.decode())
        sender.send(b"Message %s done" % msg)

if __name__ == "__main__":
    for worker in range(10):
        Process(target=work).start()

sink.py

import zmq

def sink():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()
    print "Received start signal"
    while True:
        msg = receiver.recv()
        print(msg)


if __name__=="__main__":
    sink()
Metaphysics asked 15/1, 2014 at 9:29
Comments (4):
I'll try to reproduce your issue. Could you tell me which versions of PyZMQ and ZMQ you are using? Please run zmq.zmq_version() and zmq.__version__. – Wolfgram
ZMQ version is 4.0.3 and pyzmq is 13.1.0. – Metaphysics
Eh, that's an annoying combination. Would you be able to update to pyzmq 14.0.1 and test with that? (I don't mind which zmq version you use, just let me know.) I'm on pyzmq 13.1.0 with zmq 3.x.x on Windows, and it's a pain to change the zmq version without updating to pyzmq v14, but I want to make sure you still see the issue with that version before I try to reproduce it. – Wolfgram
Tested with v14; same issue. – Metaphysics

OK, I had a play around. I don't think the issue is with the PUSH HWM, but rather that you can't set a HWM for PULL. If you look at this documentation, you can see that it says N/A for the action on HWM.

The PULL sockets seem to be taking hundreds of messages each. (I did try setting a HWM on the PULL socket just in case it did anything; it didn't.) I evidenced this by changing the ventilator to send messages with an incrementing integer, and changing each worker in the pool to wait 2 seconds between calls to recv(). The workers print out that they are processing messages with vastly different integers: for instance, one worker will be working on message 10 while the next is working on message 400. As time goes on, the worker that was processing message 10 moves on to 11, 12, 13, etc., while the other processes 401, 402, etc.

This indicates to me that the ZMQ_PULL socket is buffering the messages somewhere. So while the ZMQ_PUSH socket does have a HWM, the PULL socket keeps pulling messages in as fast as it can, even though the application has not yet consumed them with recv(). As a result, the PUSH HWM is effectively ignored once a PULL socket is connected. As far as I can see, you can't control the length of the PULL socket's buffer (I would expect the RCVHWM socket option to control this, but it doesn't appear to).
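The buffering described above can be measured with a minimal sketch like the one below, assuming pyzmq is installed and TCP on 127.0.0.1 is available. It does not use the ventilator/worker scripts from the question: count_buffered is a hypothetical helper that binds one PUSH socket, connects one PULL socket that never calls recv(), and counts how many messages the PUSH side accepts with non-blocking sends before it stays full for a short while. The optional kernel_buf parameter (also an assumption of this sketch) shrinks the OS TCP buffers via SNDBUF/RCVBUF, because those sit between the two ZeroMQ queues as well. The exact counts depend on the libzmq version and the operating system, so treat them as indicative only.

```python
import time

import zmq


def count_buffered(sndhwm, rcvhwm, kernel_buf=None, size=10000,
                   patience=0.5, cap=50000):
    """Hypothetical helper: count how many messages a PUSH socket accepts
    before blocking, when its only PULL peer never calls recv()."""
    ctx = zmq.Context()
    try:
        push = ctx.socket(zmq.PUSH)
        pull = ctx.socket(zmq.PULL)
        # Options must be set before bind()/connect() to affect the connection.
        push.setsockopt(zmq.SNDHWM, sndhwm)
        pull.setsockopt(zmq.RCVHWM, rcvhwm)
        if kernel_buf is not None:
            # Optionally shrink the OS TCP buffers too (sketch assumption).
            push.setsockopt(zmq.SNDBUF, kernel_buf)
            pull.setsockopt(zmq.RCVBUF, kernel_buf)
        port = push.bind_to_random_port("tcp://127.0.0.1")
        pull.connect("tcp://127.0.0.1:%d" % port)
        time.sleep(0.3)  # give the TCP connection time to come up

        payload = b"x" * size
        sent = 0
        stuck_since = None
        while sent < cap:
            try:
                push.send(payload, zmq.NOBLOCK)
                sent += 1
                stuck_since = None
            except zmq.Again:
                # All queues are full for now: give the I/O threads a moment
                # to move data along, and stop once nothing has moved for
                # `patience` seconds.
                now = time.monotonic()
                if stuck_since is None:
                    stuck_since = now
                elif now - stuck_since > patience:
                    break
                time.sleep(0.01)
        return sent
    finally:
        ctx.destroy(linger=0)  # closes both sockets without waiting


if __name__ == "__main__":
    print("SNDHWM=30 only:", count_buffered(30, 1000))
    print("both HWMs=1, small TCP buffers:",
          count_buffered(1, 1, kernel_buf=4096))
```

With only SNDHWM set, the first count should come out well above 30, because the PULL side's default RCVHWM (1000 messages in libzmq 3 and later) and the kernel TCP buffers absorb the rest; the second configuration should stall after only a handful of messages.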

This behaviour of course raises the question of what the point of the ZMQ_PUSH HWM option is, since it only makes sense to have if you can also control the receiving socket's HWM.

At this point, I'd start asking the 0MQ people whether you are missing something obvious, or if this is considered a bug.

Sorry I couldn't be more help!

Wolfgram answered 17/1, 2014 at 0:46
Comments (1):
Thanks a lot for the effort you've made so far. I did find out that setting setsockopt(zmq.RCVBUF, 2) would in fact slow things down. By default it is set to 0, which means that it uses the operating system's default buffer size. No idea what that is. It still doesn't do exactly what I want, but it comes closer. – Metaphysics

ZeroMQ has buffers on both the sending and the receiving end of a connection, hence you need to set high water marks on both the PUSH and the PULL sockets in your code (and set them before calling bind() or connect()).

In the Python bindings, this is now conveniently done via socket.hwm = 1, which sets both ZMQ_SNDHWM and ZMQ_RCVHWM in one go.
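A minimal sketch of that advice, assuming pyzmq on top of libzmq 3 or later (where the hwm property maps to both ZMQ_SNDHWM and ZMQ_RCVHWM). make_bounded_pair is a hypothetical helper, not part of pyzmq: it sets the HWM on both sockets before bind()/connect() and uses a random local port instead of the fixed ports from the question.

```python
import zmq


def make_bounded_pair(hwm=1):
    """Hypothetical helper: a PUSH/PULL pair with small queues on both ends."""
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    pull = ctx.socket(zmq.PULL)
    # Set the HWMs before bind()/connect() so the new connection picks them up.
    push.hwm = hwm  # sets ZMQ_SNDHWM and ZMQ_RCVHWM on the PUSH socket
    pull.hwm = hwm  # same on the PULL socket; its RCVHWM is what limits prefetching
    port = push.bind_to_random_port("tcp://127.0.0.1")
    pull.connect("tcp://127.0.0.1:%d" % port)
    return ctx, push, pull


if __name__ == "__main__":
    ctx, push, pull = make_bounded_pair(hwm=1)
    print(push.getsockopt(zmq.SNDHWM), pull.getsockopt(zmq.RCVHWM))  # 1 1
    ctx.destroy(linger=0)
```

Even with both HWMs at 1, the OS TCP buffers can still hold some messages in flight, which is consistent with the RCVBUF tweak from the earlier comment having a visible effect.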

Monostylous answered 1/8, 2014 at 23:21
