Understanding ZMQ's HWM
Asked Answered
D

1

10

I'm having some trouble understanding how the ZeroMQ high-water mark (HWM) queues work.

I have made two scripts attached below, which reproduce the following.

  • Stablish a PUSH/PULL connection, Setting all HWM queues to size 1.
  • Make the puller sleep some time.
  • Send 2200 messages from the pusher.
  • When the puller awakes, receive the 2200 messages and print them.

The result I get is that the puller is able to receive (print) all messages successfully. Also, the pusher seems to finish execution almost instantly. According to the ZMQ official documentation what I would expect from that is the pusher to not finish execution before the puller awakes, because of being blocked on the second send(...) call due to the HWM being reached. I have also tried adding a 0.001 second sleep between each send(...) call, same result.

So, my questions are:

  • Why is pusher not blocking in the second call to send(...), after the HWM is reached (size 1)?
  • Where are the messages stored in both pusher and puller?
  • Is there a direct relation between the HWM size and the number of messages stored?

Scripts:

pusher.py

import zmq

context = zmq.Context()
push_socket = context.socket(zmq.PUSH)

push_socket.setsockopt(zmq.SNDHWM, 1)
push_socket.setsockopt(zmq.RCVHWM, 1)

push_socket.bind("tcp://127.0.0.1:5557")
print(push_socket.get_hwm()) # Prints 1
print('Sending all messages')

for i in range(2200):
    push_socket.send(str(i).encode('ascii'))

print('Finished execution...')

puller.py

import zmq
import time

context = zmq.Context()

pull_socket = context.socket(zmq.PULL)

pull_socket.setsockopt(zmq.RCVHWM, 1)
pull_socket.setsockopt(zmq.SNDHWM, 1)

pull_socket.connect("tcp://127.0.0.1:5557")
print(pull_socket.get_hwm()) # Prints 1

print('Connected, but not receiving yet... (Sleep 4s)')
time.sleep(4)
print('Receiving everything now!')

rec = ''

for i in range(2200):
    rec += '{} '.format(pull_socket.recv().decode('ascii'))

print(rec) # Prints `0 1 2 ... 2198 2199 `

In order to reproduce my test case, open two terminals and launch first puller.py in one and quickly afterwards (4 seconds window) pusher.py in the other one.

Denice answered 22/3, 2017 at 10:23 Comment(0)
H
15

There are at least 4 buffers involved here: zmq send buffer, OS write tcp buffer, OS read tcp buffer, and zmq recv buffer.

The zmq io threads mark a message as "sent" when it has successfully been written to the OS tcp write buffer. The messages are now considered "in transit".

Then the network stack takes care of transferring as much as it can into the other process' matching OS recv buffer, Finally, the receiving zmq io thread reads at most HWM messages at a time from this buffer into the ZMQ recv queue.

The OS buffers are by default usually around 10-100kb, and both of these can fill up completely with "in transit" messages before ZMQ even notices that the other side isn't consuming any messages. These buffers are kind of required for performance reasons - you can't just get rid of them.

The solution to your problem probably involves req/rep sockets and an explicit application-level ack i.e. lazy pirate pattern from the guide.

Hartmunn answered 22/3, 2017 at 10:53 Comment(2)
Thanks for the answer! I had no idea about the OS buffers... I will do some research about them.Denice
Wow! Thank you. How do you know all of this? Can you please share a book (Python)?Precincts

© 2022 - 2024 — McMap. All rights reserved.