how to cache asyncio coroutines
I am using aiohttp to make a simple HTTP request in Python 3.4 like this:

response = yield from aiohttp.get(url)

The application requests the same URL over and over again so naturally I wanted to cache it. My first attempt was something like this:

@functools.lru_cache(maxsize=128)
def cached_request(url):
    return aiohttp.get(url)

The first call to cached_request works fine, but in later calls I end up with None instead of the response object.

I am rather new to asyncio, so I tried a lot of combinations of the asyncio.coroutine decorator, yield from, and some other things, but none seemed to work.

So how does caching coroutines work?

Shama answered 6/12, 2015 at 11:32 Comment(3)
Not sure what you mean by caching a coroutine? e.g. Save it as a variable so that you can call it repeatedly? Save the result, till the result is replaced on a later execution? Or have the same coroutine repeat at a later time?Martymartyn
@shongololo I want to cache the result of the coroutine.Shama
I am not familiar with functools.lru_cache() but if you simply want to return updated results, then is there any reason you don't just save the updated results to a variable? Nevertheless, when using an asynchronous method (such as aiohttp.get()) you have to drive it with something. So cached_request has to be enclosed with @asyncio.coroutine; it has to be called using yield from; and the return statement should be framed along the lines of return (yield from aiohttp.get(url))Martymartyn

Maybe a bit late, but I've started a new package that may help: https://github.com/argaen/aiocache. Contributions/comments are always welcome.

An example:

import asyncio
from collections import namedtuple

from aiocache import cached
from aiocache.serializers import PickleSerializer

Result = namedtuple('Result', "content, status")


@cached(ttl=10, serializer=PickleSerializer())
async def async_main():
    print("First ASYNC non cached call...")
    await asyncio.sleep(1)
    return Result("content", 200)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))
    print(loop.run_until_complete(async_main()))

Note that, as an extra, it can cache any Python object into Redis using pickle serialization. If you just want to work with memory, you can use the SimpleMemoryCache backend :).
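For the in-memory case, a minimal sketch might look like this (hedged: it assumes SimpleMemoryCache is the backend aiocache falls back to when none is configured):

import asyncio
from aiocache import cached

# Hedged sketch: with no backend or serializer configured, aiocache
# should fall back to SimpleMemoryCache, keeping results in process memory.
@cached(ttl=10)
async def fetch():
    await asyncio.sleep(1)
    return "content"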

Vince answered 3/10, 2016 at 8:20 Comment(0)

A popular async version of lru_cache exists here: async_lru.
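A hedged sketch of its alru_cache decorator applied to the question's use case (the decorator name and maxsize argument are taken from the project's README):

import aiohttp
from async_lru import alru_cache

@alru_cache(maxsize=128)
async def cached_get(url):
    # The awaited result (the response body) is cached, not the coroutine.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()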

Counterman answered 4/3, 2019 at 16:6 Comment(0)

To use functools.lru_cache with coroutines, the following code works.

import asyncio
import functools

import aiohttp


class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        with (yield from self.lock):
            if self.done:
                return self.result
            self.result = yield from self.co.__await__()
            self.done = True
            return self.result


def cacheable(f):
    def wrapped(*args, **kwargs):
        # Wrap the coroutine so lru_cache stores an object that can be
        # awaited more than once.
        r = f(*args, **kwargs)
        return Cacheable(r)
    return wrapped


@functools.lru_cache()
@cacheable
async def foo(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

The following variant is thread-safe:

import threading


class ThreadSafeCacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = threading.Lock()

    def __await__(self):
        while True:
            if self.done:
                return self.result
            if self.lock.acquire(blocking=False):
                self.result = yield from self.co.__await__()
                self.done = True
                return self.result
            else:
                # Another caller holds the lock; yield to the event loop and retry.
                yield from asyncio.sleep(0.005)
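On newer Python versions the with (yield from self.lock) form is no longer available (see the comments below). A hedged rewrite of the first variant for modern asyncio, not part of the original answer, could look like:

import asyncio


class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        # __await__ must return an iterator, so delegate to a coroutine.
        return self._run().__await__()

    async def _run(self):
        async with self.lock:
            if not self.done:
                self.result = await self.co
                self.done = True
            return self.result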
Crespo answered 13/10, 2017 at 5:35 Comment(2)
Thanks! One case where it does not work though: when the decorated coroutine raises an exception. In that case self.done is not set to True and the next call raises RuntimeError: cannot reuse already awaited coroutine. In my case I could record the exception and re-raise it, but others might want to re-run the coro (and I don't know if it's possible).Ammons
Note that starting in Python 3.7, yield from syntax is no longer available with Asyncio.lock. #77978697 has a version of this in the question body that gets around that change (though it may be more useful to use its second option with create_task)Suspicion

I wrote a simple cache decorator myself:

def async_cache(maxsize=128):
    cache = {}

    def decorator(fn):
        def wrapper(*args):                                                         
            key = ':'.join(args)

            if key not in cache:
                if len(cache) >= maxsize:
                    del cache[next(iter(cache))]  # evict the oldest key (Python 3)

                cache[key] = yield from fn(*args)

            return cache[key]

        return wrapper

    return decorator


@async_cache()
@asyncio.coroutine
def expensive_io():
    ....

This kind of works, but many aspects could be improved. For example: if the cached function is called a second time before the first call returns, it will execute a second time.

Shama answered 6/12, 2015 at 22:4 Comment(3)
Suggestion: use an OrderedDict to implement lru behaviour, i.e. use OrderedDict.move_to_end on every key called, and then OrderedDict.popitem when the cache is full.Hog
You can use a Semaphore to limit it to one execution.Tijuana
This is a bit dangerous because it relies on the string representation of the arguments. It would be best to stick to hashable objects like the original lru_cache.Bloomer

This is how I think it's most easily done, using the built-in lru_cache and futures:

import asyncio
import functools

# parameterless decorator
def async_lru_cache_decorator(async_function):
    @functools.lru_cache
    def cached_async_function(*args, **kwargs):
        coroutine = async_function(*args, **kwargs)
        return asyncio.ensure_future(coroutine)
    return cached_async_function

# decorator with options
def async_lru_cache(*lru_cache_args, **lru_cache_kwargs):
    def async_lru_cache_decorator(async_function):
        @functools.lru_cache(*lru_cache_args, **lru_cache_kwargs)
        def cached_async_function(*args, **kwargs):
            coroutine = async_function(*args, **kwargs)
            return asyncio.ensure_future(coroutine)
        return cached_async_function
    return async_lru_cache_decorator

@async_lru_cache(maxsize=128)
async def your_async_function(...): ...

This basically takes your original function and wraps it so that the Coroutine it returns is converted into a Future and stored. The wrapper can then be treated as a regular function, which you can lru_cache as you usually would.

Why is wrapping it in a Future necessary? Python coroutines are low-level constructs, and you can't await one more than once (you would get RuntimeError: cannot reuse already awaited coroutine). Futures, on the other hand, can be awaited repeatedly and will return the same result.
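A quick demonstration of the difference (a hedged sketch, not part of the original answer):

import asyncio

async def coro():
    return 42

async def main():
    c = coro()
    print(await c)    # fine the first time
    # print(await c)  # RuntimeError: cannot reuse already awaited coroutine

    t = asyncio.ensure_future(coro())
    print(await t, await t)  # a Task/Future can be awaited repeatedly

asyncio.run(main())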

One caveat is that caching a Future will also cache the case where the original function raised an error. The original lru_cache does not cache failed calls, so watch out for this edge case when using the solution above.

Further tweaking can be done to merge both the parameter-less and the parameterized decorators, like the original lru_cache, which supports both usages.
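A hedged sketch of that merge (not part of the original answer; it mimics how functools.lru_cache detects bare decoration):

import asyncio
import functools

def async_lru_cache(*lru_args, **lru_kwargs):
    def decorator(async_function):
        @functools.lru_cache(*lru_args, **lru_kwargs)
        def cached(*args, **kwargs):
            return asyncio.ensure_future(async_function(*args, **kwargs))
        return cached

    # Bare usage: @async_lru_cache puts the function itself in lru_args[0].
    if len(lru_args) == 1 and callable(lru_args[0]) and not lru_kwargs:
        async_function, lru_args = lru_args[0], ()
        return decorator(async_function)
    # Parameterized usage: @async_lru_cache(maxsize=128)
    return decorator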

Bloomer answered 1/6, 2021 at 20:55 Comment(0)

I'm not that familiar with aiohttp, so I'm not sure exactly what is happening that would cause Nones to be returned, but the lru_cache decorator will not work with async functions.

I use a decorator which does essentially the same thing; note that it is different to tobib's decorator above in that it will always return a future or a task, rather than the value:

import asyncio

from collections import OrderedDict
from functools import _make_key, wraps

def future_lru_cache(maxsize=128):
    # support use as decorator without calling, for this case maxsize will
    # not be an int
    try:
        real_max_size = int(maxsize)
    except (TypeError, ValueError):  # int() raises TypeError when maxsize is a function
        real_max_size = 128

    cache = OrderedDict()

    async def run_and_cache(func, args, kwargs):
        """Run func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        cache[_make_key(args, kwargs, False)] = result
        if len(cache) > real_max_size:
            cache.popitem(False)
        return result

    def wrapper(func):
        @wraps(func)
        def decorator(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                # Some protection against duplicating calls already in
                # progress: when starting the call cache the future, and if
                # the same thing is requested again return that future.
                if isinstance(cache[key], asyncio.Future):
                    return cache[key]
                else:
                    f = asyncio.Future()
                    f.set_result(cache[key])
                    return f
            else:
                task = asyncio.Task(run_and_cache(func, args, kwargs))
                cache[key] = task
                return task
        return decorator

    if callable(maxsize):
        return wrapper(maxsize)
    else:
        return wrapper

I used _make_key from functools, as lru_cache does. I guess it's supposed to be private, so it's probably better to copy it over.
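If you'd rather not depend on the private helper, a hedged stand-in (it only handles hashable arguments) could be:

def make_key(args, kwargs):
    # Hypothetical replacement for functools._make_key: builds a
    # hashable key from positional and keyword arguments.
    return (args, frozenset(kwargs.items()))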

Desexualize answered 4/6, 2016 at 6:21 Comment(0)

Try async-cache (on PyPI and GitHub) for caching async functions in Python.

It also supports functions with parameters of user-defined, object, or unhashable types, which neither functools.lru_cache nor async_lru supports.

Usage:

pip install async-cache

from cache import AsyncLRU

@AsyncLRU(maxsize=128)
async def func(*args, **kwargs):
    pass
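The package also provides a TTL variant; a hedged sketch based on the project's README:

from cache import AsyncTTL

@AsyncTTL(time_to_live=60, maxsize=1024)
async def func(*args, **kwargs):
    pass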
Charlottetown answered 27/4, 2020 at 12:19 Comment(1)
Worked for me best in Feb 2022Leonhard

Another variant of an lru decorator, one which also caches coroutines that have not yet finished; this is very useful with parallel requests to the same key:

import asyncio
from collections import OrderedDict
from functools import _make_key, wraps

def async_cache(maxsize=128, event_loop=None):
    cache = OrderedDict()
    if event_loop is None:
        event_loop = asyncio.get_event_loop()
    awaiting = dict()

    async def run_and_cache(func, args, kwargs):
        """await func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        key = _make_key(args, kwargs, False)
        cache[key] = result
        if len(cache) > maxsize:
            cache.popitem(False)
        cache.move_to_end(key)
        return result

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                return cache[key]
            if key in awaiting:
                task = awaiting[key]
                return await asyncio.wait_for(task, timeout=None, loop=event_loop)
            task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop)
            awaiting[key] = task
            result = await asyncio.wait_for(task, timeout=None, loop=event_loop)
            del awaiting[key]
            return result
        return wrapper

    return decorator


async def test_async_cache(event_loop):
    counter = 0
    n, m = 10, 3

    @async_cache(maxsize=n, event_loop=event_loop)
    async def cached_function(x):
        nonlocal counter
        await asyncio.sleep(0)  # making event loop switch to other coroutine
        counter += 1
        return x

    tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop)
             for x in list(range(n)) * m]
    done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1)
    assert len(done) == n * m
    assert counter == n

event_loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test_async_cache(event_loop))
event_loop.run_until_complete(task)
Hannie answered 22/9, 2016 at 1:16 Comment(0)

I think that the simplest way is to use aiohttp_cache (see its documentation).

pip install aiohttp-cache

And use it in code:

from aiohttp import web
from aiohttp_cache import cache, setup_cache

@cache()  # <-- DECORATED FUNCTION
async def example_1(request):
    return web.Response(text="Example")


app = web.Application()

app.router.add_route('GET', "/", example_1)

setup_cache(app)  # <-- INITIALIZED aiohttp-cache

web.run_app(app, host="127.0.0.1")
Unpaidfor answered 24/7, 2018 at 15:4 Comment(1)
Do you need additional caching on the client too?Unpaidfor

I was working with FastAPI. The thing is, it executes normal def dependencies in another thread, so unless the call is blocking, it can be better to use an async def dependency.

With that in mind, I also needed to cache the value of this async def dependency. So I ended up writing my own which is simple and works:

import asyncio
from functools import wraps


def async_cache(async_fn):
    """Async version of functools.lru_cache"""
    lock = asyncio.Lock()
    cache = {}

    @wraps(async_fn)
    async def inner(*args, **kwargs):
        cache_key = (args, tuple(kwargs.items()))
        if cache_key in cache:
            return cache[cache_key]

        async with lock:
            if cache_key not in cache:
                cache[cache_key] = await async_fn(*args, **kwargs)
        return cache[cache_key]
    return inner


@async_cache
async def square(n, **extras):
    print("inside coroutine")
    await asyncio.sleep(n)
    return n * n


async def main():
    print(await asyncio.gather(*[square(3, foo="bar") for _ in range(4)]))
    print("-----------------------------------")
    print(await asyncio.gather(*[square(3, foo="bar") for _ in range(4)]))


asyncio.run(main())

The lock object is optional. In my actual code I wanted only one execution in flight for the same parameters, which is why I grab the lock before checking the cache a second time.
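One global lock serializes all first calls, even for unrelated arguments. A hedged per-key variation (not part of the original answer) could look like:

import asyncio
from functools import wraps


def async_cache(async_fn):
    # Hedged sketch: same idea as above, but with one lock per cache key,
    # so different keys don't serialize each other's first calls.
    locks = {}
    cache = {}

    @wraps(async_fn)
    async def inner(*args, **kwargs):
        cache_key = (args, tuple(kwargs.items()))
        if cache_key in cache:
            return cache[cache_key]

        # No await between the check above and this insert, so setdefault
        # is safe in asyncio's single-threaded event loop.
        lock = locks.setdefault(cache_key, asyncio.Lock())
        async with lock:
            if cache_key not in cache:
                cache[cache_key] = await async_fn(*args, **kwargs)
        return cache[cache_key]
    return inner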

Addy answered 3/3, 2024 at 11:33 Comment(0)

I wrote a simple package named asyncio-cache - https://github.com/matan1008/asyncio-cache.

I tried to keep the code as close as possible to the original Python implementation, and as simple as possible.

For example:

from asyncio_cache import lru_cache
import aiohttp

@lru_cache(maxsize=128)
async def cached_get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
Greengrocery answered 7/11, 2021 at 20:54 Comment(0)
