How to combine Celery with asyncio?
B

12

78

How can I create a wrapper that makes celery tasks look like asyncio.Task? Or is there a better way to integrate Celery with asyncio?

@asksol, the creator of Celery, said this:

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

But I could not find any code examples specifically for the asyncio framework.

Bowerman answered 2/10, 2016 at 9:51 Comment(1)
Can you clarify what you mean by 'look like'? I think you may be misinterpreting Asksol's comment: you put Celery in front of frameworks such as Rabbit or SQS, and that is the async manager. As such, you could possibly make a broker/plugin for Celery that uses asyncio, but the tasks wouldn't "look like" (i.e. have the interface of) asyncio. Isn't the point of Celery to abstract away the asynchronous methodology used?Miler
B
71

EDIT (01/12/2021): The previous answer (find it at the bottom) didn't age well, so I added a combination of possible solutions that may help those still looking for a way to use asyncio and Celery together.

Let's quickly break down the use cases first (more in-depth analysis here: asyncio and coroutines vs task queues):

  • If the task is I/O bound then it tends to be better to use coroutines and asyncio.
  • If the task is CPU bound then it tends to be better to use Celery or other similar task management systems.

So, in the spirit of the Unix philosophy of "Do one thing and do it well", it makes sense not to try to mix asyncio and Celery together.
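The split between I/O-bound and CPU-bound work described above can be sketched with the standard library alone. This is only an illustration: `io_bound`, `cpu_bound` and `run_io_concurrently` are hypothetical names, and `asyncio.sleep` stands in for a real network or disk wait.

```python
import asyncio
import time


async def io_bound(delay: float) -> float:
    # Stand-in for a network or disk wait; control returns to the event loop.
    await asyncio.sleep(delay)
    return delay


def cpu_bound(n: int) -> int:
    # Pure computation: nothing is awaited, so calling this inside a coroutine
    # would hold the event loop until it returns.
    return sum(i * i for i in range(n))


async def run_io_concurrently() -> float:
    # The three waits overlap, so the total is well under their 0.3 s sum.
    start = time.perf_counter()
    await asyncio.gather(io_bound(0.1), io_bound(0.1), io_bound(0.1))
    return time.perf_counter() - start


elapsed = asyncio.run(run_io_concurrently())
print(f"three overlapping waits took {elapsed:.2f}s")
```

Work shaped like `cpu_bound` is what you would hand to a Celery worker, while work shaped like `io_bound` can usually stay on the event loop.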

BUT what happens when we want to be able to run a method both asynchronously and as a Celery task? Then we have some options to consider:

  • The best example that I was able to find is the following: https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/ (and I just found out that it is @Franey's response):

    1. Define your async method.

    2. Use async_to_sync from asgiref.sync to wrap the async method and run it synchronously inside a Celery task:

      # tasks.py
      import asyncio
      from asgiref.sync import async_to_sync
      from celery import Celery
      
      app = Celery('async_test', broker='a_broker_url_goes_here')
      
      async def return_hello():
          await asyncio.sleep(1)
          return 'hello'
      
      
      @app.task(name="sync_task")
      def sync_task():
          # Return the coroutine's result so it reaches the result backend
          return async_to_sync(return_hello)()
      
  • A use case that I came upon in a FastAPI application was the reverse of the previous example:

    1. An intense CPU-bound process is hogging the event loop and blocking the async endpoints.

    2. The solution is to move the CPU-bound work into a Celery task and have the endpoint enqueue it, so that a Celery worker executes it instead of the event loop.

    3. A minimal example for visualization of that case:

      import asyncio
      import uvicorn
      
      from celery import Celery
      from fastapi import FastAPI
      
      app = FastAPI(title='Example')
      worker = Celery('worker', broker='a_broker_url_goes_here')
      
      @worker.task(name='cpu_bound')
      def cpu_bound_task():
          # Does stuff but let's simplify it
          print([n for n in range(1000)])
      
      @app.get('/calculate')
      async def calculate():
          cpu_bound_task.delay()
      
      if __name__ == "__main__":
          uvicorn.run('main:app', host='0.0.0.0', port=8000)
      
  • Another solution seems to be what @juanra and @danius are proposing in their answers, but we have to keep in mind that performance tends to take a hit when we intermix sync and async executions, thus those answers need monitoring before we can decide to use them in a prod environment.

Finally, there are some ready-made solutions that I cannot recommend (because I have not used them myself), but I will list them here:

  • Celery Pool AsyncIO which seems to solve exactly what Celery 5.0 didn't, but keep in mind that it seems a bit experimental (version 0.2.0 today 01/12/2021)
  • aiotasks claims to be "a Celery like task manager that distributes Asyncio coroutines" but seems a bit stale (latest commit around 2 years ago)

Well that didn't age so well did it? Version 5.0 of Celery didn't implement asyncio compatibility thus we cannot know when and if this will ever be implemented... Leaving this here for response legacy reasons (as it was the answer at the time) and for comment continuation.

That will be possible from Celery version 5.0 as stated on the official site:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. The next major version of Celery will support Python 3.5 only, where we are planning to take advantage of the new asyncio library.
  2. Dropping support for Python 2 will enable us to remove massive amounts of compatibility code, and going with Python 3.5 allows us to take advantage of typing, async/await, asyncio, and similar concepts there’s no alternative for in older versions.

The above was quoted from the previous link.

So the best thing to do is wait for version 5.0 to be distributed!

In the meantime, happy coding :)

Budwig answered 10/4, 2017 at 13:58 Comment(7)
This didn't happen, and celery 5 is not compatible with asyncio.Tableland
@Tableland I haven't used celery 5 as of yet, I will investigate it further! Thanks for the updateBudwig
@Tableland Well, I did my research and refactored this answer, hope you can find something useful in there!Budwig
I opened a feature request and they answered "it is a part of a bigger design decision which we are planning for celery 6.0".Ivo
@BenoitBlanchon very interesting! We will wait and see I suppose :)Budwig
Until we get official support in Celery, I found that polling the status of the AsyncResult provides an excellent workaround.Ivo
@JohnMoutafis I tried your solution with async_to_sync and it works on the first beat. But then I get this error: got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop. Do you have any suggestions, please? ThanksKreg
L
36

This simple way worked fine for me:

import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    # Return the coroutine's result so the task does not always yield None
    return asyncio.run(async_function(param1, param2))
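If one task needs to drive several coroutines, back-to-back `asyncio.run` calls execute them one after another. A minimal sketch of an alternative, assuming the coroutines are independent, is to gather them inside a single `asyncio.run` call. `fetch`, `run_all` and `task_body` are hypothetical names, and `task_body` stands in for the body of the Celery task.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for real async work.
    await asyncio.sleep(delay)
    return name


async def run_all() -> list:
    # Both coroutines are scheduled on the same loop, so their waits overlap.
    return await asyncio.gather(fetch("async1", 0.05), fetch("async2", 0.05))


def task_body() -> list:
    # What the Celery task function would do: one asyncio.run call that
    # drives every coroutine, instead of one call per coroutine.
    return asyncio.run(run_all())


results = task_body()
```

`asyncio.gather` returns the results in the order the awaitables were passed in, regardless of which one finishes first.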
Lawrence answered 31/7, 2019 at 8:58 Comment(2)
It will work in tests, but what's the price of running the reactor in runtime on request?Imago
This works, but tasks run this way execute one after another: with asyncio.run(async1()) followed on the next line by asyncio.run(async2()), async1 and async2 run sequentially, not concurrently.Celka
I
10

Here is a simple helper that you can use to make a Celery task awaitable:

import asyncio
from asgiref.sync import sync_to_async

# Converts a Celery task to an async function
def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
        return async_result.get()
    return wrapper

Like sync_to_async, it can be used as a direct wrapper:

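from time import sleep
from celery import shared_task

@shared_task
def get_answer():
    sleep(10)  # simulate long computation
    return 42

# Inside a coroutine (for example, an async view):
result = await task_to_async(get_answer)()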

...and as a decorator:

@task_to_async
@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await get_answer()

Of course, this is not a perfect solution since it relies on polling. However, it should be a good workaround to call Celery tasks from Django async views until Celery officially provides a better solution.
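One thing a polling loop like this may need is an upper bound, since a task that never finishes would otherwise be polled forever. The following is only a sketch of that idea: `FakeAsyncResult` is a hypothetical stand-in that mimics just the `ready()` and `get()` methods, so the example runs without a broker, and `wait_for_result` is an assumed helper name.

```python
import asyncio
import time


class FakeAsyncResult:
    """Hypothetical stand-in exposing only ready() and get()."""

    def __init__(self, value, ready_after: float):
        self._value = value
        self._ready_at = time.monotonic() + ready_after

    def ready(self) -> bool:
        return time.monotonic() >= self._ready_at

    def get(self):
        return self._value


async def wait_for_result(async_result, timeout: float, delay: float = 0.01):
    # Poll with a growing delay, but give up once the deadline has passed.
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise asyncio.TimeoutError("task did not finish in time")
        await asyncio.sleep(delay)
        delay = min(delay * 1.5, 2)  # same capped backoff as the helper above
    return async_result.get()


answer = asyncio.run(
    wait_for_result(FakeAsyncResult(42, ready_after=0.05), timeout=1)
)
```

With a real task, the object returned by `task.delay(...)` would take the place of `FakeAsyncResult`.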

EDIT 2021/03/02: added the call to sync_to_async to support eager mode.

Ivo answered 22/2, 2021 at 15:16 Comment(4)
That's a solid workaround and we already use this (not as a decorator though) in the FastAPI application mentioned in my answer :) Keep in mind that you need to pay attention to the error handling and have a plan on how you want any potential exceptions to be handled!Budwig
task_to_async calls AsyncResult.get(), which re-raise any exception raised by the tasks. Of course, if you want to customize this behavior, you can add parameters to task_to_async and forward them to async_result.get().Ivo
What's the point of wrapping the task in the async helper? Couldn't you just implement the loop with sleep without it? AFAIK task.delay is non-blocking; only something like task.get would block.Shelf
@BenoitBlanchon Is it necessary to poll? Why not simply do async_result = await sync_to_async(task.delay)(*args, **kwargs) followed by res = await sync_to_async(async_result.get, thread_sensitive=False)()? Passing thread_sensitive=False would ensure it does not block the main thread.Cainozoic
O
7

You can wrap any blocking call into a Task using run_in_executor, as described in the documentation. The example also adds a custom timeout:

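import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# Thread pool that runs the blocking Celery calls
executor = ThreadPoolExecutor()


async def run_async_task(target, *args, timeout=60, **keywords):
    # Run the blocking callable in a worker thread and bound the wait
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
    )


async def main():
    # your_task and some_arg are placeholders for your own task and arguments
    async_result = await run_async_task(your_task.delay, some_arg, some_kwarg="")
    # AsyncResult.result is a property, so pass the blocking get() method instead
    return await run_async_task(async_result.get)


result = asyncio.run(main())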
Obverse answered 26/2, 2018 at 22:39 Comment(0)
P
5

The cleanest way I've found to do this is to wrap the async function in asgiref.sync.async_to_sync (from asgiref):

import asyncio

from asgiref.sync import async_to_sync
# Celery 4.x import; the celery.task module is not available in Celery 5
from celery.task import periodic_task


async def return_hello():
    await asyncio.sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    return async_to_sync(return_hello)()

I pulled this example from a blog post I wrote.

Piperidine answered 25/1, 2019 at 21:44 Comment(2)
Very nice, I found your article during my research on the issue and I included it in the edit of my answer (I am mentioning you of course now that I found it out)! Thank you for the knowledge boost :)Budwig
Thanks! It's always cool to see references to my articles pop up, even if it's within the same thread.Piperidine
P
5

For anyone who stumbles on this looking for help specifically with async sqlalchemy (ie, using the asyncio extension) and Celery tasks, explicitly disposing of the engine will fix the issue. This particular example worked with asyncpg.

Example:

import contextlib
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import sessionmaker
from asgiref.sync import async_to_sync


engine = create_async_engine("some_uri", future=True)
async_session_factory = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)


@celery_app.task(name="task-name")
def sync_func() -> None:
    async_to_sync(some_func)()


async def some_func() -> None:
    async with get_db_session() as session:
        result = await some_db_query(session)
    # engine.dispose will be called on exit


@contextlib.asynccontextmanager
async def get_db_session() -> AsyncGenerator:
    # Create the session before the try block so `db` is always bound in finally
    db = async_session_factory()
    try:
        yield db
    finally:
        await db.close()
        await engine.dispose()
Pires answered 5/8, 2022 at 18:9 Comment(1)
Then how would you test this sync_func? You will get: RuntimeError: asyncio.run() cannot be called from a running event loopNodule
I
4

A nice way to implement Celery with asyncio:

import asyncio
from celery import Celery

app = Celery()

async def async_function(param):
    print('do something')

@app.task()
def celery_task(param):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(async_function(param))
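Calling `asyncio.get_event_loop()` when no loop is running emits a deprecation warning on recent Python versions, and some async clients stay bound to the loop they were created on. One possible variation, sketched here under the assumption that each worker process runs one task at a time (as with the prefork pool), is to create a loop lazily and reuse it. `get_worker_loop`, `_loop` and `celery_task_body` are hypothetical names, and this sketch is not thread-safe.

```python
import asyncio

_loop = None  # hypothetical module-level cache: one loop per worker process


def get_worker_loop() -> asyncio.AbstractEventLoop:
    # Create the loop on first use and reuse it afterwards, so objects bound
    # to it (clients, connection pools) stay valid across task calls.
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


async def async_function(param):
    await asyncio.sleep(0)  # stand-in for real async work
    return param * 2


def celery_task_body(param):
    # What the Celery task function would do with the shared loop.
    return get_worker_loop().run_until_complete(async_function(param))


first = celery_task_body(2)
second = celery_task_body(3)
```

Both calls run on the same loop object, which is the property that matters for loop-bound resources.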
Indenture answered 4/7, 2022 at 15:33 Comment(0)
I
3

Here's my implementation of Celery handling async coroutines when necessary:

Wrap the Celery class to extend its functionality:

from celery import Celery
from inspect import isawaitable
import asyncio


class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                result = TaskBase.__call__(self, *args, **kwargs)
                if isawaitable(result):
                    # Await coroutine tasks and keep their return value
                    result = await result
                return result

            def __call__(self, *args, **kwargs):
                return asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app

        conf = {}
        for key in app.config.keys():
            if key[0:7] == 'CELERY_':
                conf[key[7:].lower()] = app.config[key]

        if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
            conf['broker_transport_options'] = {'region': 'eu-west-1'}

        self.config_from_object(conf)


celery = AsyncCelery()
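The core of this approach is the `isawaitable` check: call the task body, and only drive an event loop if it handed back an awaitable. That dispatch can be tried in isolation, without Celery. In this sketch `call_sync_or_async`, `plain` and `coro` are hypothetical names used only for illustration.

```python
import asyncio
from inspect import isawaitable


def call_sync_or_async(func, *args, **kwargs):
    # Call func; if it returned an awaitable, await it on a fresh event loop.
    async def _run():
        result = func(*args, **kwargs)
        if isawaitable(result):
            result = await result
        return result

    return asyncio.run(_run())


def plain(x):
    return x + 1


async def coro(x):
    await asyncio.sleep(0)
    return x + 1


plain_result = call_sync_or_async(plain, 1)
coro_result = call_sync_or_async(coro, 1)
```

The same wrapper handles both kinds of function, which is what lets ordinary and `async def` tasks share one task base class.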
Insignificancy answered 15/10, 2021 at 12:51 Comment(1)
by far the cleanest solution for me! ThanksUnrefined
J
2

I solved the problem by combining Celery and asyncio in the celery-pool-asyncio library.

Jamshedpur answered 20/5, 2020 at 13:17 Comment(2)
This actually seems like a great solution; the only issue is that it does not support Celery 5. Any timeline for this?Logrolling
Sadly, the project is no longer maintained.Kreg
V
0

I did this implementation that wraps @app.task decorator to implement async delay() and apply_async() functions. It depends on Redis and extends the redis backend to use a pubsub to wait for results. It's used with Django.

import json
import asyncio
import redis.asyncio
from celery.backends.redis import RedisBackend
from django.conf import settings
from MYAPP.celery import app


APOOL = redis.asyncio.ConnectionPool(host=settings.REDIS_HOST,
                                     db=settings.REDIS_DB,
                                     port=settings.REDIS_PORT)

CELERY_TASK_PREFIX = b'celery-task-meta-'

class PubSubBackend(RedisBackend):
    def __init__(self, *argz, **kwargs):
        kwargs['host'] = settings.REDIS_HOST
        kwargs['port'] = settings.REDIS_PORT
        kwargs['db'] = settings.REDIS_DB
        super().__init__(*argz, **kwargs)

    # This backend client will publish to subscribers when a task is finished
    def set(self, key, value, **retry_policy):
        return self.client.publish(key, value)

class RemoteTaskException(Exception):
    pass


class RemoteTaskTimeout(Exception):
    pass


TIMEOUT = 10
async def _read_task_result(channel, future):
    import time
    limit = time.time() + TIMEOUT
    while time.time() < limit:
        # Wait up to 1 second per poll instead of spinning on an empty channel
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=1.0)
        if message is not None:
            future.set_result(message)
            return
    future.set_result({'data': json.dumps(
        { 'status': 'TIMEOUT' },
    ).encode()})

async def get_task_result(task_id):
    future = asyncio.Future()
    conn = redis.asyncio.Redis(connection_pool=APOOL)
    async with conn.pubsub() as pubsub:
        queue = f'{CELERY_TASK_PREFIX.decode()}{task_id}'.encode()
        await pubsub.subscribe(queue)
        return_task = _read_task_result(pubsub, future)
        await asyncio.create_task(return_task)
        result = future.result()
        result_data = json.loads(result['data'].decode())
        status = result_data['status']
        if status == 'SUCCESS':
            return result_data['result']
        elif status == 'FAILURE':
            raise RemoteTaskException(result_data['result']['exc_message'][0])
        elif status == 'TIMEOUT':
            raise RemoteTaskTimeout()
        else:
            raise Exception(f'Unknown task status {status}')


class AsyncTask:

    def __init__(self, sub):
        self.task = app.task(sub)

    async def delay(self, *args, **kwargs):
        task = self.task.delay(*args, **kwargs)
        return await get_task_result(task.id)

    def s(self, *args, **kwargs):
        return self.task.s(*args, **kwargs)

    async def apply_async(self, *args, **kwargs):
        task = self.task.apply_async(*args, **kwargs)
        return await get_task_result(task.id)

    def __call__(self, *args, **kwargs):
        return self.task(*args, **kwargs)


# Task decorator
def async_task(sub):
    return AsyncTask(sub)

Then, in settings.py:

CELERY_RESULT_BACKEND = 'myapp.somewhere.PubSubBackend'
Veradis answered 16/6, 2023 at 20:52 Comment(0)
S
0
  1. Celery has clear producer-consumer separation. @john-moutafis has covered the consumer side very well.
  2. I reckon the producer side also deserves some attention, since that is where the asyncio and Celery paradigms can be mixed naturally.
  3. Since Tornado is a well-known asyncio server framework, I searched for Tornado and Celery and found a nice example repo, tornado-celery, by mher (creator of celery-flower) from 2018.
  4. This code example uses asyncio-tornado to handle I/O-bound HTTP requests and Celery to handle the CPU-bound tasks, hopefully echoing the quote:

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

from tornado import gen
from tornado import ioloop
from tornado.web import asynchronous, RequestHandler, Application

import tasks

import tcelery
tcelery.setup_nonblocking_producer()


class AsyncHandler(RequestHandler):
   @asynchronous
   def get(self):
       tasks.sleep.apply_async(args=[3], callback=self.on_result)

   def on_result(self, response):
       self.write(str(response.result))
       self.finish()


class GenAsyncHandler(RequestHandler):
   @asynchronous
   @gen.coroutine
   def get(self):
       response = yield gen.Task(tasks.sleep.apply_async, args=[3])
       self.write(str(response.result))
       self.finish()
Stickle answered 22/10, 2023 at 6:36 Comment(0)
H
0

Consider taskiq as an asyncio celery implementation. It uses almost the same patterns, but it's more modern and flexible.

It's not a drop-in replacement for any other task manager. It has a different ecosystem of libraries and a different set of features. Also, it doesn't work for synchronous projects. You won't be able to send tasks synchronously.

Handling answered 29/5, 2024 at 14:6 Comment(0)
