ZeroMQ PUB socket buffers all my outgoing data while it is connecting

I noticed that a ZeroMQ PUB socket buffers all outgoing data while it is still connecting. For example:

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# send some messages before the connection is established;
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

The SUB socket binds only after those messages have been sent, so they should be dropped: a PUB socket is supposed to drop messages when no one is connected to it. Instead of dropping them, it buffers all of them:

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

As you can see, the "a message should not be dropped" messages are buffered by the socket and flushed to the SUB socket once the connection is established. If I bind the PUB socket and connect the SUB socket instead, it works as expected:

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# send some messages before any subscriber has connected;
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

This time the only output is:

'hi'

This strange behavior causes a real problem, because a connecting socket buffers all of its data. I have two servers, and Server A publishes data to Server B:

Server A -- publish --> Server B

It works fine when Server B is online. But what if I start Server A and never start Server B?

As a result, the connecting PUB socket on Server A keeps all of that data, and its memory usage keeps growing.

So here is my question: is this behavior a bug or a feature? If it is a feature, where is it documented? And how can I stop a connecting PUB socket from buffering all data?

Thanks.

Gyrostatic answered 21/1, 2012 at 9:53 Comment(0)
K
7

Whether the socket blocks or drops messages depends on the socket type as described in the ZMQ::Socket documentation (emphasis below is mine):

ZMQ::HWM: Retrieve high water mark

The ZMQ::HWM option shall retrieve the high water mark for the specified socket. The high water mark is a hard limit on the maximum number of outstanding messages 0MQ shall queue in memory for any single peer that the specified socket is communicating with.

If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, 0MQ shall take appropriate action such as blocking or dropping sent messages. Refer to the individual socket descriptions in ZMQ::Socket for details on the exact action taken for each socket type.

The default ZMQ::HWM value of zero means “no limit”.

To find out whether a given socket type blocks or drops, look up its "ZMQ::HWM option action" entry in the documentation for that socket type; it is either Block or Drop.

The action for ZMQ::PUB is Drop, so if it is not dropping, check the HWM (high water mark) value and heed the warning that the default ZMQ::HWM value of zero means "no limit". With no limit, the socket will not enter the exceptional state until the system runs out of memory (at which point I don't know how it behaves).
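The documented rule can be illustrated with a toy model of a single per-peer queue. This is only a sketch of the semantics quoted above, not libzmq code; the class name `ToyPeerQueue` and its attributes are made up for the illustration.

```python
class ToyPeerQueue:
    """Toy model of one per-peer send queue (hypothetical, not libzmq)."""

    def __init__(self, hwm=0):
        self.hwm = hwm        # 0 means "no limit", as in the quoted docs
        self.pending = []     # messages waiting for the peer
        self.dropped = 0      # messages discarded at the high water mark

    def send(self, msg):
        # A PUB-style socket drops the NEW message once the limit is reached.
        if self.hwm and len(self.pending) >= self.hwm:
            self.dropped += 1
            return False
        self.pending.append(msg)
        return True


unlimited = ToyPeerQueue(hwm=0)
bounded = ToyPeerQueue(hwm=2)
for n in [1, 2, 3, 4]:
    unlimited.send(n)
    bounded.send(n)

print(unlimited.pending)                # [1, 2, 3, 4]
print(bounded.pending, bounded.dropped)  # [1, 2] 2
```

With the limit at zero nothing is ever dropped, which matches the unbounded memory growth described in the question.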

Katsuyama answered 22/1, 2012 at 3:24 Comment(4)
I know I can set the HWM to limit the number of messages in the buffer, but that doesn't solve the problem: the way PUB handles the HWM state is to drop new messages. That means that if you set the HWM, only the leading messages are kept in the buffer. What I'm writing is an audio streaming system, and this behavior makes it very annoying to use. Say you send messages [1, 2, 3, 4] with the HWM set to 2: the socket buffers [1, 2] for you, and all new messages are dropped. But for audio streaming, the most important part is the newly arriving data. Is there any way to adjust how the HWM drops messages? - Gyrostatic
Ah, so you mean the behavior you would like is that if HWM is set to 2 and you send [1, 2, 3, 4] then it should drop [1, 2] and keep [3, 4], but then if you sent 5 it should drop 3 and you end up with [4, 5]? I don't think that behavior exists in ZMQ. - Katsuyama
This is very interesting. Certainly having the ability to drop the "older" messages would be necessary for some applications (IP telephony comes to mind as a common example). - Pentecostal
Did you find a solution to drop old messages instead of new ones when the receiving queue gets full? (I have a realtime feed where messages are only useful/valid when they are fresh. I don't want to get queued-up old messages. Ideally the queue would always contain just one message, and drop the old message every time it gets a new one.) - Cankerworm
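The drop-oldest policy discussed in these comments can at least be approximated in the application, before messages are handed to the socket. A minimal sketch using only the standard library follows; `LatestOnlyBuffer` is a hypothetical helper name, and deciding when to flush it into a ZeroMQ socket is left open here.

```python
from collections import deque


class LatestOnlyBuffer:
    """Keep only the newest `maxlen` messages (hypothetical helper)."""

    def __init__(self, maxlen):
        # A bounded deque silently discards the oldest item when full.
        self._items = deque(maxlen=maxlen)

    def push(self, msg):
        self._items.append(msg)

    def pop_all(self):
        # Hand over everything currently held and start empty again.
        items = list(self._items)
        self._items.clear()
        return items


buf = LatestOnlyBuffer(maxlen=2)
for n in [1, 2, 3, 4]:
    buf.push(n)          # buffer now holds 3 and 4
buf.push(5)              # 3 is discarded
print(buf.pop_all())     # [4, 5]
```

With `maxlen=1` the buffer always holds just the freshest message. Newer libzmq releases also document a `ZMQ_CONFLATE` socket option that keeps only the most recent message; whether it applies depends on the library version and socket type in use.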
T
5

I believe this behavior follows from the semantics of zmq_connect(): once zmq_connect() returns success, the connection is conceptually established, so your connecting PUB socket starts queuing messages instead of dropping them.

The following excerpt from the "ZMQ Guide" hints at this:

In theory with ØMQ sockets, it does not matter which end connects, and which end binds. However with PUB-SUB sockets, if you bind the SUB socket and connect the PUB socket, the SUB socket may receive old messages, i.e. messages sent before the SUB started up. This is an artifact of the way bind/connect works. It's best to bind the PUB and connect the SUB, if you can.

The following section of the zmq_connect() documentation gives some further hints:

Key differences to conventional sockets

Generally speaking, conventional sockets present a synchronous interface to either connection-oriented reliable byte streams (SOCK_STREAM), or connection-less unreliable datagrams (SOCK_DGRAM). In comparison, ØMQ sockets present an abstraction of an asynchronous message queue, with the exact queueing semantics depending on the socket type in use. Where conventional sockets transfer streams of bytes or discrete datagrams, ØMQ sockets transfer discrete messages.

ØMQ sockets being asynchronous means that the timings of the physical connection setup and tear down, reconnect and effective delivery are transparent to the user and organized by ØMQ itself. Further, messages may be queued in the event that a peer is unavailable to receive them.

Type answered 4/5, 2012 at 9:48 Comment(0)
R
1

Try setting the HWM option on the socket.

Roswell answered 21/1, 2012 at 23:28 Comment(0)
P
0

So bind() and connect() result in two different behaviors. Why don't you just choose which one you prefer (it seems like bind()) and use that?

It is indeed a feature of ZeroMQ in general that it buffers outgoing messages until a connection is made.

Pentecostal answered 21/1, 2012 at 16:41 Comment(3)
Because I have multiple nodes that want to publish data to one well-known server. Of course I can bind on the PUB side, but then I need N addresses, one for each node, and the server doesn't know how many nodes there will be. I think bind and connect should not affect the behavior: once the connection is made, there is no difference between bind and connect, so why treat them differently? I don't understand :S - Gyrostatic
Oh OK. Well I think ZeroMQ is behaving as expected and as designed, so you may just have to query the connection before sending data. - Pentecostal
@JohnZwinck Choosing bind() vs connect() is not based on preference, but should instead be based on how it is used. He is using it correctly with bind() on the server (the publisher) and connect() on the client (the subscriber). And it does not always buffer outgoing messages, but instead it is determined by the socket type and the value of the high water mark as explained here with references to the documentation. - Katsuyama
H
0

You should be able to set a high water mark using the HWM setting on the PUB socket. It lets you define how many messages are kept.
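A minimal pyzmq sketch of that suggestion, assuming pyzmq is installed. `make_bounded_pub` is a hypothetical helper name, the endpoint is the one from the question, and the limit of 10 is arbitrary. pyzmq's `set_hwm()` is used because the underlying option differs between library versions: libzmq 3.x and later split the single HWM into separate send and receive marks and default to 1000 rather than unlimited.

```python
import zmq  # pyzmq, assumed to be installed


def make_bounded_pub(context, endpoint, hwm=10):
    """Create a connecting PUB socket that queues at most `hwm` messages per peer."""
    pub = context.socket(zmq.PUB)
    # Set the limit BEFORE connect(); new values only apply to later connections.
    pub.set_hwm(hwm)
    # Do not block on close waiting for undelivered messages.
    pub.setsockopt(zmq.LINGER, 0)
    pub.connect(endpoint)
    return pub


context = zmq.Context()
pub = make_bounded_pub(context, "tcp://127.0.0.1:5566", hwm=10)
print(pub.get_hwm())
pub.close()
context.term()
```

As the comments on the earlier answer point out, this bounds memory use but keeps the oldest messages, not the newest.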

Hillyer answered 21/1, 2012 at 20:45 Comment(0)
B
0

Here's a hack that might help...

Set your ZMQ::HWM to a fixed number, say 10. Upon connection, call the subscriber socket's recv method in a loop until it discards all the buffered messages, and only THEN start your main receiving loop.
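One way this drain step might look with pyzmq, assuming it is installed. The helper name `drain` and the `quiet_ms` timeout are made up for the sketch; it treats the backlog as finished once no message arrives within that window, which is a heuristic rather than a guarantee.

```python
import zmq  # pyzmq, assumed to be installed


def drain(sock, quiet_ms=100):
    """Discard queued messages until none arrives within `quiet_ms` milliseconds.

    Returns the number of frames thrown away.
    """
    discarded = 0
    # poll() returns 0 when nothing became readable within the timeout.
    while sock.poll(quiet_ms, zmq.POLLIN):
        sock.recv()
        discarded += 1
    return discarded
```

A subscriber would call `drain(sub)` once after connecting and only then enter its main `recv()` loop. If the publisher is sending continuously, fresh messages that arrive during the drain are discarded as well.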

Beatup answered 22/9, 2013 at 1:6 Comment(0)
