What's a good rate limiting algorithm?
O

12

179

I could use some pseudo-code, or better, Python. I am trying to implement a rate-limiting queue for a Python IRC bot, and it partially works, but if someone triggers fewer messages than the limit (e.g., the rate limit is 5 messages per 8 seconds, and the person triggers only 4), and the next trigger comes after the 8 seconds (e.g., 16 seconds later), the bot sends the message, but the queue becomes full and the bot waits 8 seconds, even though that's not needed since the 8-second period has lapsed.

Overcritical answered 20/3, 2009 at 19:2 Comment(0)
C
268

Here is the simplest algorithm, if you just want to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

There are no data structures, timers etc. in this solution and it works cleanly :) To see this, note that 'allowance' grows at a speed of at most 5/8 units per second, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can't send more than five messages per eight seconds.

Note that rate should be an integer, i.e. without non-zero decimal part, or the algorithm won't work correctly (actual rate will not be rate/per). E.g. rate=0.5; per=1.0; does not work because allowance will never grow to 1.0. But rate=1.0; per=2.0; works fine.
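Since the question asks for Python, here is a minimal sketch of the same pseudo-code as a class. The names `TokenBucket`, `allow` and the injectable `clock` parameter are my own assumptions, not part of the answer; the clock is only there so the logic can be exercised without real waiting.

```python
import time


class TokenBucket:
    """Drop-style limiter: allow() returns False when a message should be discarded."""

    def __init__(self, rate, per, clock=time.monotonic):
        self.rate = float(rate)      # bucket capacity, in messages
        self.per = float(per)        # period, in seconds
        self.clock = clock           # hypothetical hook: lets a test supply a fake clock
        self.allowance = self.rate   # start with a full bucket
        self.last_check = clock()

    def allow(self):
        current = self.clock()
        time_passed = current - self.last_check
        self.last_check = current
        # Refill in proportion to elapsed time, capped at the bucket size (the "throttle" step).
        self.allowance = min(self.rate, self.allowance + time_passed * (self.rate / self.per))
        if self.allowance < 1.0:
            return False             # discard_message()
        self.allowance -= 1.0
        return True                  # forward_message()


# The scenario from the question, driven by a fake clock instead of real time.
fake_now = [0.0]
bucket = TokenBucket(rate=5, per=8, clock=lambda: fake_now[0])
first_four = [bucket.allow() for _ in range(4)]   # 4 messages at t = 0
fake_now[0] = 16.0                                # 16 seconds of silence
after_idle = [bucket.allow() for _ in range(6)]   # bucket is back at 5, so only the 6th is dropped
print(first_four, after_idle)
```

In a real bot you would leave `clock` at its default and call `allow()` once per incoming trigger.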

Consume answered 20/3, 2009 at 23:15 Comment(12)
It's also worth pointing out that the dimension and scale of 'time_passed' must be the same as 'per', e.g. seconds.Greening
That is a standard algorithm: it's a token bucket, without queue. The bucket is allowance. The bucket size is rate. The allowance += … line is an optimization of adding a token every per ÷ rate seconds.Shaina
This allows large bursts of messages after some idle time. In this case, after 30 min of idling, even a burst of 1000 messages at once would be accepted.Ignoramus
@Ignoramus What you write above is not true. 'Allowance' is always capped by 'rate' (look at the "// throttle" line) so it will only allow a burst of exactly 'rate' messages at any particular time, i.e. 5.Consume
This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8 second periodPittman
@mdkess Your observation is correct - if first no messages are sent, allowance grows to 5 (also at the beginning), then you can send all 5 in a burst, and after 8/5 seconds you are allowed to send a new one. I wouldn't say it exceeds the set rate in any practical manner, though, at least not in the context of the original question, because the right to send the burst of 5 messages has to be earned by underutilizing the allowed rate right before.Consume
@Pittman I think we can see it this way: let the end time of first 5 msgs be the end time of first 8 sec period, so that after we received first 5 msgs, we started a brand new 8 sec period, and in this period, at first, we can't handle any msgs because allowance is less than 1, but allowance inc at rate of 5/8 units per sec. So, by this standard, this algorithm successfully did its job.Thuthucydides
@AnttiHuima, can't quite get why rate can't be a fractional decimal. I would say it cannot be < 1.0. Otherwise, (e.g. 2.5) should work ok.Prognosticate
I don't understand how this can work. I tested with a list that contains 100 Elements. Looping over it and set time_passed fix to 0.1, rate=5, per=1. This leads to 9 processed elements, 10th element discarded, 11th processed, 12th discarded and so on.Pannonia
This is the classic "leaky bucket" algorithm; it can't be improved much. @Greening it's possible that he rediscovered it and that this is all his own work, I did.Exostosis
@Pittman you can consider first timeframe as the timeframe which is ending at 0, afterwards algorithm is considering rate/per message only.Thorley
As @Pittman correctly observed, the Token Bucket algo doesn't completely protect against bursts. If an accurate solution is needed for the cost of increased complexity and reduced efficiency (both in space and time), a Sliding Window would be the approach of choice.Cade
P
52

Use this decorator @RateLimited(ratepersec) before your function that enqueues.

Basically, this checks if 1/rate secs have passed since the last time and if not, waits the remainder of the time, otherwise it doesn't wait. This effectively limits you to rate/sec. The decorator can be applied to any function you want rate-limited.

In your case, if you want a maximum of 5 messages per 8 seconds, use @RateLimited(0.625) before your sendToQueue function.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        # A one-element list lets the inner function update the value it closes over.
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kargs):
            # time.time() is wall-clock time; time.clock() measured CPU time on
            # Unix and was removed in Python 3.8.
            elapsed = time.time() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.time()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print(num)

if __name__ == "__main__":
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)
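The comments on this answer raise thread safety. One possible way to handle it, as a sketch only: guard the timestamp with a `threading.Lock`. The name `rate_limited` and the `clock` and `sleep` parameters are assumptions of mine, added so the timing logic can be checked without actually sleeping; `send_to_queue` is a placeholder.

```python
import functools
import threading
import time


def rate_limited(max_per_second, clock=time.monotonic, sleep=time.sleep):
    """Decorator sketch: space out calls by at least 1/max_per_second seconds."""
    min_interval = 1.0 / max_per_second

    def decorate(func):
        lock = threading.Lock()
        last_called = None  # None means "never called", so the first call never waits

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_called
            # Callers queue up on the lock, so each one waits its own turn.
            with lock:
                if last_called is not None:
                    left_to_wait = min_interval - (clock() - last_called)
                    if left_to_wait > 0:
                        sleep(left_to_wait)
                last_called = clock()
            # The wrapped function runs outside the lock, so slow calls can overlap.
            return func(*args, **kwargs)

        return wrapper

    return decorate


@rate_limited(5 / 8)  # 5 messages per 8 seconds, evenly spaced
def send_to_queue(message):
    print(message)
```

Unlike the original, this measures the interval between call starts rather than from the end of the previous call; that is a design choice, not a requirement.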
Probity answered 20/3, 2009 at 19:51 Comment(8)
I like the idea of using a decorator for this purpose. Why is lastTimeCalled a list? Also, I doubt this'll work when multiple threads are calling the same RateLimited function...Ameba
It's a list because simple types like float are constant when captured by a closure. By making it a list, the list is constant, but its contents are not. Yes, it's not thread-safe but that can be easily fixed with locks.Probity
time.clock() doesn't have enough resolution on my system, so I adapted the code and changed to use time.time()Cheke
For rate limiting, you definitely do not want to use time.clock(), which measures elapsed CPU time. CPU time can run much faster or much slower than "actual" time. You want to use time.time() instead, which measures wall time ("actual" time).Chromonema
BTW for real production systems: implementing a rate limiting with a sleep() call might not be a good idea as it is going to block the thread and therefore preventing another client from using it.Goosefish
Production web system yes. But if it's running as a job it's probably OK as long as you're not blocking the database (commit!) and it's not running out of workers.Kennet
When updating lastTimeCalled, it should be protected by a Lock, right?Melisamelisande
If I have a limit of 1000 requests per 1000 milliseconds, and I have 10 requests incoming, this version will take 10ms to execute all the requests. the token bucket version will allow the 10 requests all begin at the first millisecond.Battery
S
30

A Token Bucket is fairly simple to implement.

Start with a bucket with 5 tokens.

Every 8/5 seconds: If the bucket has fewer than 5 tokens, add one.

Each time you want to send a message: If the bucket has ≥1 token, take one token out and send the message. Otherwise, wait/drop the message/whatever.

(Obviously, in actual code, you'd use an integer counter instead of real tokens, and you can optimize out the every-8/5-seconds step by storing timestamps.)


Reading the question again, if the rate limit is fully reset each 8 seconds, then here is a modification:

Start with a timestamp, last_send, at a time long ago (e.g., at the epoch). Also, start with the same 5-token bucket.

Strike the periodic add-one-token rule.

Each time you send a message: First, check if last_send ≥ 8 seconds ago. If so, fill the bucket (set it to 5 tokens). Second, if there are tokens in the bucket, send the message (otherwise, drop/wait/etc.). Third, set last_send to now.

That should work for that scenario.
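The modified rules can be sketched in Python roughly as follows. The class and parameter names are hypothetical, and I am assuming that last_send is only updated when a message is actually sent; the description above leaves that open.

```python
import time


class ResettingBucket:
    """Bucket that is refilled completely once nothing has been sent for `window` seconds."""

    def __init__(self, burst=5, window=8.0, clock=time.monotonic):
        self.burst = burst
        self.window = window
        self.clock = clock        # hypothetical hook so a test can supply a fake clock
        self.tokens = burst
        self.last_send = None     # stands in for "a time long ago"

    def try_send(self):
        now = self.clock()
        # First: after a quiet period of at least `window` seconds, fill the bucket.
        if self.last_send is None or now - self.last_send >= self.window:
            self.tokens = self.burst
        # Second: send only if a token is available.
        if self.tokens < 1:
            return False          # caller drops or queues the message
        self.tokens -= 1
        # Third: remember when we last sent (assumption: only on an actual send).
        self.last_send = now
        return True


# The question's scenario with a fake clock: 4 messages, a 16 s pause, then more.
fake_now = [0.0]
limiter = ResettingBucket(clock=lambda: fake_now[0])
before_pause = [limiter.try_send() for _ in range(4)]
fake_now[0] = 16.0
after_pause = [limiter.try_send() for _ in range(6)]
print(before_pause, after_pause)
```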


I've actually written an IRC bot using a strategy like this (the first approach). It's in Perl, not Python, but here is some code to illustrate:

The first part here handles adding tokens to the bucket. You can see the optimization of adding tokens based on time (2nd to last line) and then the last line clamps bucket contents to the maximum (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn is a data structure which is passed around. This is inside a method that runs routinely (it calculates when it will next have something to do, and sleeps either that long or until it gets network traffic). The next part of the method handles sending. It is rather complicated, because messages have priorities associated with them.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

That's the first queue, which is run no matter what. Even if it gets our connection killed for flooding. Used for extremely important things, like responding to the server's PING. Next, the rest of the queues:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it'll have more work)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

As you can see, the actual bucket handling code is very small — about four lines. The rest of the code is priority queue handling. The bot has priority queues so that e.g., someone chatting with it can't prevent it from doing its important kick/ban duties.

Shaina answered 20/3, 2009 at 19:4 Comment(3)
Am I missing something... it looks like this would limit you to 1 message every 8 seconds after you get through the first 5Ostium
@chills42: Yes, I read the question wrong... see the second half of the answer.Shaina
@chills: If last_send is <8 seconds, you don't add any tokens to the bucket. If your bucket contains tokens, you can send the message; otherwise you can't (you've already sent 5 messages in the last 8 secs)Shaina
B
12

To block processing until the message can be sent, thus queuing up further messages, antti's beautiful solution may also be modified like this:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
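  if (allowance < 1.0):
    wait = (1.0 - allowance) * (per / rate);
    time.sleep(wait);
    last_check += wait; // the sleep paid for this message; don't credit it again next time
    forward_message();
    allowance = 0.0;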
  else:
    forward_message();
    allowance -= 1.0;

It just waits until there is enough allowance to send the message. To avoid starting at twice the rate, allowance may also be initialized to 0.
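A Python sketch of this blocking variant, under a few assumptions: the class name, the `wait` method and the `clock`/`sleep` parameters are mine, and the sketch moves last_check forward by the time slept, as one of the comments on this answer recommends, so the sleep is not counted as earned allowance a second time.

```python
import time


class BlockingTokenBucket:
    """wait() returns once a message may be sent, sleeping first if necessary."""

    def __init__(self, rate, per, clock=time.monotonic, sleep=time.sleep):
        self.rate = float(rate)
        self.per = float(per)
        self.clock = clock            # hypothetical hooks so the timing can be
        self.sleep = sleep            # tested with a fake clock and fake sleep
        self.allowance = self.rate
        self.last_check = clock()

    def wait(self):
        """Block until sending is allowed; return the number of seconds slept."""
        current = self.clock()
        earned = (current - self.last_check) * (self.rate / self.per)
        self.allowance = min(self.rate, self.allowance + earned)
        self.last_check = current
        if self.allowance >= 1.0:
            self.allowance -= 1.0
            return 0.0
        # Sleep just long enough to earn the missing fraction of one message.
        delay = (1.0 - self.allowance) * (self.per / self.rate)
        self.sleep(delay)
        # The sleep bought exactly the token being spent now, so advance
        # last_check past it and leave the bucket empty.
        self.last_check = current + delay
        self.allowance = 0.0
        return delay
```

Typical use would be `bucket.wait()` immediately before each send.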

Bush answered 20/6, 2011 at 17:39 Comment(1)
When you sleep (1-allowance) * (per/rate), you need to add that same amount to last_check.Varix
C
3

One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.

This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.
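One way to read this in code is a sliding log of send timestamps. The following is only a sketch: the class name, the method name and the `clock` parameter are assumptions, and it tracks timestamps of sent items rather than the queue items themselves.

```python
from collections import deque
import time


class SlidingWindowLimiter:
    """Allow at most `limit` sends; a slot frees up once its timestamp is `window` seconds old."""

    def __init__(self, limit=5, window=8.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock        # hypothetical hook so a test can supply a fake clock
        self.sent = deque()       # timestamps of recent sends, oldest first

    def try_add(self):
        now = self.clock()
        # Discard timestamps that have aged out of the window.
        while self.sent and now - self.sent[0] >= self.window:
            self.sent.popleft()
        if len(self.sent) >= self.limit:
            return False          # "queue" is full: reject this addition
        self.sent.append(now)
        return True
```

Compared with the token bucket, this costs one stored timestamp per allowed message, but a sixth send is refused until the oldest of the last five is at least `window` seconds old.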

Castellatus answered 20/3, 2009 at 19:7 Comment(0)
G
2

Keep the times at which the last five lines were sent. Hold the queued messages until the fifth-most-recent send (if it exists) is at least 8 seconds in the past (with last_five as a list of times, most recent first):

import time

now = time.time()
# last_five[-1] is the oldest stored send; fewer than five entries means there is still room.
if len(last_five) < 5 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
Grating answered 20/3, 2009 at 19:18 Comment(6)
You're storing five time stamps and repeatedly shifting them through memory (or doing linked list operations). I'm storing one integer counter and one timestamp. And only doing arithmetic and assign.Shaina
Except that mine will function better if trying to send 5 lines but only 3 more are allowed in the time period. Yours will allow sending the first three, and force a 8 second wait before sending 4 and 5. Mine will allow 4 and 5 to be sent 8 seconds after the fourth- and fifth-most-recent lines.Grating
But on the subject, performance could be improved through using a circular linked list of length 5, pointing to the fifth-most-recent send, overwriting it on new send, and moving the pointer forward one.Grating
for an irc bot with a rate limiter speed is not an issue. i prefer the list solution as it is more readable. the bucket answer thats been given is confusing because of the revision, but there is nothing wrong with it either.Castellatus
@Pesto: That's true, the burst-iness does differ. Easy enough to get either behavior from either approach. Which behavior is wanted depends on how the server implements its flood limiting.Shaina
Not that performance is an issue, but if there's a niggling theoretical worry about the O(n) time for insert(0, now) then use a collections.deque instead of a list and change "insert(0, " to "appendleft(".Fiend
Y
2

If someone is still interested, I use this simple callable class in conjunction with a timed LRU key-value store to limit the request rate per IP. It uses a deque, but can be rewritten to use a list instead.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
Yusem answered 10/6, 2013 at 19:23 Comment(0)
W
1

Just a Python implementation of the code from the accepted answer.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check
        scope.last_check = current
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if scope.allowance > rate:
            scope.allowance = rate
        if scope.allowance < 1:
            pass  # over the limit: the call is dropped
        else:
            fn()
            scope.allowance = scope.allowance - 1
    return throttler
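A possible usage example, as a sketch. To keep the snippet runnable on its own, the function is restated in compact form using `nonlocal` instead of the `Object` holder (my substitution); the `sent` list and the lambda are placeholders for whatever actually sends a message.

```python
import time


def get_throttler(rate, per):
    # Compact restatement of the function above; `nonlocal` replaces the Object holder.
    allowance = float(rate)
    last_check = time.time()

    def throttler(fn):
        nonlocal allowance, last_check
        current = time.time()
        allowance = min(float(rate), allowance + (current - last_check) * (rate / per))
        last_check = current
        if allowance >= 1:
            fn()
            allowance -= 1

    return throttler


sent = []
throttle = get_throttler(rate=5, per=8)   # one throttler per rate-limited resource
for i in range(7):
    # i=i binds the current value; calls beyond the allowance are silently dropped.
    throttle(lambda i=i: sent.append(i))
print(sent)  # only the first five callbacks run if the loop finishes within 1.6 s
```

Because dropped calls give no feedback, a caller that needs to know whether `fn` ran would have to return a flag from `throttler`.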
Woosley answered 20/10, 2016 at 9:52 Comment(1)
It has been suggested to me that I suggest you to add a usage example of your code.Morose
M
0

How about this:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}
Mohammadmohammed answered 1/6, 2009 at 22:58 Comment(0)
N
0

I needed a variation in Scala. Here it is:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Here is how it can be used:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
Nominal answered 14/10, 2016 at 3:39 Comment(0)
F
0

yet another solution

from collections import deque
from datetime import datetime, timedelta
from time import sleep

class RateLimiter:
    def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
        self.items = items
        self.per = per
        self.deque = deque(maxlen=items)

    def count(self):
        now = datetime.now()
        self.deque.append(now)

    def time_to_wait(self) -> timedelta:
        if len(self.deque) < self.deque.maxlen:
            return timedelta(0)
        now = datetime.now()
        per = now - self.deque[0]
        return max(timedelta(0), self.per - per)

    def throttle(self):
        sleep(self.time_to_wait().total_seconds())
        self.count()

if __name__ == '__main__':
    rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))

    for i in range(10):
        rate_limiter.throttle()
        print(f'{i}')
Feuillant answered 9/1, 2023 at 12:55 Comment(0)
B
0

Java syntax. The main idea: don't count iterations, count the time between calls (the "leap time"). Remember the last leap time and wait just long enough that the rate is not exceeded for the next leap.

public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
    long targetLeapTime = 1_000_000_000 / rate;
    rateLock.lock();
    try {
        long timeSnapshot = System.nanoTime();
        long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
        if (waitTime > 0) {

            LockSupport.parkNanos(waitTime);

            leapTime.set(timeSnapshot + waitTime);
        } else {
            leapTime.set(timeSnapshot);
        }
    } finally {
        rateLock.unlock();
    }
}
Bilbrey answered 26/5, 2023 at 11:34 Comment(0)
