ZeroMQ: I want Publish–Subscribe to drop older messages in favor of newer ones
Asked Answered
T

2

6

I'm using ZeroMQ publish–subscribe sockets to connect two processes. The publishing process is a sensor, and has a much faster refresh rate than the subscription process. I want the subscription process to only use the most recent message in the queue — and ignore older messages altogether.

I've tried setting a highwater mark on the subscriber, but that seems to drop newer messages rather than older.

Is there a publish–subscribe pattern someone can direct me toward for this purpose?

Tilghman answered 19/9, 2014 at 19:19 Comment(1)
A duplicate question with a useful answer when filtering incoming messages : #34503752Battledore
T
9

read about the conflate feature from documentation on zeromq (it is kind of new), I think it is exactly what you want.

From the documentation:

ZMQ_CONFLATE: Keep only last message If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores 'ZMQ_RCVHWM' and 'ZMQ_SNDHWM' options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

Triplenerved answered 21/9, 2014 at 15:30 Comment(1)
Great, thanks. Now we can delete the comments to keep things tidy.Coincidental
T
0

Okay, I found a solution, but I don't know if it's the best one — so I won't mark it as correct just yet.

zmq::message_t message;
int events = 0;
size_t events_size = sizeof(int);

// Priming read
subscriber.recv(&message);

// See if there are more messages to read
subscriber.getsockopt(ZMQ_EVENTS, static_cast<void*>(&events), &events_size);
while (events & ZMQ_POLLIN) {

  // Receive the new (and perhaps most recent) message
  subscriber.recv(&message);

  // Poll again for additional messages
  subscriber.getsockopt(ZMQ_EVENTS, static_cast<void*>(&events), &events_size);
}

// Now, message points to the most recent received data.

This strategy has the added advantage that the queue shouldn't fill up. The disadvantage is that my publisher could conceivably send faster than this loop in the subscriber could be run, and then it'll loop indefinitely.

That seems unlikely, but I'd like to make it impossible. I'm not quite sure how to accomplish this goal yet.

Tilghman answered 19/9, 2014 at 20:42 Comment(2)
You're in an impossible scenario, since the subscriber can never know that the message it's receiving will be processed before a new message is sent, and if it's not fast enough to keep up, then you're stuck. The only way to handle this scenario is to use multiple subscribers which, collectively, will have enough bandwidth to handle the maximum output from your publisher. Instead of using pub-sub, use dealer-router so you'll round robin the messages among all clients, and just handle figuring which message should supersede on the back end. Messy. Conflate, if available, is better.Surname
See the other answer for details on conflate.Surname

© 2022 - 2024 — McMap. All rights reserved.