ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

I implemented the Last Value Caching (LVC) example of ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), but can't get a 2nd subscriber to register at the backend.

The first time a subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but a second subscriber to the same topic doesn't even register (if backend in events: is never true). Everything else works fine: data gets passed from the publisher to all subscribers.

What could be the reason for this? Is the way the backend is connected correct? Is this pattern only supposed to work with the first subscriber?

Update

When I subscribe the 2nd subscriber to another topic, I get the right behaviour (i.e. \x01 when subscribing). This really seems to work for the first subscriber only. Is it a bug in ZeroMQ?

Update 2

Here's a minimal working example that shows that the LVC pattern is not working (at least not the way it's implemented here).

# subscriber.py
import zmq

def main():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5558")

    # Subscribe to a single topic ("my-topic")
    print 'subscribing (sub side)'
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Print any message received on the subscribed topic
        if sub in events:
            msg = sub.recv_multipart()
            topic, current = msg
            print 'received %s on topic %s' % (current, topic)

if __name__ == '__main__':
    main() 

And here's the broker (as in the example, but with a bit more verbosity and an integrated publisher).

# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time


class Publisher(threading.Thread):
    def __init__(self):
        super(Publisher, self).__init__()

    def run(self):
        time.sleep(10)
        ctx = zmq.Context.instance()
        pub = ctx.socket(zmq.PUB)
        pub.connect("tcp://127.0.0.1:5557")

        cnt = 0
        while True:
            msg = 'hello %d' % cnt
            print 'publisher is publishing %s' % msg
            pub.send_multipart(['my-topic', msg])
            cnt += 1
            time.sleep(5)


def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)


    # launch a publisher
    p = Publisher()
    p.daemon = True
    p.start()

    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        ### this is where it fails for the 2nd subscriber. 
        ### There's never even an event from the backend 
        ### in events when the 2nd subscriber is subscribing.

        # When we get a new subscription we pull data from the cache:
        if backend in events:
            print 'message from subscriber'
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                print ' => subscribe to %s' % topic
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])
            elif event[0] == b'\x00':
                topic = event[1:]
                print ' => unsubscribe from %s' % topic

if __name__ == '__main__':
    main()

Running this code (1 x broker.py, 2 x subscriber.py) shows that the first subscriber registers at the broker as expected (\x01 and cache lookup), but the 2nd subscriber does not get registered the same way. Interestingly, the 2nd subscriber is hooked up to the pub/sub channel, as after a while (10 sec) both subscribers receive data from the publisher.

This is very strange. Perhaps some of my libraries are outdated. Here's what I got:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'

$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
  Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘

Update 3

This behaviour can also be observed in zeromq 4.1.2 and pyzmq-14.7.0 (with or without libpgm and libsodium installed).

Update 4

Another observation suggests that the first subscriber is somehow handled differently: The first subscriber is the only one unsubscribing in the expected way from the XPUB socket (backend) by preceding its subscription topic with \x00. The other subscribers (I tried more than 2) stayed mute on the backend channel (although receiving messages).

Update 5

I hope I'm not going down a rabbit hole, but I've looked into the czmq bindings and run my Python example in C. The results are the same, so I guess it's not a problem with the bindings, but with libzmq.

I also verified that the 2nd subscriber is sending a subscribe message and indeed I can see this on the wire:

First subscribe:

0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
0040  70 69 63                                           pic              

Second subscribe message, with the differences from the first marked and explained. The subscribe frame carries the same data in both.

                               identification
                               v
0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                             src port      sequence number
                                  v        v  v  v  v
0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........

Acknowledgement number   window scaling factor
      v  v  v  v           v
0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......

timestamp value  timestamp echo reply
            v           v  v   |<-------- data -------
0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to

      ------>|
0040  70 69 63                                           pic              
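
To double-check that claim, the last 11 bytes of either capture (starting at offset 0x38) can be decoded by hand. The sketch below is written for Python 3 and assumes the ZMTP short-frame layout of one flags byte (bit 0 = MORE, bit 1 = LONG), one length byte, then the body; `decode_short_frame` is a hypothetical helper name, not part of pyzmq.

```python
# Payload bytes copied from offset 0x38 of both captures above.
PAYLOAD = bytes.fromhex("00 09 01 6d 79 2d 74 6f 70 69 63")


def decode_short_frame(data):
    """Decode one short frame: flags byte, one-byte length, body.

    Assumption: the LONG flag (bit 1) is clear, so the length fits in
    a single byte. Long frames are not handled in this sketch.
    """
    flags, size = data[0], data[1]
    if flags & 0x02:
        raise ValueError("long frames are not handled in this sketch")
    body = data[2:2 + size]
    if len(body) != size:
        raise ValueError("truncated frame")
    return {
        "more": bool(flags & 0x01),        # more frames follow in this message
        "subscribe": body[:1] == b"\x01",  # 0x01 = subscribe, 0x00 = unsubscribe
        "topic": body[1:],
    }


frame = decode_short_frame(PAYLOAD)
print(frame)
# {'more': False, 'subscribe': True, 'topic': b'my-topic'}
```

So both subscribers put an identical subscribe frame for my-topic on the wire; whatever drops the second one happens on the receiving side.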
Doner answered 16/6, 2015 at 9:49

I found the solution to this problem. Even though I had read the docs front to back and back to front, I had missed it. The key is XPUB_VERBOSE. Add this line after the backend initialisation and everything works fine:

backend.setsockopt(zmq.XPUB_VERBOSE, True)

Here's an extract from the official documentation:

ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets

Sets the XPUB socket behavior on new subscriptions and unsubscriptions. A value of 0 is the default and passes only new subscription messages to upstream. A value of 1 passes all subscription messages upstream.

Option value type: int
Option value unit: 0, 1
Default value: 0
Applicable socket types: ZMQ_XPUB
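
To make the difference between the two values concrete, here is a toy model in plain Python 3 of what the documentation describes. It is only an illustration under my reading of the text above, not libzmq's actual implementation: `ToyXpub` and its attributes are made-up names, and unsubscriptions are left out.

```python
class ToyXpub(object):
    """Toy model of XPUB subscription forwarding (NOT libzmq's code)."""

    def __init__(self, verbose=False):
        self.verbose = verbose
        self.refcount = {}  # topic -> number of subscriptions seen so far
        self.upstream = []  # events the application would get from recv()

    def subscribe(self, topic):
        count = self.refcount.get(topic, 0)
        self.refcount[topic] = count + 1
        # Default (verbose=False): only a topic's first subscription is new,
        # so duplicates are swallowed. Verbose: every subscription is passed.
        if count == 0 or self.verbose:
            self.upstream.append(b"\x01" + topic)


default = ToyXpub()
verbose = ToyXpub(verbose=True)
for sock in (default, verbose):
    sock.subscribe(b"my-topic")  # first subscriber
    sock.subscribe(b"my-topic")  # second subscriber, same topic
    sock.subscribe(b"other")     # a subscriber on a different topic

print(default.upstream)  # [b'\x01my-topic', b'\x01other']
print(verbose.upstream)  # [b'\x01my-topic', b'\x01my-topic', b'\x01other']
```

In the default case the second my-topic subscription never reaches the application, while a subscription to a different topic does. That matches what I saw in the question's first update.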

Pieter Hintjens has some more information on this in his blog. This is the relevant section:

A few months ago we added a neat little option (ZMQ_XPUB_VERBOSE) to XPUB sockets that disables its filtering of duplicate subscriptions. This now works for any number of subscribers. We use this as follows:

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

The LVC pattern description should be updated to reflect this setting, as this pattern won't work otherwise.
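
With the option set, the `if backend in events:` branch of the broker runs for every subscriber. One more thing to watch if the broker is ever moved to Python 3: indexing a bytes object returns an int there, so `event[0] == b'\x01'` would never be true. Below is a minimal sketch of the subscription branch that uses slicing instead, which behaves the same on Python 2 and 3. `replay_cached` and `FakeSocket` are hypothetical names for illustration; the fake socket only stands in for the XPUB backend so the sketch runs without a broker.

```python
def replay_cached(event, cache, backend):
    """Send the cached value for a subscribe event; return the topic or None.

    `event` is the frame received from the XPUB socket: one byte
    (0x01 = subscribe, 0x00 = unsubscribe) followed by the topic.
    """
    kind, topic = event[:1], event[1:]  # slicing keeps bytes on Python 2 and 3
    if kind != b"\x01":
        return None
    if topic in cache:
        backend.send_multipart([topic, cache[topic]])
    return topic


class FakeSocket(object):
    """Stand-in for the XPUB socket (assumption: only send_multipart is used)."""

    def __init__(self):
        self.sent = []

    def send_multipart(self, frames):
        self.sent.append(frames)


cache = {b"my-topic": b"hello 3"}
backend = FakeSocket()

# Two subscribers to the same topic, as delivered with XPUB_VERBOSE enabled.
replay_cached(b"\x01my-topic", cache, backend)
replay_cached(b"\x01my-topic", cache, backend)
# An unsubscribe event is ignored by the cache replay.
replay_cached(b"\x00my-topic", cache, backend)

print(backend.sent)
# [[b'my-topic', b'hello 3'], [b'my-topic', b'hello 3']]
```

In the real broker the call would be `replay_cached(backend.recv(), cache, backend)` inside that branch.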

Doner answered 22/6, 2015 at 8:37
