Python API Rate Limiting - How to Limit API Calls Globally

I'm trying to limit the rate of API calls in my code. I found a nice Python library, ratelimiter==1.0.2.post0: https://pypi.python.org/pypi/ratelimiter

However, this library can only limit the rate in a local scope, i.e. within a single function or loop:

from ratelimiter import RateLimiter


# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
    pass


# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)

for i in range(100):
    with rate_limiter:
        do_something()

Because I have several functions in different places that make API calls, I want to limit the calls in a global scope.

For example, suppose I want to limit API calls to one per second, and I have two functions, x and y, each of which makes an API call.

@rate(...)
def x():
   ...

@rate(...)
def y():
   ...

By decorating the functions with the limiter, I can limit the rate of each function individually.

However, if I execute the two functions sequentially, the count of API calls is lost in the global scope because the two limiters are unaware of each other. So y will be called right after x finishes, without waiting another second, which violates the one-call-per-second restriction.

Is there any way or library that I can use to limit the rate globally in Python?

Thistly answered 22/11, 2016 at 18:3 Comment(0)

In the end, I implemented my own Throttler class. By routing every API request through the request method, we can keep track of all API requests. Because the API call is passed to request as a function, the class can also cache the result to reduce the number of API calls.

import datetime
import logging
import time


class TooManyRequestsError(Exception):
    def __str__(self):
        return "More than 30 requests have been made in the last five seconds."


class Throttler(object):
    cache = {}

    def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
        # Dict of max number of requests of the API rate limit for each source
        self.max_rate = max_rate
        # Dict of duration of the API rate limit for each source
        self.window = window
        # Whether to throw an error (when True) if the limit is reached, or wait until another request
        self.throttle_stop = throttle_stop
        # The time, in seconds, for which to cache a response
        self.cache_age = cache_age
        # Initialization
        self.next_reset_at = dict()
        self.num_requests = dict()

        now = datetime.datetime.now()
        for source in self.max_rate:
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
            self.num_requests[source] = 0

    def request(self, source, method, do_cache=False):
        now = datetime.datetime.now()

        # if cache exists, no need to make api call
        key = source + method.__name__  # func_name exists only in Python 2
        if do_cache and key in self.cache:
            timestamp, data = self.cache.get(key)
            logging.info('{} exists in cache @ {}'.format(key, timestamp))

            # total_seconds() also counts whole days, unlike .seconds
            if (now - timestamp).total_seconds() < self.cache_age:
                logging.info('retrieved cache for {}'.format(key))
                return data

        # <--- MAKE API CALLS ---> #

        # reset the count if the period passed
        if now > self.next_reset_at.get(source):
            self.num_requests[source] = 0
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

        # throttle request
        def halt(wait_time):
            if self.throttle_stop:
                raise TooManyRequestsError()
            else:
                # Wait the required time, plus a bit of extra padding time.
                time.sleep(wait_time + 0.1)

        # if exceed max rate, need to wait
        if self.num_requests.get(source) >= self.max_rate.get(source):
            logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
            halt((self.next_reset_at.get(source) - now).total_seconds())

        self.num_requests[source] += 1
        response = method()  # potential exception raise

        # cache the response
        if do_cache:
            self.cache[key] = (now, response)
            logging.info('cached instance for {}, {}'.format(source, method))

        return response
Thistly answered 24/2, 2017 at 5:17 Comment(2)
Nice work! Does this work for individual IPs, or globally? - Lindquist
How would you even use this? - Wriggler

I had the same problem: a bunch of different functions that call the same API, and I wanted rate limiting to work globally. What I ended up doing was creating an empty function with rate limiting enabled.

PS: I use a different rate limiting library found here: https://pypi.org/project/ratelimit/

from ratelimit import limits, sleep_and_retry

# 30 calls per minute
CALLS = 30
RATE_LIMIT = 60

@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
    '''Empty function just to check for calls to API.'''
    return

Then I just call that function at the beginning of every function that calls the API:

def get_something_from_api(http_session, url):
    check_limit()
    response = http_session.get(url)
    return response

If the limit is reached, the program sleeps until the period (60 seconds in my case) has passed, and then resumes normally.
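
For what it's worth, the same shared-gate idea can be sketched with only the standard library. This is a minimal sketch, not part of `ratelimit`: the `SharedRateGate` class, its `clock`/`sleep` parameters and the two example functions are hypothetical names. The lock makes the single gate safe to share between threads in one process; it does nothing across separate processes.

```python
import threading
import time
from collections import deque


class SharedRateGate:
    """Allow at most `calls` passes through wait() per `period` seconds."""

    def __init__(self, calls, period, clock=time.monotonic, sleep=time.sleep):
        self.calls = calls
        self.period = period
        self.clock = clock    # injectable so the gate can be tested without waiting
        self.sleep = sleep
        self._lock = threading.Lock()
        self._stamps = deque()  # times of the most recent passes

    def wait(self):
        """Block until another call is allowed, then record it."""
        # The lock is held while sleeping, so other threads queue up behind it.
        with self._lock:
            while True:
                now = self.clock()
                # Forget passes that have left the sliding window.
                while self._stamps and now - self._stamps[0] >= self.period:
                    self._stamps.popleft()
                if len(self._stamps) < self.calls:
                    self._stamps.append(now)
                    return
                # Sleep until the oldest pass leaves the window.
                self.sleep(self.period - (now - self._stamps[0]))


# One module-level gate shared by every function that calls the API.
api_gate = SharedRateGate(calls=30, period=60)


def get_users():
    api_gate.wait()
    ...  # make the request here


def get_orders():
    api_gate.wait()
    ...  # make the request here
```

Because every API function goes through the same `api_gate` object, the limit applies to their combined call count rather than to each function separately.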

Orgulous answered 15/11, 2020 at 13:55 Comment(1)
I notice the question asks for a GLOBAL rate limit, but this solution only rate-limits within one specific process/thread. - Incomprehensive

Many API providers constrain developers from making too many API calls.

The Python ratelimit package provides a function decorator that prevents a function from being called more often than the API provider allows.

from ratelimit import limits

import requests
TIME_PERIOD = 900   # time period in seconds
    
@limits(calls=15, period=TIME_PERIOD)
def call_api(url):
    response = requests.get(url)
    
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

Note: This function will not be able to make more than 15 API calls within a 15-minute period.

Seminary answered 7/10, 2020 at 15:45 Comment(1)
This answer misses the point of the question, doesn't it? The question was about how to prevent exceeding a rate limit globally in a script, not with one function. - Tenet

Adding to Sunil's answer: you need to add the @sleep_and_retry decorator, otherwise your code will raise an exception when it reaches the rate limit:

import requests
from ratelimit import limits, sleep_and_retry


@sleep_and_retry
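# One call every 20 seconds (0.05 calls per second). ratelimit floors `calls`
# and clamps it to at least 1, so calls=0.05 would act as one call per second.
@limits(calls=1, period=20)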
def api_call(url, api_key):
    r = requests.get(
        url,
        headers={'X-Riot-Token': api_key}
        )
    if r.status_code != 200:
        raise Exception('API Response: {}'.format(r.status_code))
    return r
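
To make the difference concrete, here is a rough standard-library sketch of the sleep-and-retry idea. It is not the `ratelimit` implementation: `RateLimitHit`, `limit` and `sleep_then_retry` are hypothetical names, and the fixed-window counter is a simplification chosen for illustration.

```python
import functools
import time


class RateLimitHit(Exception):
    """Raised when the window is full; carries the time left in the window."""

    def __init__(self, remaining):
        super().__init__(f"rate limit hit, retry in {remaining:.2f}s")
        self.remaining = remaining


def limit(calls, period, clock=time.monotonic):
    """Raise RateLimitHit once `calls` calls were made in the current window."""
    def decorator(func):
        state = {"start": clock(), "count": 0}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = clock() - state["start"]
            if elapsed >= period:  # window over: start a fresh one
                state["start"], state["count"] = clock(), 0
                elapsed = 0.0
            if state["count"] >= calls:
                raise RateLimitHit(period - elapsed)
            state["count"] += 1
            return func(*args, **kwargs)
        return wrapper
    return decorator


def sleep_then_retry(func, sleep=time.sleep):
    """Turn RateLimitHit into a pause followed by another attempt."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        while True:
            try:
                return func(*args, **kwargs)
            except RateLimitHit as exc:
                sleep(exc.remaining)
    return wrapper


# Stacked in the same order as in the answer: the retry wrapper goes outermost.
@sleep_then_retry
@limit(calls=1, period=20)
def api_call():
    return "response"
```

With only the inner decorator, the second call inside a window raises; with the outer one added, the same call pauses for the rest of the window and then goes through.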
Nobelium answered 2/11, 2020 at 4:58 Comment(0)

There are lots of fancy libraries that provide nice decorators and special safety features, but the code below should work with django.core.cache or any other cache with get and set methods:

import logging


def hit_rate_limit(key, max_hits, max_hits_interval):
    '''Implement a basic rate throttler. Prevent more than max_hits occurring
    within max_hits_interval time period (seconds).'''
    # Use the django cache, but can be any object with get/set
    from django.core.cache import cache
    hit_count = cache.get(key) or 0
    logging.info("Rate Limit: %s --> %s", key, hit_count)
    if hit_count > max_hits:
        return True
    cache.set(key, hit_count + 1, max_hits_interval)
    return False
Richardo answered 10/12, 2020 at 22:12 Comment(1)
Hi. This cannot work, see my answer here: https://mcmap.net/q/575773/-python-api-rate-limiting-how-to-limit-api-calls-globally - Siva

Using the Python standard library:

import threading
from time import time, sleep

b = threading.Barrier(2)

def belay(s=1):
    """Block the main thread for `s` seconds."""
    while True:
        b.wait()
        sleep(s)

def request_something():
    b.wait()
    print(f'something at {time()}')

def request_other():
    b.wait()
    print(f'or other at {time()}')
    

if __name__ == '__main__':

    thread = threading.Thread(target=belay)
    thread.daemon = True
    thread.start()

    # request a lot of things
    i = 0
    while (i := i+1) < 5:
        request_something()
        request_other()

There are about s seconds between consecutive timestamps. Because the main thread waits on the barrier rather than sleeping, the time it spends responding to requests does not add to the (minimum) time between requests.
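
A variation without the helper thread, offered only as a sketch: instead of a barrier, keep the earliest time at which the next request may start and sleep only for whatever part of the interval is still left. `MinIntervalGate` and its `clock`/`sleep` parameters are hypothetical names, not something from the standard library.

```python
import threading
import time


class MinIntervalGate:
    """Let callers through no more often than once every `seconds`."""

    def __init__(self, seconds, clock=time.monotonic, sleep=time.sleep):
        self.seconds = seconds
        self.clock = clock
        self.sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = clock()  # earliest time the next call may start

    def wait(self):
        with self._lock:
            now = self.clock()
            start = max(now, self._next_allowed)
            if start > now:
                self.sleep(start - now)
            # Time spent working between calls already counts toward the gap.
            self._next_allowed = start + self.seconds


gate = MinIntervalGate(1)


def request_something():
    gate.wait()
    print(f"something at {time.time()}")


def request_other():
    gate.wait()
    print(f"or other at {time.time()}")
```

As with the barrier version, a caller that already spent part of the interval doing other work waits only for the remainder.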

Justifiable answered 15/12, 2021 at 17:23 Comment(0)

@speedlane's answer cannot work as expected. For example, with a maximum of 4 hits per second:

  • call at t1 = 0: cached value set to 1 until t1 + 1 (1.0)
  • call at t2 = 0.9: cached value set to 2 until t2 + 1 (1.9)
  • call at t3 = 1.8: cached value set to 3 until t3 + 1 (2.8)
  • call at t4 = 2.7: cached value set to 4 until t4 + 1 (3.7)
  • call at t5 = 3.6: cached value set to 5 until t5 + 1 (4.6)
  • call at t6 = 4.5: it fails because the cached value is 5, which is greater than max hits (4). But it should not: it is only the second hit since t6 - 1 sec = 3.5.

The problem is that every hit pushes the cache expiration date back by another max_hits_interval, so the counter never expires while calls keep arriving.
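
The walkthrough above can be reproduced without Django. This is a minimal simulation under stated assumptions: `FakeCache` is a hypothetical in-memory stand-in for a cache with `get`/`set` and a timeout, the cache is passed in as an argument, and the clock is advanced by hand rather than read from the system.

```python
class FakeCache:
    """Tiny in-memory cache with per-key expiry, driven by a manual clock."""

    def __init__(self):
        self.now = 0.0
        self._data = {}  # key -> (value, expires_at)

    def get(self, key):
        value, expires_at = self._data.get(key, (None, 0.0))
        return value if self.now < expires_at else None

    def set(self, key, value, timeout):
        self._data[key] = (value, self.now + timeout)


def hit_rate_limit(cache, key, max_hits, max_hits_interval):
    """Same logic as the counter-based answer, with the cache passed in."""
    hit_count = cache.get(key) or 0
    if hit_count > max_hits:
        return True
    # Every set() pushes the expiry forward again.
    cache.set(key, hit_count + 1, max_hits_interval)
    return False


cache = FakeCache()
results = []
for call_time in (0.0, 0.9, 1.8, 2.7, 3.6, 4.5):
    cache.now = call_time
    results.append(hit_rate_limit(cache, "api", max_hits=4, max_hits_interval=1))

print(results)  # [False, False, False, False, False, True]
```

The last call is rejected even though only one other call happened in the preceding second, because the counter's expiry was extended on every hit.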

Here is a working example, strongly inspired by the proposed answer, if you don't want an extra package:


from datetime import timedelta
import hashlib
import random
import time
from django.core.cache import cache
from django.utils.timezone import now


class SomeClass:    
    config = {
        "api_url": "https://some-app.fr/v2/",  # base url of the API
        "rate_limit_hits": 4,  # number of requests allowed per rate_limit_seconds
        "rate_limit_seconds": 1,  # length of the rate limit window, in seconds
        "timeout": 15,  # number of seconds before timeout
    }

    def __init__(self, api_key):
        self.config["api_key"] = api_key

    def pass_rate_limit(self):
        """Ensure that the rate limit won't be exceeded"""
        rate_limit_hits = self.config.get("rate_limit_hits", 0)
        rate_limit_seconds = self.config.get("rate_limit_seconds", 0)
        if not rate_limit_seconds or not rate_limit_hits:
            return True
        key = hashlib.md5(
            (self.config["api_url"] + self.config["api_key"][0:8]).encode("utf8")
        ).hexdigest()
        key = f"SomeClass-{key}-hits"
        hits = cache.get(key) or []
        first_relevant_hit_dt = now() - timedelta(seconds=rate_limit_seconds)
        relevant_hits = [dt for dt in hits if dt >= first_relevant_hit_dt]
        if len(relevant_hits) >= rate_limit_hits:
            oldest_relevant_hit = relevant_hits[0]
            free_place_available_at = oldest_relevant_hit + timedelta(seconds=rate_limit_seconds)
            # total_seconds() gives a float for any wait length; clamp negatives to zero
            pause_for = max((free_place_available_at - now()).total_seconds(), 0)
            print(f"I am paused for {pause_for:0.4f} at ", now())
            time.sleep(pause_for)
            return self.pass_rate_limit()
        relevant_hits.append(now())
        # depending on the cache backend, a timeout of 1 second or less may not be usable
        cache.set(key, relevant_hits, 2 if rate_limit_seconds < 2 else rate_limit_seconds)
        return True

test = SomeClass("osef")

for i in range(0,12):
    print("called at: ", now())
    test.pass_rate_limit()
    print("executed at: ", now())
    time.sleep(random.random() / 4)  # fake API duration process for eg

Output:

called at: 2024-01-22 13:46:58.088826+00:00
executed at: 2024-01-22 13:46:58.089861+00:00
called at: 2024-01-22 13:46:58.206402+00:00
executed at: 2024-01-22 13:46:58.207634+00:00
called at: 2024-01-22 13:46:58.254885+00:00
executed at: 2024-01-22 13:46:58.255998+00:00
called at: 2024-01-22 13:46:58.463253+00:00
executed at: 2024-01-22 13:46:58.464364+00:00
called at: 2024-01-22 13:46:58.470950+00:00
I am paused for 0.6179 at 2024-01-22 13:46:58.471554+00:00
executed at: 2024-01-22 13:46:59.091214+00:00
called at: 2024-01-22 13:46:59.216833+00:00
executed at: 2024-01-22 13:46:59.218037+00:00
called at: 2024-01-22 13:46:59.311481+00:00
executed at: 2024-01-22 13:46:59.312633+00:00
called at: 2024-01-22 13:46:59.315305+00:00
I am paused for 0.1479 at 2024-01-22 13:46:59.316001+00:00
executed at: 2024-01-22 13:46:59.465314+00:00
called at: 2024-01-22 13:46:59.693648+00:00
I am paused for 0.3965 at 2024-01-22 13:46:59.694304+00:00
executed at: 2024-01-22 13:47:00.092423+00:00
called at: 2024-01-22 13:47:00.242771+00:00
executed at: 2024-01-22 13:47:00.243341+00:00
called at: 2024-01-22 13:47:00.331994+00:00
executed at: 2024-01-22 13:47:00.333285+00:00
called at: 2024-01-22 13:47:00.433985+00:00
I am paused for 0.0301 at 2024-01-22 13:47:00.434795+00:00
executed at: 2024-01-22 13:47:00.466349+00:00

Siva answered 22/1 at 13:50 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.