I'm having some trouble understanding how the ZeroMQ high-water mark (HWM) queues work.
I have written two scripts (attached below) that do the following:
- Establish a PUSH/PULL connection, setting all HWM queues to size 1.
- Make the puller sleep for some time.
- Send 2200 messages from the pusher.
- When the puller awakes, receive the 2200 messages and print them.
The result is that the puller receives (and prints) all messages successfully, and the pusher seems to finish execution almost instantly. According to the official ZeroMQ documentation, I would expect the pusher not to finish before the puller wakes up, because it should block on the second send(...) call once the HWM is reached. I have also tried adding a 0.001-second sleep between each send(...) call, with the same result.
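To make the expectation concrete, here is a minimal sketch of the behaviour described above, modelled with Python's standard `queue.Queue` rather than ZeroMQ. It is only a model of a single bounded queue of capacity 1 with no consumer. The helper name `count_accepted` is hypothetical, and nothing here claims to reflect how ZeroMQ implements its HWM internally.

```python
import queue


def count_accepted(capacity, attempts):
    """Return how many non-blocking puts succeed before the queue is full."""
    q = queue.Queue(maxsize=capacity)  # stands in for one HWM-limited queue
    accepted = 0
    for i in range(attempts):
        try:
            # put_nowait raises queue.Full instead of blocking
            q.put_nowait(str(i).encode('ascii'))
        except queue.Full:
            break
        accepted += 1
    return accepted


if __name__ == '__main__':
    # Nothing consumes from the queue, mirroring the sleeping puller
    print(count_accepted(capacity=1, attempts=2200))
```

In this model the second put is refused, which is the behaviour expected from the pusher in the real test.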
So, my questions are:
- Why is the pusher not blocking on the second call to send(...), after the HWM (size 1) is reached?
- Where are the messages stored in both the pusher and the puller?
- Is there a direct relation between the HWM size and the number of messages stored?
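One place worth checking when asking where messages can sit is the operating system's own socket buffers, since a tcp:// endpoint ultimately runs over ordinary TCP sockets. The sketch below uses only the standard `socket` module (no ZeroMQ) to count how many small non-blocking sends the OS accepts while the peer never reads. The helper name `count_buffered_sends` is hypothetical. Whether this buffering accounts for the behaviour observed in the scripts is an assumption, not something this sketch establishes.

```python
import socket


def count_buffered_sends(limit=2200):
    """Count non-blocking sends accepted while the peer never calls recv()."""
    server = socket.create_server(('127.0.0.1', 0))  # port 0: the OS picks one
    sender = socket.create_connection(server.getsockname())
    receiver, _ = server.accept()  # connected, but it never reads
    sender.setblocking(False)
    accepted = 0
    try:
        for i in range(limit):
            try:
                sender.send(str(i).encode('ascii'))
            except BlockingIOError:
                break  # the OS buffers cannot take more data right now
            accepted += 1
    finally:
        sender.close()
        receiver.close()
        server.close()
    return accepted


if __name__ == '__main__':
    print(count_buffered_sends())
```

The printed count depends on the platform's buffer sizes, so no specific value is claimed. The point is only that an application-level limit of 1 and the amount of data the OS will hold are separate quantities.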
Scripts:
pusher.py
import zmq

context = zmq.Context()
push_socket = context.socket(zmq.PUSH)
# Set both HWM options to 1 before binding
push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)
push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm())  # Prints 1
print('Sending all messages')
for i in range(2200):
    # Expected to block on the second call while the puller sleeps
    push_socket.send(str(i).encode('ascii'))
print('Finished execution...')
puller.py
import time

import zmq

context = zmq.Context()
pull_socket = context.socket(zmq.PULL)
# Set both HWM options to 1 before connecting
pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)
pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm())  # Prints 1
print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')
rec = ''
for i in range(2200):
    rec += '{} '.format(pull_socket.recv().decode('ascii'))
print(rec)  # Prints `0 1 2 ... 2198 2199 `
To reproduce my test case, open two terminals, launch puller.py in one, and then quickly (within the 4-second sleep window) launch pusher.py in the other.