RabbitMQ: how to limit consuming rate

I need to limit the rate at which messages are consumed from a RabbitMQ queue.

I have found many suggestions, but most of them propose using the prefetch option. That doesn't do what I need: even with prefetch set to 1, the rate is about 6000 messages/sec, which is far too many for the consumer.

I need to limit the rate to, say, 70 to 200 messages per second, i.e. consuming one message every 5-14 ms, with no simultaneous messages.

I'm using Node.js with the amqp.node library.

Karyoplasm answered 24/3, 2015 at 6:54 Comment(3)
Look at this answer: https://mcmap.net/q/443937/-rabbitmq-how-to-throttle-the-consumer – Keith
I'd assume that you are using the prefetch count in combination with message acks; otherwise the prefetch count is meaningless. – Rumple
Yep, I've already found a solution. I use the nanotimer module from npm to calculate delays. I compute delay = 1 / [messages_per_second] in nanoseconds, consume with prefetch = 1, compute the actual delay as delay - [processing_message_time], and wait that long before sending the ack for the message. It works perfectly. Thanks to all. – Karyoplasm

I've already found a solution.

I use the nanotimer module from npm to calculate delays.

First I compute delay = 1 / [messages_per_second], in nanoseconds.

Then I consume messages with prefetch = 1.

Then I compute the actual delay as delay - [processing_message_time].

Finally I wait for that actual delay before sending the ack for the message.

It works perfectly. Thanks to all.
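The delay arithmetic in the steps above can be sketched in plain JavaScript (a minimal sketch; `ackDelayMs`, `messagesPerSecond`, and `processingTimeMs` are illustrative names, and milliseconds are used here where the answer uses nanotimer's nanoseconds):

```javascript
// Sketch of the delay calculation described above (assumes prefetch = 1).
function ackDelayMs(messagesPerSecond, processingTimeMs) {
  const targetDelayMs = 1000 / messagesPerSecond; // e.g. 100 msg/s -> 10 ms between acks
  // Subtract the time already spent processing; never wait a negative amount.
  return Math.max(0, targetDelayMs - processingTimeMs);
}
```

Waiting this long before calling `channel.ack` (with prefetch = 1) caps the effective consuming rate at roughly `messagesPerSecond`.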

Karyoplasm answered 2/4, 2015 at 14:26 Comment(1)
This will work fine as long as you only have one consumer; one of the big selling points of RabbitMQ, of course, is the ability to scale up to many consumers. – Inextricable

Implementing a token bucket might help: https://en.wikipedia.org/wiki/Token_bucket

You can write a producer that publishes to a "token bucket queue" at a fixed rate, with a TTL on each message (perhaps expiring after a second?) or with a maximum queue length equal to your per-second rate. Consumers that receive a message from the "normal queue" must also receive a message from the "token bucket queue" before processing it, which effectively rate-limits the application.

Node.js + amqplib example:

var queueName = 'my_token_bucket';
rabbitChannel.assertQueue(queueName, {durable: true, messageTtl: 1000, maxLength: bucket.ratePerSecond});
writeToken();

function writeToken() {
    // Buffer.from replaces the deprecated new Buffer(...) constructor
    rabbitChannel.sendToQueue(queueName, Buffer.from(new Date().toISOString()), {persistent: true});
    setTimeout(writeToken, 1000 / bucket.ratePerSecond);
}
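The queue in this answer plays the role of a classic token bucket; for comparison, the same rate-limiting logic can be sketched as a plain in-memory bucket (an illustration only, not part of the answer's queue-based setup; `TokenBucket` and its parameters are made-up names):

```javascript
// Classic in-memory token bucket: refill continuously at ratePerSecond,
// allow a message only while at least one whole token is available.
class TokenBucket {
  constructor(ratePerSecond, capacity) {
    this.ratePerSecond = ratePerSecond;
    this.capacity = capacity;
    this.tokens = capacity;       // start full
    this.lastRefill = Date.now();
  }

  tryConsume() {
    const now = Date.now();
    const elapsedSec = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.ratePerSecond);
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;                // token available: process the message
    }
    return false;                 // bucket empty: skip or requeue
  }
}
```

The queue-based version above distributes exactly this logic across many consumers, with RabbitMQ's `messageTtl` and `maxLength` doing the expiry and capacity bookkeeping.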
Inextricable answered 25/7, 2016 at 19:31 Comment(7)
Great suggestion for a dedicated token bucket queue, thank you! – Ectomere
No problem, it is working great for me right now with minimal work required! – Inextricable
May I ask where you are producing the token bucket messages? Implementing the producing process is trivial, of course, but it feels less than optimal from both a performance and a reliability point of view. I'm trying to find some way to generate messages at a constant rate on the RabbitMQ side (maybe a special exchange type), but I don't see anything readily available. – Ectomere
@MichaelKorbakov writing your own on the TCP socket level would definitely be more efficient, but lots more could go wrong. This works fine for me (in Node.js). – Inextricable
This is an elegant solution to a vexing problem. Thank you for the suggestion! – Tuberculin
Great idea @JohnCulviner, that helped me a lot. Can you also explain how you consume the tokens on your workers? I thought I would always set prefetch to, for instance, 5, so that I always have 5 tokens in spare and get new ones once I acknowledge these. "Unfortunately" I constantly get new tokens once they expire, so I assume you simply check whether you can use a token once you receive it, instead of keeping it idle for the next second, right? – Mazurka
@Mazurka this was a while ago, but I don't think I had any issue with getting expired tokens, due to the messageTtl and maxLength settings. If you have that issue, maybe check whether the message has expired when you get it. This was in Node.js; perhaps that could be a difference too. – Inextricable

See 'Fair Dispatch' in RabbitMQ Documentation.

For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

To defeat that we can use the prefetch method with a value of 1. This tells RabbitMQ not to give more than one message to a worker at a time; in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, RabbitMQ will dispatch it to the next worker that is not busy.
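With amqplib this boils down to calling `prefetch(1)` before `consume` and acking only after the work is done. A minimal sketch (the `startFairConsumer` name and the `handler` callback are assumptions for illustration, not library API):

```javascript
// Fair dispatch sketch: at most one unacknowledged message per worker.
// `channel` is assumed to come from amqplib's connection.createChannel().
async function startFairConsumer(channel, queueName, handler) {
  await channel.prefetch(1); // no new message until the previous one is acked
  await channel.consume(queueName, async (msg) => {
    await handler(msg);      // do the (possibly slow) work first
    channel.ack(msg);        // ack afterwards, so RabbitMQ hands over the next message
  });
}
```

Note this equalizes load between workers but does not by itself cap the rate; a busy worker still gets the next message immediately after acking.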

Colorimeter answered 23/5, 2017 at 21:20 Comment(0)

I don't think RabbitMQ can provide this feature out of the box.

If you have only one consumer, the whole thing is pretty easy: just let it sleep between consuming messages.

If you have multiple consumers, I would recommend using some "shared memory" to keep the rate. For example, you might have 10 consumers consuming messages. To keep a rate of 70-200 messages per second across all of them, each consumer makes a call to Redis to check whether it is eligible to process a message; if yes, it updates Redis to show the other consumers that a message is currently being processed.
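A sketch of that shared check, assuming a Redis-like client exposing async `incr`/`expire` (the `tryAcquire` name and the key scheme are made up for illustration, not the answer's code):

```javascript
// Shared fixed-window counter: all consumers INCR the same per-second key,
// and only the first `limitPerSecond` increments in each window are eligible.
async function tryAcquire(store, limitPerSecond, nowMs = Date.now()) {
  const key = 'rate:' + Math.floor(nowMs / 1000); // one shared counter per second
  const count = await store.incr(key);            // atomic across all consumers
  if (count === 1) await store.expire(key, 2);    // let stale counters disappear
  return count <= limitPerSecond;                 // eligible only while under the limit
}
```

A consumer that gets `false` back would delay or requeue the message instead of processing it.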

If you have no control over the consumer, implement option 1 or 2 and publish the messages back to Rabbit. This way the original consumer will consume messages at the desired pace.

Reef answered 24/3, 2015 at 13:5 Comment(0)

This is how I fixed mine, with just setTimeout.

I set mine to consume a message every 200 ms, which consumes 5 messages per second. Mine does an update if the record exists.

channel.consume(transactionQueueName, async (data) => {
    const dataNew = JSON.parse(data.content);
    const processedTransaction = await seperateATransaction(dataNew);
    // Delay the ack to avoid duplicate entries (important: don't remove the setTimeout)
    setTimeout(function () {
        channel.ack(data);
    }, 200);
});

Done

Lalla answered 7/1, 2021 at 16:31 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.