Retrieving subscriber count using zeromq PUB/SUB sockets
Asked Answered
F

4

17

Is it possible to get the total count of subscribers from a PUB socket in zeromq?

Thanks!

Firefly answered 30/1, 2013 at 6:33 Comment(0)
G
10

Yes, but unfortunately not via any simple property or method.

You need to use the zmq_socket_monitor() function to connect an inproc service socket to the main socket you want to observe. From there you can listen to events regarding connect/disconnect and keep your own count of subscribers. It may not be a trivial task though, since it seems (to me at least) a bit hard to know when to consider a subscriber (or any remote connection) to be up/down (closed/disconnected/retry etc.). You will have to play around a bit.

The link includes samples and event descriptions.

Grantham answered 30/1, 2013 at 7:24 Comment(7)
This is for ZMQ version 3 right? I use jzmq as java bindings and I think it doesnt support version 3. Can I do something like that with version 2.x.x?Firefly
JZMQ does AFAIK work with 0MQv3.2. You can also use XPUB sockets to get and count subscriptions (needs 0MQ 3.2)Tapster
The provided link points to ZeroMq 3.2.2. If you want the full Api docs, they are available here.Tactics
@PieterHintjens: Interesting, but that only works as long as subscriptions are not dropped for other reasons than subscribers active unsubscribing (sending byte 0 to the socket), right? If there is a network disconnect or just a crashed subscriber, how will one keep track of the active subscriber count?Tactics
@JakobMöllås if you want to track subscribers no matter what the possible cause of disconnection, you need to switch to a ROUTER-DEALER flow, do your own subscription, and heartbeating. There are lots of examples in later chapters of the Guide.Tapster
@PieterHintjen Thanks, will check it out if the need arise.Tactics
@PieterHintjens If switching to ROUTER-DEALER, subscribers need to be DEALERS themselves, not PUB sockets, right?Pyromania
L
3

This is implementation on NodeJS for rep, I think for pub it's the same.

Like Jakob Möllås said, need to use monitor.

const zmq = require('zmq')
        , rep = zmq.socket('rep');

let counter = 0;

rep.bind('tcp://*:5560', function (err) {
    if (err) {
        console.log(err);
    } else {
        console.log("Listening on 5560…");
        rep.monitor(500, 0);
    }
});

// Register to monitoring events
rep.on('connect', function (fd, ep) {
    console.log('connect, endpoint:', ep);
});
rep.on('connect_delay', function (fd, ep) {
    console.log('connect_delay, endpoint:', ep);
});
rep.on('connect_retry', function (fd, ep) {
    console.log('connect_retry, endpoint:', ep);
});
rep.on('listen', function (fd, ep) {
    console.log('listen, endpoint:', ep);
});
rep.on('bind_error', function (fd, ep) {
    console.log('bind_error, endpoint:', ep);
});
rep.on('accept', function (fd, ep) {
    console.log('accept, endpoint:', ep);
    counter++;
});
rep.on('accept_error', function (fd, ep) {
    console.log('accept_error, endpoint:', ep);
});
rep.on('close', function (fd, ep) {
    console.log('close, endpoint:', ep);
});
rep.on('close_error', function (fd, ep) {
    console.log('close_error, endpoint:', ep);
});
rep.on('disconnect', function (fd, ep) {
    console.log('disconnect, endpoint:', ep);
    counter--;
});

// Handle monitor error
rep.on('monitor_error', function(err) {
    console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
    setTimeout(function() { rep.monitor(500, 0); }, 5000);
});

rep.on('message', function (msg) {
    console.log(`recieve: `, JSON.parse(msg));
    rep.send(JSON.stringify({ "status": FAIL, "code": 3666 }));
});

console

recieve:  { method: 'login', login: 'a', password: 'b1' }
accept, endpoint: tcp://0.0.0.0:5560
accept, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
recieve:  { method: 'login', login: 'a', password: 'b1' }
disconnect, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
disconnect, endpoint: tcp://0.0.0.0:5560
Laue answered 26/4, 2017 at 8:11 Comment(0)
B
2

I encountered a (testing) scenario in which I had to wait for n subscribers before starting to publish messages. Here's the function that did the trick for me (in Python):

