Is it possible to get the total count of subscribers from a PUB socket in zeromq?
Thanks!
Is it possible to get the total count of subscribers from a PUB socket in zeromq?
Thanks!
Yes, but unfortunately not via any simple property or method.
You need to use the zmq_socket_monitor() function to connect an inproc
service socket to the main socket you want to observe. From there you can listen to events regarding connect/disconnect and keep your own count of subscribers. It may not be a trivial task though, since it seems (to me at least) a bit hard to know when to consider a subscriber (or any remote connection) to be up/down (closed/disconnected/retry etc.). You will have to play around a bit.
The link includes samples and event descriptions.
This is an implementation in Node.js for a REP socket; I think it works the same way for a PUB socket.
As Jakob Möllås said, you need to use the socket monitor.
// REP server that tracks the number of connected peers through socket
// monitor events: `accept` increments the counter, `disconnect` decrements it.
const zmq = require('zmq');
const rep = zmq.socket('rep');

// Number of currently connected peers, maintained by the monitor handlers.
let counter = 0;

// Start (or restart) monitoring with the same arguments everywhere.
function startMonitor() {
  rep.monitor(500, 0);
}

rep.bind('tcp://*:5560', function (err) {
  if (err) {
    console.log(err);
    return;
  }
  console.log("Listening on 5560…");
  startMonitor();
});

// Monitor events that are only logged.
[
  'connect',
  'connect_delay',
  'connect_retry',
  'listen',
  'bind_error',
  'accept_error',
  'close',
  'close_error'
].forEach(function (eventName) {
  rep.on(eventName, function (fd, ep) {
    console.log(eventName + ', endpoint:', ep);
  });
});

// A peer connection was accepted on the bound endpoint.
rep.on('accept', function (fd, ep) {
  console.log('accept, endpoint:', ep);
  counter++;
});

// A previously accepted peer went away.
rep.on('disconnect', function (fd, ep) {
  console.log('disconnect, endpoint:', ep);
  counter--;
});

// Handle monitor error
rep.on('monitor_error', function (err) {
  console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err);
  setTimeout(startMonitor, 5000);
});

