I have an API built with FastAPI. One of its endpoints submits a task to a Celery worker, waits for the worker to finish its job, and returns the result to the user.
The question is: what is the correct way to wait for the result?
Endpoint code:

from celery.result import AsyncResult
from fastapi import FastAPI

from tasks import celery_application, some_task

api = FastAPI()  # assumed: `api` is the FastAPI app object


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    # .get() waits synchronously until the worker has finished
    result = AsyncResult(id=task.task_id, app=celery_application).get()
    return {'task_result': result}
The problem with AsyncResult is that its get method blocks the application: it waits for the result synchronously, and the API freezes in the meantime.
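To illustrate the blocking behaviour and one common way around it, here is a minimal sketch that moves a blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+), so the event loop stays free for other requests. `blocking_get` is a hypothetical stand-in for `AsyncResult(...).get()`, not the Celery call itself, and the sleep time is arbitrary. This is an illustration under those assumptions, not a confirmed fix for this setup.

```python
import asyncio
import time


def blocking_get(task_id: str) -> str:
    """Hypothetical stand-in for AsyncResult(id=task_id, app=...).get()."""
    time.sleep(0.2)  # simulates waiting synchronously on the result backend
    return f"result-of-{task_id}"


async def wait_for_result(task_id: str) -> str:
    # Run the blocking call in a worker thread; the event loop is not blocked.
    return await asyncio.to_thread(blocking_get, task_id)


async def demo() -> tuple[list[str], float]:
    # Three waits run concurrently instead of one after another.
    start = time.perf_counter()
    results = await asyncio.gather(*(wait_for_result(f"t{i}") for i in range(3)))
    return list(results), time.perf_counter() - start
```

In an endpoint, the same idea would presumably look like `result = await asyncio.to_thread(async_result.get)`. Each waiting request then occupies one thread from the default pool, so this trades event-loop blocking for thread usage.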
One of the solutions I came up with is checking for the result in a loop for n seconds:

import asyncio

import redis
from fastapi import FastAPI

from tasks import celery_application, some_task

api = FastAPI()  # assumed: `api` is the FastAPI app object
r = redis.Redis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = None
    # Poll up to 100 times, 0.3 s apart (about 30 s in total)
    for _ in range(100):
        if r.exists(task.task_id):
            result = r.get(task.task_id)
            break
        await asyncio.sleep(0.3)
    return {'task_result': result}
But it only works partially: the endpoint no longer blocks and can still be accessed while it waits, but it gets blocked when it tries to send a task again.
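One thing worth noting about the loop above: `r.exists`, `r.get` and `apply_async` are all synchronous calls, so each of them still holds the event loop while it runs, and only the `asyncio.sleep` yields control. Whether that explains the blocking seen here is not confirmed. As a sketch of a polling loop in which every step is awaited, the helper below takes an async `fetch` callable. `poll_for_result` and `FakeBackend` are hypothetical names introduced for illustration. In real code `fetch` might wrap an async Redis client such as `redis.asyncio.Redis`, which is an assumption about the setup.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll_for_result(
    fetch: Callable[[], Awaitable[Optional[str]]],
    attempts: int = 100,
    interval: float = 0.3,
) -> Optional[str]:
    """Await `fetch` until it returns a value or the attempts run out."""
    for _ in range(attempts):
        value = await fetch()  # awaited, so other requests can run meanwhile
        if value is not None:
            return value
        await asyncio.sleep(interval)
    return None  # timed out without a result


class FakeBackend:
    """Hypothetical in-memory stand-in for an async result store."""

    def __init__(self, ready_after: int) -> None:
        self.calls = 0
        self.ready_after = ready_after

    async def get(self) -> Optional[str]:
        # Returns None until it has been called `ready_after` times.
        self.calls += 1
        return "done" if self.calls >= self.ready_after else None
```

For example, `await poll_for_result(FakeBackend(ready_after=3).get, interval=0.01)` returns the value on the third attempt, while a backend that never becomes ready yields `None` once the attempts are used up.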