Get subscriber filter from a ZMQ PUB socket

I noticed in the FAQ, in the Monitoring section, that it's not possible to get a list of connected peers or to be notified when peers connect/disconnect.

Does this imply that it's also not possible to know which topics a PUB/XPUB socket knows it should publish, from its upstream feedback? Or is there some way to access that data?

I know that ZMQ >= 3.0 "supports PUB/SUB filtering at the publisher", but what I really want is to filter in my application code, using the knowledge ZMQ has about which topics are subscribed to.

My use-case is that I want to publish info about the status of a robot. Some topics involve major hardware actions, like switching the select lines on an ADC to read IR values.

I have a publisher thread running on the bot that should only do that "read" to get IR data when there are actually subscribers. However, since I can only feed a string into my pub_sock.send, I always have to do the costly operation, even if ZMQ is about to drop that message when there are no subscribers.

I have an implementation that uses a backchannel REQ/REP socket to send topic information, which my app can check in its publish loop, thereby only collecting data that needs to be collected. This seems very inelegant though, since ZMQ must already have the data I need, as evidenced by its filtering at the publisher.

I noticed that in this mailing list message, the OP seems to be able to see subscribe messages being sent to an XPUB socket.

However, there's no mention of how they did that, and I'm not seeing any such ability in the docs (still looking). Maybe they were just using Wireshark (to see upstream subscribe messages to an XPUB socket).

Johan answered 7/2, 2014 at 19:2 Comment(5)
I've posted twice on the #zeromq IRC channel to ask about this, with 6 hour offsets to help with timezones, but so far have received no response. - Johan
This is still an open question for which I'm actively seeking an answer. - Johan
Did you find an answer? I also need my PUB server to know what filters are subscribed to. The server doesn't need to create data that no clients are interested in. (Example: if SUB clients are only subscribed to weather data for New York, then the PUB server shouldn't have to create the data for every other city in the world, just to throw it away.) - Shipman
Sounds like the same use-case. Have you tried frans' and Freek Wiekmeijer's answers below? I'm no longer active in the relevant codebase, we just kept the backchannel hack. - Johan
Thanks. I ended up manually re-sending the subscribe information (from client to server) on a separate PUSH/PULL socket. It felt like a bit of a hack, but it works okay. This separate channel could also be used as a heartbeat from the client. The clients would resend their subscription requests once in a while. The server could then simply stop compiling and sending topics that no clients had re-subscribed to in the last minute or so. - Shipman

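With the zmq.XPUB socket type, you can detect subscribers arriving and leaving. The socket receives each subscription change as a message whose first byte is \x01 (subscribe) or \x00 (unsubscribe), followed by the topic. The following code sample shows how: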

# Publisher side
import zmq

port_nr = 5556  # example port; the original snippet left port_nr undefined

ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)

events = dict(poller.poll(1000))  # wait up to 1 second
if xpub_socket in events:
    msg = xpub_socket.recv()
    # Slice instead of indexing: msg[0] is an int on Python 3
    if msg[:1] == b'\x01':
        topic = msg[1:]
        print("Topic %r: new subscriber" % topic)
    elif msg[:1] == b'\x00':
        topic = msg[1:]
        print("Topic %r: subscriber left" % topic)
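
For the use case in the question (skip the costly read when nobody is listening), the messages above can be folded into a small lookup table. This is only a sketch: SubscriptionTracker, update and wanted are made-up names, not part of pyzmq, and it assumes the publisher sees one subscribe and one unsubscribe message per topic prefix, as in the snippet above. It uses prefix matching because that is how SUB filters match topics.

```python
class SubscriptionTracker:
    """Tracks topic prefixes announced by XPUB subscription messages."""

    def __init__(self):
        self.prefixes = set()

    def update(self, frame):
        """Apply one message received from the XPUB socket.

        Returns True if the frame was a subscribe/unsubscribe message.
        """
        marker, prefix = frame[:1], frame[1:]
        if marker == b"\x01":
            self.prefixes.add(prefix)
            return True
        if marker == b"\x00":
            self.prefixes.discard(prefix)
            return True
        return False  # not a subscription message

    def wanted(self, topic):
        """True if any subscribed prefix matches the topic."""
        return any(topic.startswith(p) for p in self.prefixes)


tracker = SubscriptionTracker()
tracker.update(b"\x01ir")          # a subscriber asked for "ir..."
print(tracker.wanted(b"ir/left"))  # True
print(tracker.wanted(b"battery"))  # False
tracker.update(b"\x00ir")          # the "ir" subscription was dropped
print(tracker.wanted(b"ir/left"))  # False
```

In a publish loop you would call tracker.update(xpub_socket.recv()) whenever the poller reports the socket as readable, and check tracker.wanted(...) before doing the hardware read. Because the table is a set, duplicate subscribe messages (for example with zmq.XPUB_VERBOSE enabled) do no harm.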

Note that the zmq.XSUB socket type does not subscribe in the same manner as the "normal" zmq.SUB. Code sample:

# Subscriber side
import zmq

port_nr = 5556  # example port; must match the publisher
ctx = zmq.Context.instance()

# Subscribing a zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, b"sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)

# Subscribing a zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, b"sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
# A subscription is a single frame: the byte \x01 followed by the topic
xsub_socket.send(b'\x01' + b'sometopic') # OK, triggers the subscribe event on the publisher
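
As far as I understand the wire format, the marker byte and the topic travel together in one frame, so it can help to build the frame in one place. A minimal sketch; subscribe_frame and unsubscribe_frame are hypothetical helper names, not pyzmq functions:

```python
def subscribe_frame(topic):
    """Frame that asks the publisher to start sending `topic` (bytes)."""
    return b"\x01" + topic


def unsubscribe_frame(topic):
    """Frame that cancels an earlier subscription to `topic` (bytes)."""
    return b"\x00" + topic


print(subscribe_frame(b"sometopic"))    # b'\x01sometopic'
print(unsubscribe_frame(b"sometopic"))  # b'\x00sometopic'
```

With the XSUB socket from the sample above this would be used as xsub_socket.send(subscribe_frame(b"sometopic")).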

I'd also like to point out the zmq.XPUB_VERBOSE socket option. If set, every subscription message is passed to the application, including duplicates for a topic that already has a subscriber. If not set, duplicate subscriptions are filtered out and only the first one for each topic is reported. See also the following post: ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

Scalenus answered 4/8, 2015 at 14:31

At least for the XPUB/XSUB socket case, you can keep track of the subscription state by forwarding the messages yourself and inspecting them on the way through:

import zmq

context = zmq.Context()

xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')

poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)

while True:
    try:
        events = dict(poller.poll(1000))
    except KeyboardInterrupt:
        break

    if xpub_socket in events:
        message = xpub_socket.recv_multipart()

        # HERE goes some subscription handle code which inspects
        # message

        xsub_socket.send_multipart(message)
    if xsub_socket in events:
        message = xsub_socket.recv_multipart()
        xpub_socket.send_multipart(message)

(this is Python code but I guess C/C++ looks quite similar)
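
The placeholder in the loop above could be filled with something like the following. It is a rough sketch, not the code I actually run: describe_subscription is a made-up name, and it assumes a subscription arrives as a single-frame message starting with \x01 or \x00, with anything else treated as ordinary traffic.

```python
def describe_subscription(message):
    """Classify a message as returned by xpub_socket.recv_multipart().

    Returns ("subscribe", topic) or ("unsubscribe", topic), or None if
    the message does not look like a subscription message.
    """
    if len(message) != 1 or not message[0]:
        return None
    frame = message[0]
    if frame[:1] == b"\x01":
        return ("subscribe", frame[1:])
    if frame[:1] == b"\x00":
        return ("unsubscribe", frame[1:])
    return None


print(describe_subscription([b"\x01weather"]))  # ('subscribe', b'weather')
print(describe_subscription([b"\x00weather"]))  # ('unsubscribe', b'weather')
print(describe_subscription([b"hello"]))        # None
```

The result can be logged or stored before the message is forwarded with xsub_socket.send_multipart(message), so the forwarding itself stays unchanged.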

I'm currently working on this topic and I will add more information as soon as possible.

Soppy answered 18/6, 2015 at 8:10
