I could use some pseudo-code, or better, Python. I am trying to implement a rate-limiting queue for a Python IRC bot, and it partially works, but if someone triggers less messages than the limit (e.g., rate limit is 5 messages per 8 seconds, and the person triggers only 4), and the next trigger is over the 8 seconds (e.g., 16 seconds later), the bot sends the message, but the queue becomes full and the bot waits 8 seconds, even though it's not needed since the 8 second period has lapsed.
Here the simplest algorithm, if you want just to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
There are no datastructures, timers etc. in this solution and it works cleanly :) To see this, 'allowance' grows at speed 5/8 units per seconds at most, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can't send more than five messages per every eight seconds.
Note that rate
should be an integer, i.e. without non-zero decimal part, or the algorithm won't work correctly (actual rate will not be rate/per
). E.g. rate=0.5; per=1.0;
does not work because allowance
will never grow to 1.0. But rate=1.0; per=2.0;
works fine.
allowance
. The bucket size is rate
. The allowance += …
line is an optimization of adding a token every rate ÷ per seconds. –
Shaina allowance
is less than 1, but allowance
inc at rate of 5/8 units per sec. So, by this standard, this algorithm successfully did its job. –
Thuthucydides Use this decorator @RateLimited(ratepersec) before your function that enqueues.
Basically, this checks if 1/rate secs have passed since the last time and if not, waits the remainder of the time, otherwise it doesn't wait. This effectively limits you to rate/sec. The decorator can be applied to any function you want rate-limited.
In your case, if you want a maximum of 5 messages per 8 seconds, use @RateLimited(0.625) before your sendToQueue function.
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
time.clock()
doesn't have enough resolution on my system, so I adapted the code and changed to use time.time()
–
Cheke time.clock()
, which measures elapsed CPU time. CPU time can run much faster or much slower than "actual" time. You want to use time.time()
instead, which measures wall time ("actual" time). –
Chromonema A Token Bucket is fairly simple to implement.
Start with a bucket with 5 tokens.
Every 5/8 seconds: If the bucket has less than 5 tokens, add one.
Each time you want to send a message: If the bucket has ≥1 token, take one token out and send the message. Otherwise, wait/drop the message/whatever.
(obviously, in actual code, you'd use an integer counter instead of real tokens and you can optimize out the every 5/8s step by storing timestamps)
Reading the question again, if the rate limit is fully reset each 8 seconds, then here is a modification:
Start with a timestamp, last_send
, at a time long ago (e.g., at the epoch). Also, start with the same 5-token bucket.
Strike the every 5/8 seconds rule.
Each time you send a message: First, check if last_send
≥ 8 seconds ago. If so, fill the bucket (set it to 5 tokens). Second, if there are tokens in the bucket, send the message (otherwise, drop/wait/etc.). Third, set last_send
to now.
That should work for that scenario.
I've actually written an IRC bot using a strategy like this (the first approach). Its in Perl, not Python, but here is some code to illustrate:
The first part here handles adding tokens to the bucket. You can see the optimization of adding tokens based on time (2nd to last line) and then the last line clamps bucket contents to the maximum (MESSAGE_BURST)
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$conn is a data structure which is passed around. This is inside a method that runs routinely (it calculates when the next time it'll have something to do, and sleeps either that long or until it gets network traffic). The next part of the method handles sending. It is rather complicated, because messages have priorities associated with them.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
That's the first queue, which is run no matter what. Even if it gets our connection killed for flooding. Used for extremely important things, like responding to the server's PING. Next, the rest of the queues:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it'll have more work)
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
As you can see, the actual bucket handling code is very small — about four lines. The rest of the code is priority queue handling. The bot has priority queues so that e.g., someone chatting with it can't prevent it from doing its important kick/ban duties.
to block processing until the message can be sent, thus queuing up further messages, antti's beautiful solution may also be modified like this:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
it just waits until enough allowance is there to send the message. to not start with two times the rate, allowance may also initialized with 0.
(1-allowance) * (per/rate)
, you need to add that same amount to last_check
. –
Varix One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.
This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.
Keep the time that the last five lines were sent. Hold the queued messages until the time the fifth-most-recent message (if it exists) is a least 8 seconds in the past (with last_five as an array of times):
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
If someone still interested, I use this simple callable class in conjunction with a timed LRU key value storage to limit request rate per IP. Uses a deque, but can rewritten to be used with a list instead.
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
Just a python implementation of a code from accepted answer.
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
How about this:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
I needed a variation in Scala. Here it is:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
Here is how it can be used:
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}
yet another solution
from collections import deque
from datetime import timedelta
from time import sleep
class RateLimiter:
def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
self.items = items
self.per = per
self.deque = deque(maxlen=items)
def count(self):
now = datetime.now()
self.deque.append(now)
def time_to_wait(self) -> timedelta:
if len(self.deque) < self.deque.maxlen:
return timedelta(0)
now = datetime.now()
per = now - self.deque[0]
return max(timedelta(0), self.per - per)
def throttle(self):
sleep(self.time_to_wait().total_seconds())
self.count()
if __name__ == '__main__':
rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))
for i in range(10):
rate_limiter.throttle()
print(f'{i}')
java syntax, prime idea: don't count iterations, count leap time. Remember last leap time, wait for the time needed not to exceed the rate for the leap
public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
long targetLeapTime = 1_000_000_000 / rate;
rateLock.lock();
try {
long timeSnapshot = nanoTime();
long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
if (waitTime > 0) {
LockSupport.parkNanos(waitTime);
leapTime.set(timeSnapshot + waitTime);
} else {
leapTime.set(timeSnapshot);
}
} finally {
rateLock.unlock();
}
}
© 2022 - 2024 — McMap. All rights reserved.