Pyzmq high-water mark not working on pub socket

According to the ZeroMQ documentation a pub socket is supposed to drop messages once the number of queued messages reaches the high-water mark.

This doesn't seem to work in the following example (and yes, I do set the hwm before bind/connect):

import time
import pickle
from threading import Thread
import zmq

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.set_hwm(2)
    pub.bind('tcp://*:5555')

    i = 0
    while True:
        # Send message every 100ms
        time.sleep(0.1)
        pub.send_string("test", zmq.SNDMORE)
        pub.send_pyobj(i)
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.subscribe("test")
    sub.connect('tcp://localhost:5555')
    while True:
        # Receive messages only every second
        time.sleep(1)
        msg = sub.recv_multipart()
        print("Sub: %d" % pickle.loads(msg[1]))

t_pub = Thread(target=pub_thread)
t_sub = Thread(target=sub_thread)
t_pub.start()
t_sub.start()

# Wait for the worker threads instead of busy-looping in the main thread
t_pub.join()
t_sub.join()

I'm sending messages on the pub socket 10 times faster than I read them on the sub socket, with the hwm set to 2. I would expect to receive only about every 10th message. Instead, I see the following output:

Sub: 0
Sub: 1
Sub: 2
Sub: 3
Sub: 4
Sub: 5
Sub: 6
Sub: 7
Sub: 8
Sub: 9
Sub: 10
Sub: 11
Sub: 12
Sub: 13
Sub: 14
...

So all messages arrive, which means they are held in some queue until I read them. The same holds true when I also set hwm=2 on the sub socket before connect.

What am I doing wrong or am I misunderstanding hwm?

I use pyzmq version 17.1.2

Hyperkinesia answered 17/11, 2018 at 23:14 Comment(2)
Try with this and this post. Heyerdahl
I updated my answer. Hope it helps. Heyerdahl

Borrowing from an answer to the issue I opened on GitHub, I've updated my answer as follows:


Messages are held in the operating system's network buffers. I have found HWMs to be not that useful because of that. Here is modified code in which the subscriber misses messages:

import time
import pickle
import zmq
from threading import Thread
import os

ctx = zmq.Context()

def pub_thread():
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.SNDHWM, 2)
    pub.setsockopt(zmq.SNDBUF, 2*1024)  # See: http://api.zeromq.org/4-2:zmq-setsockopt
    pub.bind('tcp://*:5555')
    i = 0
    while True:
        time.sleep(0.001)
        pub.send_string(str(i), zmq.SNDMORE)
        pub.send(os.urandom(1024))
        i += 1

def sub_thread():
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.setsockopt(zmq.RCVHWM, 2)
    sub.setsockopt(zmq.RCVBUF, 2*1024)
    sub.connect('tcp://localhost:5555')
    while True:
        time.sleep(0.1)
        msg, _ = sub.recv_multipart()
        print("Received:", msg.decode())

t_pub = Thread(target=pub_thread)
t_pub.start()
sub_thread()

Output looks something like this:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 47
Received: 48
Received: 64
Received: 65
Received: 84
Received: 85
Received: 159
Received: 160
Received: 270

Messages are missed because all queues/buffers are full, at which point the publisher starts to drop messages (see the documentation for ZMQ_PUB: http://api.zeromq.org/4-2:zmq-socket).
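To see why a small HWM alone rarely causes drops, here is a toy model in plain Python (no ZeroMQ involved) of the buffering between publisher and subscriber. It is a sketch under assumptions, not how libzmq is implemented: `simulate`, `hwm_slots` and `kernel_slots` are hypothetical names, all buffers are collapsed into one FIFO counted in messages, and the kernel capacity of 1000 messages is an arbitrary illustrative figure.

```python
from collections import deque


def simulate(n_sent, sends_per_recv, capacity):
    """Toy model: the publisher sends n_sent numbered messages into a FIFO
    holding at most `capacity` messages; the subscriber reads one message
    after every `sends_per_recv` sends. Returns what the subscriber read."""
    queue = deque()
    received = []
    for i in range(n_sent):
        if len(queue) < capacity:
            queue.append(i)  # room left: the message is queued
        # else: every buffer is full, so the publisher drops message i
        if (i + 1) % sends_per_recv == 0:
            received.append(queue.popleft())  # slow subscriber reads one
    return received


hwm_slots = 2 + 2      # SNDHWM=2 on the publisher plus RCVHWM=2 on the subscriber
kernel_slots = 1000    # assumed room in the OS network buffers (illustrative)

# Only the ZeroMQ queues: drops start almost immediately.
only_hwm = simulate(100, 10, hwm_slots)
print(only_hwm)

# ZeroMQ queues plus kernel buffers: nothing is dropped during the run.
with_kernel = simulate(100, 10, hwm_slots + kernel_slots)
print(with_kernel)
```

With only the four HWM slots the subscriber sees a few consecutive numbers and then gaps, much like the output above. Once the assumed kernel buffers are added, nothing is dropped during the run and the subscriber simply falls further behind, which matches what the question observes with tiny messages sent every 100 ms.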


[NOTE]:

  • Set the high-water mark on both the listener/subscriber (RCVHWM) and the advertiser/publisher (SNDHWM).
  • These posts are also relevant (Post1 - Post2)
  • sock.setsockopt(zmq.CONFLATE, 1) is another option: set on the subscriber side, it keeps only the last message. Note that it does not support multipart messages.
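A minimal sketch of the CONFLATE approach from the last note. It assumes single-frame messages (CONFLATE does not support multipart), a loopback TCP endpoint on a random port, and fixed sleeps of 0.5 s as a crude way to let the subscription and the messages arrive. The 2-second receive timeout is only there so the sketch cannot hang.

```python
import time
import zmq

ctx = zmq.Context()

pub = ctx.socket(zmq.PUB)
port = pub.bind_to_random_port("tcp://127.0.0.1")

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.CONFLATE, 1)      # keep only the most recent message; set before connect
sub.setsockopt(zmq.SUBSCRIBE, b"")   # subscribe to everything
sub.setsockopt(zmq.RCVTIMEO, 2000)   # raise zmq.Again instead of blocking forever
sub.connect("tcp://127.0.0.1:%d" % port)

time.sleep(0.5)  # crude wait for the subscription to reach the publisher

for i in range(100):
    pub.send(str(i).encode())  # single frame: counter only, no topic frame

time.sleep(0.5)  # the subscriber is "slow": it reads only after the burst

latest = int(sub.recv())
print("Latest:", latest)

sub.close(linger=0)
pub.close(linger=0)
ctx.term()
```

Because the topic and the payload cannot travel as separate frames here, a setup like the one in the question would have to pack both into one frame to use this option.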
Heyerdahl answered 18/11, 2018 at 11:50 Comment(6)
Thanks for your suggestions. However, set_hwm is equivalent to setsockopt in this case: it internally calls setsockopt(zmq.SNDHWM, x) or setsockopt(zmq.RCVHWM, x) depending on the socket type. Your code produces the exact same output as mine; adding the hwm on the receiver side doesn't change the behavior, as mentioned in my initial post. The conflate setting "Does not supports multi-part messages" according to the ZeroMQ docs, so it's not an option. Hyperkinesia
@Hyperkinesia I double check it. Heyerdahl
Do you mean "I have double checked it, it's working" or "I will double check that it's still not working, as you state"? Hyperkinesia
@Hyperkinesia No, in your case it didn't work. I had the same problem and couldn't resolve it using hwm; my problem was then fixed using the conflate option. Heyerdahl
@Hyperkinesia I also opened a GitHub issue in the pyzmq repository. Heyerdahl
FYI, the minimum buffer on Linux is 2048 for SND and 256 for RCV. So you can't just set the buffer very small to force it to use the high water mark values (as I thought might be possible). Wish
