ZeroMQ pattern for load balancing work across workers based on idleness
I have a single producer and n workers that I only want to give work to when they're not already processing a unit of work, and I'm struggling to find a good ZeroMQ pattern.

1) REQ/REP

The producer is the requestor and opens a connection to each worker. It tracks which workers are busy and round-robins new work to idle ones.

Problem:

  • How can the producer be notified of responses yet still send new work to idle workers, without dedicating a producer thread to each worker?

2) PUSH/PULL

Producer pushes into one socket that all workers feed off, and workers push into another socket that the producer listens to.

Problem:

  • Has no concept of worker idleness, i.e. work gets stuck behind long units of work

3) PUB/SUB

Non-starter, since there is no way to ensure work doesn't get lost

4) Reverse REQ/REP

Each worker is the REQ end and requests work from the producer and then sends another request when it completes the work

Problem:

  • The producer has to block on a request for work until there is work (since each recv must be paired with a send). This prevents workers from reporting work completion.
  • Could be fixed with a separate completion channel, but the producer still needs some polling mechanism to detect new work while staying on the same thread.
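
(For reference, the polling mechanism I mean would look something like the sketch below: zmq.Poller blocks on several sockets in one thread rather than spinning. This assumes pyzmq; the inproc addresses and job names are made up for illustration.)

```python
import threading
import zmq

# Sketch of pattern 4 with a separate completion channel (assumes pyzmq).
# zmq.Poller lets one producer thread block on both sockets at once --
# no thread per worker, no spin loop.
ctx = zmq.Context.instance()

work_req = ctx.socket(zmq.REP)   # workers REQ here to ask for work
work_req.bind("inproc://work")
done = ctx.socket(zmq.PULL)      # workers report completions here
done.bind("inproc://done")

def worker():
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://work")
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://done")
    req.send(b"ready")                 # ask for a unit of work
    job = req.recv()
    push.send(b"finished " + job)      # report completion on the other channel
    req.close()
    push.close()

t = threading.Thread(target=worker)
t.start()

poller = zmq.Poller()
poller.register(work_req, zmq.POLLIN)
poller.register(done, zmq.POLLIN)

handed_out = completed = 0
while completed < 1:
    for sock, _ in poller.poll():      # blocks until a socket is readable
        if sock is work_req:
            work_req.recv()            # a worker asked for work
            work_req.send(b"job-1")
            handed_out += 1
        elif sock is done:
            done.recv()                # a worker finished
            completed += 1

t.join()
work_req.close()
done.close()
```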

5) PAIR per worker

Each worker has its own PAIR connection allowing independent sending of work and receipt of results

Problem:

  • Same problem as REQ/REP with requiring a thread per worker

As much as ZeroMQ is non-blocking/async under the hood, I cannot find a pattern that allows my code to be asynchronous as well, rather than blocking in many dedicated threads or spinning in polling loops in fewer. Is this just not a good use case for ZeroMQ?

Goldfilled answered 14/6, 2015 at 0:12

Your problem is solved with the Load Balancing Pattern in the ZMQ Guide. It's all about flow control whilst also being able to send and receive messages. The producer will only send work requests to idle workers, whilst the workers are able to send and receive other messages at all times, e.g. abort, shutdown, etc.
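
A minimal sketch of that pattern (my own illustration, assuming pyzmq, threads, and an inproc transport; names like `inproc://backend` are invented): workers announce themselves as idle with a READY message on a ROUTER socket, and the producer only ever addresses work to a worker it knows to be idle. Each reply doubles as the next readiness signal.

```python
import threading
import zmq

ctx = zmq.Context.instance()

# The producer's ROUTER socket sees each worker's identity, so it can send
# to a specific worker and knows exactly who each message came from.
backend = ctx.socket(zmq.ROUTER)
backend.bind("inproc://backend")

def worker(name):
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.IDENTITY, name)  # fixed identity for routing
    sock.connect("inproc://backend")
    sock.send(b"READY")                  # announce idleness
    while True:
        job = sock.recv()
        if job == b"STOP":
            break
        sock.send(b"done: " + job)       # reply doubles as the next READY
    sock.close()

threads = [threading.Thread(target=worker, args=(n,)) for n in (b"w1", b"w2")]
for t in threads:
    t.start()

pending = [b"job-%d" % i for i in range(4)]
results = []
stopped = 0
while stopped < len(threads):
    # Each incoming message is the frames [identity, empty delimiter, payload].
    ident, _, msg = backend.recv_multipart()
    if msg != b"READY":
        results.append(msg)              # a completed unit of work
    if pending:
        # This worker is now idle: hand it the next unit of work.
        backend.send_multipart([ident, b"", pending.pop(0)])
    else:
        backend.send_multipart([ident, b"", b"STOP"])
        stopped += 1

for t in threads:
    t.join()
backend.close()
```

The real pattern in the Guide adds a frontend socket and liveness handling; this strips it down to the idleness bookkeeping, which is the part that answers the question.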

Whacky answered 14/6, 2015 at 22:17
Amazing how much reading of that guide you have to do before they let it slip that ROUTER and DEALER finally let you determine which client is sending a message and how to send a message to a particular client. Kind of burying the lead; that really should have been the headline! – Goldfilled
The more you read of the guide, the better off you are, at least through chapter 6 or so. If you're going to be implementing a communication protocol in ZMQ, I highly recommend going through it pretty thoroughly until he starts talking about OSS project governance, whether it's one of the standard protocols in the guide (so you know you're picking the correct one for your scenario) or one you're going to develop yourself. At any rate, the guide builds up slowly to ensure a strong foundation; it's very easy to get your pants around your ankles if you try to rush it. – Key

Push/Pull is your answer.

When you send a message in ZeroMQ, all that happens initially is that it sits in a queue waiting to be delivered to the destination(s). When it has been successfully transferred, it is removed from the queue. The queue is limited in length; the limit can be changed by setting a socket's high water mark.
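
For instance (a sketch assuming pyzmq), the mark is set per socket via ZMQ_SNDHWM / ZMQ_RCVHWM; the library default is 1000 messages, and 0 means unlimited rather than zero:

```python
import zmq

ctx = zmq.Context.instance()
push = ctx.socket(zmq.PUSH)

hwm_default = push.getsockopt(zmq.SNDHWM)  # 1000 messages out of the box
push.setsockopt(zmq.SNDHWM, 1)             # queue at most one outgoing message
hwm_after = push.getsockopt(zmq.SNDHWM)

push.close()
```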

There are one or more background threads that manage all this on your behalf; your calls to the ZeroMQ API simply issue instructions to those threads. The threads at either end of a socket connection collaborate to marshal the transfer of messages, i.e. a sender won't send a message unless the recipient can receive it.

Consider what this means in a push/pull set up. Suppose one of your pull workers is falling behind. It won't then be accepting messages, so messages being sent to it start piling up until the high water mark is reached. ZeroMQ will then no longer send messages to that pull worker. In fact, AFAIK, a pull worker whose queue is fuller than those of its peers will receive fewer messages, so the workload is evened out across all workers.

So What Does That Mean?

Just send the messages. Let 0MQ sort it out for you.

Whilst there's no explicit flag saying 'already busy', if a message can be sent at all then some pull worker somewhere is able to receive it, precisely because it has kept up with the workload. It is therefore best placed to process new messages.

There are limitations. If all the workers are full up then no messages are sent and you get blocked in the push when it tries to send another message. You can discover this only (it seems) by timing how long the zmq_send() took.

Don't Forget the Network

There's also the matter of network bandwidth to consider. Messages queued in the push will transfer at the rate at which they're consumed by the recipients, or at the speed of the network (whichever is slower). If your network is fundamentally too slow, then it's the wrong network for the job.

Latency

Of course, messages piling up in buffers represents latency. This can be restricted by setting the high water mark to be quite low.

This won't cure a high latency problem, but it will allow you to find out that you have one. If you have an inadequate number of pull workers, a low high water mark will result in message sending failing/blocking sooner.

Actually I think in ZeroMQ it blocks for push/pull; you'd have to measure elapsed time in the call to zmq_send() to discover whether things had got bottled up.
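
Rather than timing zmq_send(), there is a non-blocking alternative (a sketch assuming pyzmq; the address is invented): pass zmq.DONTWAIT and the send raises zmq.Again immediately when the socket is in its mute state, i.e. every downstream queue is at its high water mark or, as contrived here, no PULL peer is connected at all.

```python
import zmq

ctx = zmq.Context.instance()
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.bind("inproc://jobs")        # deliberately no PULL worker connected

blocked = False
try:
    push.send(b"work", zmq.DONTWAIT)
except zmq.Again:
    blocked = True                # back-pressure detected without blocking

push.close()
```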

Thought about Nanomsg?

Nanomsg is a reboot of ZeroMQ; one of the same guys is involved. There are many things I prefer about it, and ultimately I think it will replace ZeroMQ. It has some fancier patterns which are more universally usable (PAIR works on all transports, unlike in ZeroMQ). Also, the patterns are essentially a pluggable component in the source code, so it is far simpler for patterns to be developed and integrated than in ZeroMQ. There is a discussion of the differences here

Philosophical Discussion

Actor Model

ZeroMQ is definitely in the realms of Actor Model programming. Messages get stuffed into queues / channels / sockets, and at some undetermined point in time later they emerge at the recipient end to be processed.

The danger of this type of architecture is that it is possible to build in the potential for deadlock without knowing it.

Suppose you have a system where messages pass both ways down a chain of processes, say instructions in one way and results in the other. It is possible that one of the processes will be trying to send a message whilst the recipient is actually also trying to send a message back to it.

That only works so long as the queues aren't full and can (temporarily) absorb the messages, allowing everyone to move on.

But suppose the network briefly became a little busy for some reason, and that delayed message transfer. The message send might then fail because the high water mark had been reached. Whoops! No one is then sending anything to anyone anymore!

CSP

A development of the Actor Model, called Communicating Sequential Processes (CSP), was invented to solve this problem. It has a restriction: there is no buffering of messages at all. No process can complete sending a message until the recipient has received all the data.

The theoretical consequence of this was that it was then possible to mathematically analyse a system design and pronounce it to be free of deadlock. The practical consequence is that if you've built a system that can deadlock, it will do so every time. That's actually not so bad; it'll show up in testing, not post-deployment.

Curiously, this is hinted at in the documentation of Microsoft's Task Parallel Library, where they advocate setting buffer lengths to zero in the interests of achieving a more robust application.

It'd be like setting the ZeroMQ high water mark to zero, except that in zmq_setsockopt() 0 means unlimited, not nought. The smallest queue you can actually request is 1 message.

CSP is much more suited to real time applications. Any shortage of available workers immediately results in an inability to send messages (so your system knows it's failed to keep up with the real time demand) instead of resulting in an increased latency as data is absorbed by sockets, etc. (which is far harder to discover).

Unfortunately almost every communications technology we have (Ethernet, TCP/IP, ZeroMQ, nanomsg, etc) leans towards Actor Model. Everything has some sort of buffer somewhere, be it a packet buffer on a NIC or a socket buffer in an operating system.

Thus, to implement CSP in the real world, one has to implement flow control on top of the existing transports. This takes work, and it's slightly inefficient. But if a system needs it, it's definitely the way to go.

Personally, I'd love to see 0MQ and Nanomsg adopt it as a behavioural option.

Childbirth answered 14/6, 2015 at 6:26
Is there a way of setting that high water mark to 1? From my tests it seems to be 16000 per worker, so 15999 work items can get stuck behind one slow unit of work. Push/pull seems ideal for lots of evenly sized work, but my work ranges from a couple of milliseconds to a couple of minutes. – Goldfilled
A little more info about the use case: the producer is an actor in an Akka cluster (Scala) sending work to pre-spawned Python consumers. The producer and consumers are always local to each other. I'm trying to find the most efficient IPC (preferably non-blocking). – Goldfilled
Certainly in the C binding you can set socket options for the high water mark, and it can be set to 1 message. See api.zeromq.org/4-0:zmq-setsockopt. – Childbirth
ZeroMQ has a zero-copy mode, which will speed things up nicely. See zguide.zeromq.org/php:chapter2. I don't know if it's available in Scala and Python. – Childbirth
Ok, it works in Python and Scala as well. Unfortunately 0 means unlimited and 1 means at most one will get stuck. – Goldfilled
