High-performance replacement for multiprocessing.Queue

My distributed application consists of many producers that push tasks into several FIFO queues, and multiple consumers for every one of these queues. All these components live on a single node, so no networking is involved.

This pattern is perfectly supported by Python's built-in multiprocessing.Queue; however, as I scale up my application, the queue implementation seems to be a bottleneck. I am not sending large amounts of data, so memory sharing does not solve the problem. What I need is fast guaranteed delivery of 10^4-10^5 small messages per second. Each message is about 100 bytes.
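As a rough sanity check on these numbers, here is a minimal sketch that computes the time budget per message and measures the average put+get cost of multiprocessing.Queue inside a single process. The function names are hypothetical, and the measurement ignores contention between producers and consumers, so treat the result as a lower bound rather than a benchmark.

```python
import multiprocessing as mp
import time


def per_message_budget_us(rate_per_sec):
    """Time available per message, in microseconds, at a given message rate."""
    return 1e6 / rate_per_sec


def measure_roundtrip_us(n_messages=2000, payload=b"x" * 100):
    """Average put+get cost per message, in microseconds.

    Everything runs in one process, so lock contention between
    several producers and consumers is NOT measured here.
    """
    q = mp.Queue()
    start = time.perf_counter()
    for _ in range(n_messages):
        q.put(payload)  # pickled and handed to the queue's feeder thread
        q.get()         # blocks until the message has passed through the pipe
    elapsed = time.perf_counter() - start
    q.close()
    q.join_thread()
    return elapsed / n_messages * 1e6


if __name__ == "__main__":
    print("budget at 10^5 msg/s:", per_message_budget_us(1e5), "us")
    print("measured put+get:", round(measure_roundtrip_us(), 1), "us")
```

If the measured cost is already close to the budget in a single process, adding more producers and consumers to the same queue is unlikely to help.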

I am new to the world of fast distributed computing and I am very confused by the sheer number of options. There is RabbitMQ, Redis, Kafka, etc.

ZeroMQ is a more focused and compact alternative, which also has successors such as nanomsg and nng. However, implementing something like a many-to-many queue with guaranteed delivery seems nontrivial without a broker.

I would really appreciate it if someone could point me to a "standard" way of doing something like this with one of the faster frameworks.

Compensatory answered 12/2, 2020 at 21:49 Comment(1)
10^5 messages per second on a single queue is a harsh requirement: the system has only 10 microseconds to fully process a message and be ready for the next one. As your question mentions, you'd better look into distributed computing in order to share the load among multiple nodes, and achieve scalability by adding nodes. But this presupposes that your design does not rely on one single queue that would be the bottleneck...Froward

After trying a few available implementations and frameworks, I still could not find anything that would be suitable for my task. Either too slow or too heavy.

To solve the issue my colleagues and I developed this: https://github.com/alex-petrenko/faster-fifo

faster-fifo is a drop-in replacement for Python's multiprocessing.Queue and is significantly faster. In fact, it is up to 30x faster in the configurations I cared about (many producers, few consumers) because it additionally supports a get_many() method on the consumer side.

It is brokerless, lightweight, supports arbitrary many-to-many configurations, and is implemented for POSIX systems using pthread synchronization primitives.
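To illustrate the consumer-side batching idea, here is a minimal sketch built only on the standard multiprocessing.Queue. The helper name get_batch and its parameters are hypothetical, and this is not how faster-fifo is implemented: the stdlib version still locks and unpickles once per message, so it shows the calling pattern rather than the speedup.

```python
import multiprocessing as mp
from queue import Empty


def get_batch(q, max_messages=100, timeout=1.0):
    """Block for the first message, then take whatever else is already available.

    Raises queue.Empty if nothing arrives within `timeout` seconds.
    """
    batch = [q.get(timeout=timeout)]
    while len(batch) < max_messages:
        try:
            batch.append(q.get_nowait())
        except Empty:
            break  # nothing more is ready right now; return what we have
    return batch


if __name__ == "__main__":
    q = mp.Queue()
    for i in range(10):
        q.put(i)
    received = []
    while len(received) < 10:
        received.extend(get_batch(q, max_messages=4))
    print(received)
```

With many producers and few consumers, handling messages a batch at a time lets each consumer spend less time waiting on the queue and more time on the work itself.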

Compensatory answered 1/6, 2020 at 11:54 Comment(9)
Hi Aleksei, I have tried faster-fifo to replace the pytorch multiprocessing queue. It doesn't seem to work. Any ideas how I can debug this?Clangor
@Paul, faster-fifo is not a replacement for the pytorch Queue class, it's only a replacement for Python's standard multiprocessing.Queue. But as far as I know the only difference is in the object pickling. Torch defines its own pickler for Tensor objects. I believe you can use their pickler with faster-fifo with minimal modificationsCompensatory
I have changed to multiprocessing queue instead of torch.multiprocessing, and the application still works fine.Clangor
What's your problem? What kind of errors are you getting? If you are passing the PyTorch tensors around, they require a custom serializer, and I believe PyTorch registers it somehow for the standard queue.Compensatory
`File "/usr/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 47, in _launch reduction.dump(process_obj, fp) File "/usr/lib/python3.6/multiprocessing/reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <class '__main__.c_ubyte_Array_2'>: attribute lookup c_ubyte_Array_2 on __main__ failed`Clangor
I am sticking with the multiprocessing queue for now, as I am getting the performance I need for inference. I was hoping faster-fifo would be a drop-in replacement without more changes to my code. I still have other fine tuning I can do with my code, like using gpujpeg/libjpeg-turbo.Clangor
Hi @AlekseiPetrenko, does faster-fifo use atomic get-put? How likely is it to deadlock? I read Python's MP Queue has some deadlock issues so I wonder if you addressed it as well. Thanks!Osculation
Hi @user9784065! This queue is not supposed to deadlock. Producers and consumers are synchronized with proper POSIX synchronization primitives.Compensatory
I tried faster-fifo in another project, and the latest version works really well and is faster than the multiprocessing queue.Clangor

I think a lot depends on how much importance you place on individual messages.

If each and every one is vital, and you have to consider what happens to them in the event of some failure somewhere, then frameworks like RabbitMQ can be useful. RabbitMQ has a broker, and it's possible to configure this for some sort of high availability, high reliability mode. With the right queue settings, RabbitMQ will look after your messages up until some part of your system consumes them.

To do all this, RabbitMQ needs a broker. This makes it fairly slow. At one point, though, there was talk about reimplementing RabbitMQ on top of ZeroMQ's underlying protocol (ZMTP), doing away with the broker and implementing all the functionality in the endpoints instead.

In contrast, ZeroMQ does far less to guarantee that, in the event of failures, your messages will actually, eventually, get through to the intended destination. If a process dies, or a network connection fails, then there's a high chance that messages have got lost. More recent versions can be set up to actively monitor connections, so that if a network cable breaks or a process dies somewhere, the endpoints at the other end of the sockets can be informed about this pretty quickly. If one then implements a communicating sequential processes framework on top of ZMQ's actor framework (think message acknowledgements, etc., which will slow it down), you can end up with a system whereby endpoints can know for sure that messages have been transferred to their intended destinations.
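To make the acknowledgement idea concrete, here is a minimal, transport-agnostic sketch of the sender-side bookkeeping. The class AckTracker and its methods are hypothetical names, not part of ZeroMQ or any library; the actual sending, receiving and failure detection are assumed to happen elsewhere.

```python
import itertools


class AckTracker:
    """Sender-side bookkeeping: keep every message until the receiver acknowledges it."""

    def __init__(self):
        self._ids = itertools.count()
        self.pending = {}  # message id -> payload that has not been acknowledged yet

    def wrap(self, payload):
        """Assign an id to the payload and remember it as unacknowledged."""
        msg_id = next(self._ids)
        self.pending[msg_id] = payload
        return (msg_id, payload)

    def ack(self, msg_id):
        """Forget a message once the receiver has confirmed it."""
        self.pending.pop(msg_id, None)  # duplicate or unknown acks are ignored

    def unacked(self):
        """Messages to resend after a detected failure, oldest first."""
        return sorted(self.pending.items())


if __name__ == "__main__":
    tracker = AckTracker()
    first = tracker.wrap(b"task-a")   # send `first` over the transport of your choice
    second = tracker.wrap(b"task-b")
    tracker.ack(first[0])             # the receiver echoed the id back
    print(tracker.unacked())          # only task-b still needs confirmation
```

Resending unacknowledged messages gives at-least-once delivery, so the receiving side has to tolerate duplicates, for example by remembering the ids it has already processed.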

Being brokerless allows ZMQ to be pretty fast. And it's efficient across a number of different transports, ranging from inproc to tcp, all of which can be blended together. If you're not worried about processes crashing or network connections failing, ZMQ guarantees message delivery right out of the box.

So, deciding what it is that's important in your application helps choose what technology you're going to use as part of it - RabbitMQ, ZeroMQ, etc. Once you've decided that, then the problem of "how to get the patterns I need" is reduced to "what patterns does that technology support". RabbitMQ is, AFAIK, purely pub/sub (there can be a lot of each), whereas ZeroMQ has many more.

Unmeriting answered 16/2, 2020 at 10:26 Comment(0)

I have tried Redis Server queuing in order to replace Python's standard multiprocessing Queue. Redis is a NO GO! Python's queue is the best and fastest, and it accepts any kind of data type you throw at it, whereas with Redis and complex data types, such as a dict with lots of numpy arrays, you have to pickle or json dumps/loads, which adds overhead to the process.
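To put a number on that serialization overhead, here is a minimal sketch that times a pickle round trip for one message. The function name and the sample message are made up for illustration. Note that multiprocessing.Queue also pickles every object inside put(), so this cost is not unique to Redis; with Redis you additionally pay for the trip to the server.

```python
import pickle
import time


def pickle_roundtrip_us(obj, repeats=10000):
    """Average time, in microseconds, to pickle and unpickle `obj` once."""
    start = time.perf_counter()
    for _ in range(repeats):
        pickle.loads(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
    return (time.perf_counter() - start) / repeats * 1e6


if __name__ == "__main__":
    # Hypothetical message: a small dict carrying roughly 100 bytes of payload.
    message = {"task_id": 42, "payload": b"x" * 100}
    print("pickle round trip:", round(pickle_roundtrip_us(message), 2), "us")
```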

Cheers, Steve

Perreault answered 9/6, 2021 at 5:43 Comment(0)
