ZeroMQ High Water Mark Doesn't Work

When I read the "Durable Subscribers and High-Water Marks" section of the ZeroMQ guide, it said "The HWM causes ØMQ to drop messages it can't put onto the queue", but no messages were lost when I ran the example. To test it, I pressed Ctrl+C to terminate durasub.py while the publisher was running and then restarted it.

The example below is the Python version from the guide. The versions in other languages behave the same way.

durasub.py

import zmq
import time

context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")

sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")

while True:
    data = subscriber.recv()
    print data
    if data == "END":
        break

durapub.py

import zmq
import time

context = zmq.Context()

sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

publisher.setsockopt(zmq.HWM, 2)

sync_request = sync.recv()

for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
Majestic answered 11/11, 2011 at 10:10 Comment(0)

The suggestion above is valid, but doesn't properly address the problem in this particular code.

The real problem here is that in durapub.py you call publisher.setsockopt(zmq.HWM, 2) AFTER calling publisher.bind. You should call setsockopt BEFORE bind or connect.

Please refer to the 0MQ API documentation for setsockopt:

Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE and ZMQ_LINGER, only take effect for subsequent socket bind/connects.
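
As a minimal sketch of the corrected ordering, the publisher could be created as shown here. This assumes Python 3 and a current pyzmq rather than the Python 2 setup in the question. The helper name make_publisher is hypothetical, and zmq.SNDHWM is used because libzmq 3.x and later split the old HWM option into SNDHWM and RCVHWM.

```python
import zmq


def make_publisher(context, endpoint, hwm=2):
    """Create a PUB socket whose high-water mark is set BEFORE bind.

    `make_publisher` is an illustrative helper, not part of pyzmq.
    """
    publisher = context.socket(zmq.PUB)
    # Set the option first, so that it applies to the bind that follows.
    publisher.setsockopt(zmq.SNDHWM, hwm)
    publisher.bind(endpoint)
    return publisher
```

In durapub.py this would replace the bind and setsockopt lines, for example publisher = make_publisher(context, "tcp://*:5565").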

Coan answered 5/5, 2012 at 7:43 Comment(0)

2024 update:

With recent versions of ZMQ (4.3.6+), you no longer have to set the HWM before the bind/connect call. Many options can now be set after bind/connect; see the documentation.

However, note that you may still see ZMQ buffer more than HWM messages. The reason is that the HWM only limits the in-memory message count. ZMQ hands messages to a kernel buffer, and those messages apparently don't count towards the in-memory message count (though you may still see the program's memory usage rise). To limit the kernel buffer, you'll need to set the SNDBUF/RCVBUF options. The minimum buffer size on Linux is 2048 bytes for SND and 256 bytes for RCV; depending on the size of your messages, this means that even with minimum kernel buffering (probably not recommended) more than HWM messages could still be buffered at a time.

While we can't guarantee a bound on buffered messages, I'd recommend something like this:

SNDBUF = average_message_size * desired_kernel_messages
SNDHWM = desired_hwm - desired_kernel_messages
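
The two formulas above can be sketched as a small helper. The function name split_buffer_budget and the validation rule are my own assumptions, not part of any ZMQ API. The check exists because a send HWM of 0 means "no limit" in ZMQ, so the in-memory share has to stay at 1 or more.

```python
def split_buffer_budget(average_message_size, desired_hwm, desired_kernel_messages):
    """Split a total message budget between the kernel buffer and the ZMQ queue.

    Returns (sndbuf_bytes, sndhwm_messages). Hypothetical helper for illustration.
    """
    if desired_kernel_messages >= desired_hwm:
        # SNDHWM must stay >= 1; a value of 0 would disable the limit entirely.
        raise ValueError("desired_kernel_messages must be smaller than desired_hwm")
    sndbuf = average_message_size * desired_kernel_messages
    sndhwm = desired_hwm - desired_kernel_messages
    return sndbuf, sndhwm


# Example: 512-byte messages, at most ~100 buffered, 10 of them in the kernel.
sndbuf, sndhwm = split_buffer_budget(512, 100, 10)
print(sndbuf, sndhwm)  # prints: 5120 90
```

The result is only an estimate: the kernel may round or clamp the requested buffer size, and actual message sizes vary around the average.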

Note that currently, unlike HWM, the BUF options need to be set prior to connect/bind.
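
A hedged sketch of applying both limits with pyzmq (Python 3 assumed) follows. The helper name make_bounded_publisher is hypothetical. Both options are set before bind, which is required for SNDBUF as noted above and harmless for SNDHWM.

```python
import zmq


def make_bounded_publisher(context, endpoint, sndbuf, sndhwm):
    """Create a PUB socket with a kernel buffer size and a send HWM applied.

    `make_bounded_publisher` is an illustrative helper, not part of pyzmq.
    """
    publisher = context.socket(zmq.PUB)
    # Kernel send buffer size in bytes; must be set before bind/connect.
    publisher.setsockopt(zmq.SNDBUF, sndbuf)
    # Limit on messages queued in memory by ZMQ itself.
    publisher.setsockopt(zmq.SNDHWM, sndhwm)
    publisher.bind(endpoint)
    return publisher
```

For example, make_bounded_publisher(context, "tcp://*:5565", sndbuf=4096, sndhwm=90) would request a 4096-byte kernel buffer and an in-memory limit of 90 messages. The operating system may still adjust the buffer size it actually uses.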

Togo answered 15/2 at 0:49 Comment(0)
