ZMQ drop old messages

I am trying to build a real-world system in which the subscriber must act on real-time data provided by the publisher. Sometimes the PUB and SUB get out of sync (by up to 10 seconds) because each is busy with other operations, and I always need the latest data from the publisher; otherwise the actions performed by the subscriber will be way off.

I am using a PUB/SUB approach and have tried setting an HWM limit, but it does not seem to work. I have also tried the disconnect approach, but it adds an extra one-second lag to the system. Since my system runs in real time 90% of the time, using disconnect makes the whole system collapse.

Subscriber (I am modelling the actual system's processing time with time.sleep()):

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

consumer_receiver.set_hwm(0)
consumer_receiver.connect("tcp://127.0.0.1:5555")

consumer_receiver.subscribe(b'')


while 1:
    d=random.randint(0,10)

    work = consumer_receiver.recv_pyobj()
    # consumer_receiver.disconnect()
    print(work,"  :",d)
    time.sleep(d)

Publisher:

import time
import zmq

context = zmq.Context()
zmq_socket = context.socket(zmq.PUB)
zmq_socket.bind("tcp://127.0.0.1:5555")

for x in range(1000):

    # zmq_socket.send_string("", zmq.SNDMORE)
    zmq_socket.send_pyobj(x,zmq.NOBLOCK)
    time.sleep(1)
    print(x)
Intermit answered 31/12, 2019 at 10:10 Comment(2)
zmq.CONFLATE keeps only the last message; in other words, it does away with the queue. Another option is to set a limit on the queue using zmq.RCVBUF on the subscriber side and zmq.SNDBUF on the publisher side. Check this post; I hope it helps. - Panhellenism
Thanks, but I am working with a feedback system and really only need the latest data. I have the luxury of ignoring some data (I need speed over accuracy), so CONFLATE is exactly what I needed. Thanks for the info, though; it might help on another project I am working on. - Intermit
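
To make "keeps the last message only" concrete, here is a toy model that uses only the Python standard library. It is not ZeroMQ code: `consume` is a hypothetical helper, and a `deque` merely stands in for the subscriber's receive queue. An unbounded buffer mimics a high-water mark of 0 (which in ZeroMQ means "no limit", so stale messages pile up), while a one-slot buffer that overwrites its content mimics what CONFLATE is described as doing.

```python
from collections import deque


def consume(messages, maxlen):
    """Feed messages into a buffer and return what a slow reader would find.

    maxlen=None models an unlimited queue; maxlen=1 models a queue that
    keeps only the newest message. This is an illustration, not ZeroMQ.
    """
    buf = deque(maxlen=maxlen)
    for m in messages:
        buf.append(m)  # a full deque silently discards its oldest entry
    return list(buf)


published = list(range(10))
print(consume(published, maxlen=None))  # every message is still waiting
print(consume(published, maxlen=1))     # only the newest message remains
```

With the unlimited buffer, the slow reader has to work through all ten values in order; with the one-slot buffer, it sees only the most recent one.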

OK, my savior is CONFLATE. Thanks to this post, the problem seems to be solved:

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

# Keep only the most recent message; set this before connect().
consumer_receiver.setsockopt(zmq.CONFLATE, 1)

consumer_receiver.connect("tcp://127.0.0.1:5555")
consumer_receiver.subscribe(b'')  # empty prefix: receive every message


while True:
    d = random.randint(1, 10)  # simulated processing time in seconds

    work = consumer_receiver.recv_pyobj()

    print(work, "  :", d)

    time.sleep(d)
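
Where CONFLATE does not fit (the libzmq documentation notes that it does not support multipart messages), a commonly used alternative is to drain the receive queue with non-blocking reads and keep only the newest item. The sketch below is an illustration under stated assumptions, not part of the original answer: `recv_latest` is a hypothetical helper name, and it relies on pyzmq raising `zmq.Again` when a `zmq.NOBLOCK` receive finds nothing queued.

```python
import zmq


def recv_latest(sock):
    """Block for one message, then replace it with any newer ones already queued.

    `sock` is assumed to be a connected, subscribed zmq.SUB socket.
    """
    latest = sock.recv_pyobj()  # wait until at least one message arrives
    while True:
        try:
            # Non-blocking read: returns immediately if something is queued.
            latest = sock.recv_pyobj(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Queue is empty, so `latest` is the newest message we have.
            return latest
```

In the subscriber loop, `work = recv_latest(consumer_receiver)` would take the place of the plain `recv_pyobj()` call. Unlike CONFLATE, the stale messages are still transferred and unpickled before being thrown away, so this trades some efficiency for flexibility.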
Intermit answered 31/12, 2019 at 11:24 Comment(0)
