How to have limited ZMQ (ZeroMQ - PyZMQ) queue buffer size in python?

I'm using the pyzmq library with the pub/sub pattern. I have some fast ZMQ publishers using the .connect() method and a slower ZMQ subscriber using the .bind() method. After a few minutes, my subscriber is receiving old data from the publishers because of ZMQ's buffering.


My Question:

Is there any approach to manage ZMQ queue buffer size? (set a limited buffer)

[NOTE]:

  • I don't want to use ZMQ PUSH/PULL.
  • I've read this post, but that approach only clears the buffer: clear ZMQ buffer
  • I also tried with high watermark options, but it didn't work:
socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working

Publisher:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send_string("%d" % data)  # send_string encodes the str; send() needs bytes on Python 3
    time.sleep(1)

Subscriber:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty bytes prefix = subscribe to everything
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while 1:
    time.sleep(2)  # A speed reducer like.
    data = socket.recv()
    print(data)

Despite setting the send/receive high water marks, the queue still holds more than 10 messages.

Radu answered 16/1, 2018 at 10:15 Comment(1)
Welcome @BenyaminJafari. Could you kindly expand a bit your target definition, Sir? What are the { PASS | FAIL }-criteria for auditing any approach if the set goal was indeed achieved or not - i.e. for " an approach to manage ZMQ queue buffer size "? Best using a mandatory-part only, formulated as a MUST_HAVE:-feature list ( with NICE_TO_HAVE:(s) if indeed some such desire dictates some voluntary add-ons ), that's fair, isn't it? - Hodgepodge

I found a way to receive only the last message in a ZMQ subscriber: the CONFLATE option.

Note that you should set the CONFLATE option before you connect:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty bytes prefix = subscribe to everything
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while 1:
    time.sleep(2)  # Dummy delay
    data = socket.recv()
    print(data)

In other words, the subscriber keeps only the most recent message instead of a queue of buffered ones.


[NOTE]:

In addition, the zmq.SNDBUF and zmq.RCVBUF options set the size of the underlying kernel send and receive buffers for the socket, which limits how much data is buffered below ZMQ's own queue. (More information)
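As a rough sketch of the note above, the kernel buffer size can be requested on the publisher before it binds. The helper name `make_pub` and the default values are only illustrative assumptions, and the operating system may round or clamp the size that is actually used:

```python
import zmq


def make_pub(context, endpoint, sndbuf_bytes=4096, hwm=10):
    """Create a PUB socket with a small ZMQ queue and a small kernel send buffer.

    Hypothetical helper: the name and defaults are illustrative only.
    """
    sock = context.socket(zmq.PUB)
    # Options are set before bind() so they apply to the connections made afterwards.
    sock.setsockopt(zmq.SNDHWM, hwm)           # ZMQ-level limit, counted in messages
    sock.setsockopt(zmq.SNDBUF, sndbuf_bytes)  # kernel-level limit, counted in bytes
    sock.bind(endpoint)
    return sock
```

Usage would look like `pub = make_pub(zmq.Context(), "tcp://*:5556")`. The same idea applies to zmq.RCVBUF on the subscriber side.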


Radu answered 26/1, 2018 at 12:3 Comment(2)
You say "you should set the CONFLATE option after you connect", but in your code snippet you set the CONFLATE option before connect. I think the code is correct, and the sentence should say before. - Fastening
Keep in mind this unfortunately doesn't work with multipart messages. - Haletky
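Since CONFLATE does not support multipart messages, one possible workaround (a sketch, not something from the answer itself) is to drain the subscriber's queue with non-blocking reads and keep only the newest message. The helper name `recv_latest` is made up for illustration:

```python
import zmq


def recv_latest(sock):
    """Return the newest queued multipart message, discarding older ones.

    Hypothetical helper: blocks until at least one message is available.
    """
    latest = sock.recv_multipart()  # wait for the first message
    while True:
        try:
            # Keep reading without blocking; each read replaces the previous one.
            latest = sock.recv_multipart(zmq.NOBLOCK)
        except zmq.Again:
            # Queue is empty, so `latest` is the most recent message.
            return latest
```

In the subscriber loop, `data = socket.recv()` would become `parts = recv_latest(socket)`. Unlike CONFLATE, this still lets messages pile up between calls, so it is best combined with a small RCVHWM.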

To set the queue/buffer size, you need to set the high water marks via the socket options:

setsockopt(zmq.SNDHWM, 10)
setsockopt(zmq.RCVHWM, 10)
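A minimal sketch of applying the receive-side option, assuming a subscriber like the one in the question (the helper name `make_bounded_sub` is hypothetical). Setting the option before connect() is the conservative ordering, since, as far as I know, older libzmq versions only apply high water marks to connections made after the option is set:

```python
import zmq


def make_bounded_sub(context, endpoint, hwm=10):
    """Create a SUB socket whose receive queue is capped at `hwm` messages.

    Hypothetical helper: the name and default are illustrative only.
    """
    sock = context.socket(zmq.SUB)
    sock.setsockopt(zmq.RCVHWM, hwm)      # set before connect()
    sock.setsockopt(zmq.SUBSCRIBE, b"")   # subscribe to all topics
    sock.connect(endpoint)
    return sock
```

Note that the high water mark counts messages per connection, and the kernel TCP buffers sit underneath it, so with small messages more than `hwm` of them can still be in flight.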
Propagandize answered 25/1, 2018 at 11:7 Comment(6)
I tried it before, but it didn't work. I will update my question with the result of these options. - Radu
Try setting the RCVHWM to 1. - Propagandize
No change, really. - Radu
You might also need to set the underlying TCP buffer sizes; it's hard to eradicate the buffer completely. You could try the experiment using the inproc protocol with two threads instead of TCP, as you will have more control over the buffering. - Propagandize
And on the send side you could set conflate if you really only care about the current/latest message. - Propagandize
Check the C++ docs, but I think in Python it will be zmq.CONFLATE. - Propagandize