rep.on('message', function (msg) {
  console.log(`recieve: `, JSON.parse(msg));
  // "FAIL" must be a string literal: the bare identifier FAIL is not defined
  // anywhere in this script and would throw a ReferenceError on every message.
  rep.send(JSON.stringify({ "status": "FAIL", "code": 3666 }));
});
console
recieve: { method: 'login', login: 'a', password: 'b1' }
accept, endpoint: tcp://0.0.0.0:5560
accept, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
recieve: { method: 'login', login: 'a', password: 'b1' }
disconnect, endpoint: tcp://0.0.0.0:5560
login: a, password: b1
disconnect, endpoint: tcp://0.0.0.0:5560
I encountered a (testing) scenario in which I had to wait for n subscribers before starting to publish messages. Here's the function that did the trick for me (in Python):
def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """Block until ``n_subscribers`` handshakes have succeeded on ``pub_socket``.

    A monitor socket is attached to ``pub_socket`` and read until the
    requested number of ``EVENT_HANDSHAKE_SUCCEEDED`` events has arrived.
    The monitor is shut down again before returning, so the call does not
    leave a monitor socket attached to ``pub_socket``.

    Args:
        pub_socket: The (bound) socket whose incoming connections are counted.
        n_subscribers: Number of successful handshakes to wait for. With a
            value of zero or less the function returns without blocking.
    """
    # Only handshake-succeeded events are requested from the monitor.
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)
    try:
        connections = 0
        while connections < n_subscribers:
            # Blocks until the next monitor event is available.
            event = recv_monitor_message(events_socket)
            # Count only the event we are waiting for, in case the monitor
            # delivers anything else.
            if event["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
                connections += 1
    finally:
        # Release the monitor even if receiving was interrupted.
        pub_socket.disable_monitor()
Explanation:
After creating a PUB socket, we attach a PAIR socket to it that monitors the PUB socket for events.
When a SUB socket connects to the PUB socket, it generates two events on the PUB (binding) side: EVENT_ACCEPTED (32), followed by EVENT_HANDSHAKE_SUCCEEDED (4096).
Therefore we monitor for EVENT_HANDSHAKE_SUCCEEDED as the indicator of a successful subscriber connection.
Once the specified number of subscribers is connected, the function returns.
Here's a complete toy-example:
import threading
import time
import zmq
from zmq.utils.monitor import recv_monitor_message # requires libzmq >= 4.0
# Endpoint the publisher binds to and the subscribers connect to.
ep = "ipc:///tmp/test-socket"
def print_events_map():
    """Print the name and numeric value of every ``EVENT_*`` attribute of ``zmq``."""
    print("Event names:")
    event_names = (attr for attr in dir(zmq) if attr.startswith('EVENT_'))
    for event_name in event_names:
        print("%21s : %4i" % (event_name, getattr(zmq, event_name)))
# Shared context from which the publisher and subscriber sockets are created.
context = zmq.Context()
def wait_for_n_subscribers(pub_socket: zmq.Socket, n_subscribers: int):
    """Block until ``n_subscribers`` handshakes have succeeded on ``pub_socket``.

    A monitor socket is attached to ``pub_socket`` and read until the
    requested number of ``EVENT_HANDSHAKE_SUCCEEDED`` events has arrived.
    The monitor is shut down again before returning, so the call does not
    leave a monitor socket attached to ``pub_socket``.

    Args:
        pub_socket: The (bound) socket whose incoming connections are counted.
        n_subscribers: Number of successful handshakes to wait for. With a
            value of zero or less the function returns without blocking.
    """
    # Only handshake-succeeded events are requested from the monitor.
    events_socket = pub_socket.get_monitor_socket(events=zmq.EVENT_HANDSHAKE_SUCCEEDED)
    try:
        connections = 0
        while connections < n_subscribers:
            # Blocks until the next monitor event is available.
            event = recv_monitor_message(events_socket)
            # Count only the event we are waiting for, in case the monitor
            # delivers anything else.
            if event["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
                connections += 1
    finally:
        # Release the monitor even if receiving was interrupted.
        pub_socket.disable_monitor()
def simulate_sender(wait, n):
    """Bind a PUB socket to ``ep`` and publish the integers 0 through 4.

    Args:
        wait: When true, block until ``n`` subscribers have connected
            before the first message is sent.
        n: Number of subscribers to wait for; only used when ``wait`` is true.
    """
    s_pub = context.socket(zmq.PUB)
    try:
        s_pub.bind(ep)
        if wait:
            wait_for_n_subscribers(s_pub, n)
        for i in range(5):
            s_pub.send_pyobj(i)
            time.sleep(1)  # one message per second
    finally:
        # Close the socket when the sender is done (or fails) instead of
        # leaving it open for the rest of the process lifetime.
        s_pub.close()
# Number of subscribers the publisher is told to wait for.
subscribers = 2

# First subscriber: receives everything, gives up after 3 seconds of silence.
s_sub_1 = context.socket(zmq.SUB)
s_sub_1.setsockopt(zmq.RCVTIMEO, 3000)
s_sub_1.subscribe("")

# Second subscriber: connected only so that the publisher sees two peers.
s_sub_2 = context.socket(zmq.SUB)
s_sub_2.subscribe("")

# Set to False if the publisher should start sending without waiting.
wait = True

threading.Thread(
    target=simulate_sender,
    args=(wait, subscribers),
).start()

# Connect the subscribers one second after the publisher thread was started.
time.sleep(1)
print("connecting 1")
s_sub_1.connect(ep)
print("connecting 2")
s_sub_2.connect(ep)

# Print incoming messages until the receive timeout expires.
while True:
    try:
        message = s_sub_1.recv_pyobj()
    except zmq.error.Again:
        print("no incoming msgs for 3 seconds")
        break
    else:
        print("received %s" % message)
Notes:
Setting wait to False will cause the subscribers to miss the first published message(s), since the subscriber has a 1 second delay before connecting, and the publisher doesn't wait (for the subscriber to connect).
There doesn't seem to be any direct way. Below is Python code to monitor socket events, which can be used to maintain a count:
import zmq
from zmq.eventloop import ioloop, zmqstream
import zmq.utils.monitor
class Publication:
    """PUB socket wrapper that counts peers through socket monitor events."""

    def __init__(self):
        # Number of accepted connections that have not disconnected yet.
        self._subscriber_count = 0

    @property
    def subscriber_count(self):
        """Return the number of currently connected peers seen by the monitor."""
        return self._subscriber_count

    def start(self, port, host):
        """Bind the PUB socket to ``tcp://host:port`` and start monitoring it.

        Monitor events are delivered to ``_on_mon`` through a ``ZMQStream``.
        """
        self._subscriber_count = 0
        context = zmq.Context()
        self._socket = context.socket(zmq.PUB)
        self._socket.bind("tcp://%s:%d" % (host, port))
        # A bound socket reports incoming peers as EVENT_ACCEPTED;
        # EVENT_CONNECTED is raised on the connecting side and therefore
        # never fires here.
        self._mon_socket = self._socket.get_monitor_socket(
            zmq.EVENT_ACCEPTED | zmq.EVENT_DISCONNECTED
        )
        self._mon_stream = zmqstream.ZMQStream(self._mon_socket)
        self._mon_stream.on_recv(self._on_mon)

    def _on_mon(self, msg):
        """Update the peer count from one raw monitor message."""
        ev = zmq.utils.monitor.parse_monitor_message(msg)
        event = ev['event']
        if event == zmq.EVENT_ACCEPTED:
            self._subscriber_count += 1
        elif event == zmq.EVENT_DISCONNECTED:
            # Never let the count go negative if a disconnect arrives
            # without a matching accept.
            self._subscriber_count = max(0, self._subscriber_count - 1)
One issue is that, for some reason, the CONNECTED event is not firing. Another issue is that even when an event fires, you only get the endpoint ID, which is a string like tcp://ip:port. So for multiple clients on the same node you get the same endpoint ID.
EVENT_ACCEPTED does what you're expecting, i.e. it signals when something has established a connection to your publishing port. –
Chaparro © 2022 - 2024 — McMap. All rights reserved.