How to wait for Celery task result in asynchronous RestAPI?
I have an API built with FastAPI whose endpoint submits a task to a Celery worker, waits for the worker to finish its job, and returns the result to the user.

The question is: what is the correct way to wait for the result?

Endpoint code

from celery.result import AsyncResult
from fastapi import FastAPI

from tasks import celery_application, some_task

api = FastAPI()


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    # .get() blocks until the worker has stored a result
    result = AsyncResult(id=task.task_id, app=celery_application).get()
    return {'task_result': result}

The problem with AsyncResult is that its get method blocks the application: it waits for the result synchronously, and the API freezes in the meantime.

One of the solutions I came up with is checking for the result in a loop for n seconds:

from fastapi import FastAPI
from tasks import celery_application, some_task
import asyncio
import redis

api = FastAPI()
# REDIS_CONN_URI: connection string of the Redis instance where the workers store results
r = redis.Redis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = None
    # poll for up to ~30 s (100 iterations x 0.3 s), yielding to the event loop in between
    for _ in range(100):
        if r.exists(task.task_id):
            result = r.get(task.task_id)
            break
        await asyncio.sleep(0.3)

    return {'task_result': result}

But it only works partially. Although the endpoint is not blocked and can be accessed, it gets blocked again when it tries to send a task a second time.

Inscription answered 14/2, 2021 at 13:56 Comment(5)
Does the second approach unblock the API so that more requests can be made without piling up?Sorn
Yes, it unblocks every 0.3 sec, so the API is available and it is possible to submit additional requests. The solution to piling up would be scaling the Celery workers.Inscription
The solution to not blocking your API is to actually run the tasks in the background and not wait on their result. Return the task ID from Celery and create a new endpoint which allows you to poll the task status. Once the task is complete you can get the result from another endpoint.Cleanlimbed
@Cleanlimbed So basically the user that is requesting the API needs to spam that other API endpoint for the result?Inscription
Yes, that is a solid way to handle longer background tasks. From the first endpoint return a link to check the status and/or result. From the status endpoint you can poll the task status, then return the task result or a final link to the results.Cleanlimbed
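
A minimal sketch of the pattern described in these comments, assuming illustrative endpoint paths (/submit and /result/{task_id}) on the same FastAPI app:

from celery.result import AsyncResult
from fastapi import FastAPI

from tasks import celery_application, some_task

api = FastAPI()


@api.post('/submit')
async def submit(data: str):
    # fire and forget: hand the work to Celery and return immediately
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    return {'task_id': task.task_id, 'result_url': f'/result/{task.task_id}'}


@api.get('/result/{task_id}')
async def task_result(task_id: str):
    # the client polls this endpoint until the state becomes SUCCESS or FAILURE
    res = AsyncResult(id=task_id, app=celery_application)
    if res.ready():
        return {'status': res.state, 'task_result': res.result}
    return {'status': res.state, 'task_result': None}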
Yes, that redis interface is blocking. You should either switch to aioredis for this polling, so that it can work along with the asynchronous code, or stick the blocking calls in a ThreadPoolExecutor, so that the main thread won't block waiting for .get to return.
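
For the first option, here is a minimal sketch of the question's polling loop rewritten with an asynchronous Redis client (redis.asyncio, the successor of the standalone aioredis package; REDIS_CONN_URI, the key layout and the api app are assumed to be the same as in the question):

import asyncio

import redis.asyncio as aioredis
from tasks import celery_application, some_task

# api: the FastAPI app from the question
r = aioredis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = None
    for _ in range(100):
        # exists/get are coroutines here, so waiting on Redis yields to the event loop
        if await r.exists(task.task_id):
            result = await r.get(task.task_id)
            break
        await asyncio.sleep(0.3)
    return {'task_result': result}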

The second way doesn't require you to change anything in the code or in the requirements - just create a suitable worker pool that you can use process-wide. Its size should be proportional to the number of requests per second you get, and it is not bound by the number of CPU cores you have: most threads in the pool will just sit idle waiting for results. (Unfortunately, Python's executor model does not offer a dynamic pool; otherwise, adjusting the pool size according to the load would be a nice option here.) Anyway, if the Celery workers are local, you are bound by the number of results you can get concurrently, so the standard 2x CPU-core count might be enough. Otherwise, a thread-pool executor of up to 100 or 200 threads can work on a medium-sized VM and give you a nice throughput on the order of 1000 requests per second, I believe.


import asyncio
import concurrent.futures

from celery.result import AsyncResult
from fastapi import FastAPI
from tasks import celery_application, some_task

MAXWORKERS = 24  # see the sizing discussion above
executor = concurrent.futures.ThreadPoolExecutor(MAXWORKERS)

api = FastAPI()


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result_call = AsyncResult(id=task.task_id, app=celery_application).get
    # run the blocking .get in the thread pool so the event loop stays free
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, result_call)
    return {'task_result': result}

Of course, this assumes your front-end can wait comfortably for .get() to return - I am assuming it takes at most a few seconds. This won't block your HTTP API.
If the result takes longer, then you do indeed have to restructure things in order to return an immediate response to your caller and provide some feedback later on. Asyncio can be used for that as well, depending on the feedback you intend to give, by registering a callback function to run whenever .get returns: wrap the .run_in_executor call so it can be scheduled with asyncio.create_task, then call .add_done_callback on the returned task. (Also, you will need a data structure, e.g. a set, to keep track of the ongoing tasks, otherwise they may be garbage-collected, since the event loop only keeps weak references to them.)
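
A minimal sketch of that variant, reusing the api app and executor defined above and replacing the /submit endpoint; the on_done callback and what it does with the result are illustrative:

import asyncio

from celery.result import AsyncResult
from tasks import celery_application, some_task

background_tasks = set()  # keep strong references so the event loop doesn't drop the tasks


def on_done(fut: asyncio.Task):
    background_tasks.discard(fut)
    result = fut.result()
    # illustrative feedback: log it, push it over a websocket, store it somewhere, etc.
    print('task finished with', result)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result_call = AsyncResult(id=task.task_id, app=celery_application).get
    loop = asyncio.get_running_loop()

    async def wait_for_result():
        # run the blocking .get in the thread pool, off the event loop
        return await loop.run_in_executor(executor, result_call)

    bg = asyncio.create_task(wait_for_result())
    bg.add_done_callback(on_done)
    background_tasks.add(bg)
    # respond immediately; on_done will run once the Celery task finishes
    return {'task_id': task.task_id}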

Shelburne answered 30/4, 2024 at 17:26 Comment(0)