def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """
    blocks until pub_socket had n_subscribers connected to it
    """
    connections = 0
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)  # only accept this event
    while connections < n_subscribers:
        recv_monitor_message(events_socket)  # this will block until a handshake was successful
        connections += 1

Explanation:
After creating a PUB socket, we attach a PAIR socket to it, that will monitor the PUB socket for events.
When a SUB socket connects to the PUB socket it generates two events on the PUB (binding) side:
EVENT_ACCEPTED (32) followed by EVENT_HANDSHAKE_SUCCEEDED (4096).

Therefore we monitor for EVENT_HANDSHAKE_SUCCEEDED as the indicator for a successful subsciber connection. once the specified of subscribers is connected, the function returns.

Here's a complete toy-example:

import threading
import time
import zmq
from zmq.utils.monitor import recv_monitor_message  # requires libzmq >= 4.0

ep = "ipc:///tmp/test-socket"


def print_events_map():
    "auxilliary function to print all zmq socket events"
    print("Event names:")
    for name in dir(zmq):
        if name.startswith('EVENT_'):
            value = getattr(zmq, name)
            print("%21s : %4i" % (name, value))


context = zmq.Context()


def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """
    blocks until pub_socket had n_subscribers connected to it
    """
    connections = 0
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)  # only accept this event
    while connections < n_subscribers:
        recv_monitor_message(events_socket)  # this will block until a handshake was successful
        connections += 1


def simulate_sender(wait, n):
    s_pub = context.socket(zmq.PUB)
    s_pub.bind(ep)
    if wait:
        wait_for_n_subscribers(s_pub, n)
    for i in range(5):
        s_pub.send_pyobj(i)
        time.sleep(1)


subscribers = 2
s_sub_1 = context.socket(zmq.SUB)
s_sub_1.setsockopt(zmq.RCVTIMEO, 3000)  # wait at most 3 seconds
s_sub_1.subscribe("")

s_sub_2 = context.socket(zmq.SUB)
s_sub_2.subscribe("")

wait = True  # set to false if publisher should not wait
threading.Thread(target=simulate_sender, args=(wait, subscribers,)).start()
time.sleep(1)
print("connecting 1")
s_sub_1.connect(ep)
print("connecting 2")
s_sub_2.connect(ep)
while True:
    try:
        print("received %s" % s_sub_1.recv_pyobj())
    except zmq.error.Again:
        print("no incoming msgs for 3 seconds")
        break

Notes:

  1. Setting wait to False will cause the subscribers to miss the first published message(s), since the subscriber has a 1 second delay before connecting, and the publisher doesn't wait (for the subscriber to connect).
  2. The scenario assumes that the publisher binds and subsribers connect.
  3. Tested with zmq 4.1.4, pyzmq 20.0.0
Bur answered 28/7, 2021 at 21:0 Comment(0)
I
1

There doesn't seem to be any direct way. Below is Python code to monitor socket events which can be used to maintain count:

import zmq
from zmq.eventloop import ioloop, zmqstream
import zmq.utils.monitor

class Publication:
    def start(self, port, host):
        context = zmq.Context()
        self._socket = context.socket(zmq.PUB)
        self._socket.bind("tcp://%s:%d" % (host, port))
        self._mon_socket = self._socket.get_monitor_socket(zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)
        self._mon_stream = zmqstream.ZMQStream(self._mon_socket)
        self._mon_stream.on_recv(self._on_mon)

    def _on_mon(self, msg):
        ev = zmq.utils.monitor.parse_monitor_message(msg)
        event = ev['event']
        endpoint = ev['endpoint']
        if event == zmq.EVENT_CONNECTED:
            pass
            # print(endpoint)
        elif event == zmq.EVENT_DISCONNECTED:
            pass
            #print(endpoint)

One issue is that for some reason CONNECTED event is not firing. Another issue is that even when event fires, you only get endpoint ID which is like tcp://ip:port string. So for multiple clients on same node you get same endpoint ID.

Imminent answered 20/12, 2018 at 3:20 Comment(1)
Based on the docs (api.zeromq.org/4-1:zmq-socket-monitor) you might find EVENT_ACCEPTED does what you're expecting i.e. highlight when something has established a connection to your publishing port.Chaparro

© 2022 - 2024 — McMap. All rights reserved.