I wrote this implementation, which wraps the @app.task decorator to provide awaitable delay() and apply_async() methods. It depends on Redis and extends Celery's Redis result backend so that results are published over pub/sub, which lets the caller wait for them. It is used with Django.
import json
import asyncio
import redis.asyncio
from celery.backends.redis import RedisBackend
from django.conf import settings
from MYAPP.celery import app
# Module-level asyncio Redis connection pool; the host, db and port are read
# from the Django settings. get_task_result() builds its client on this pool.
APOOL = redis.asyncio.ConnectionPool(host=settings.REDIS_HOST,
db=settings.REDIS_DB,
port=settings.REDIS_PORT)
# Byte prefix that is joined with a task id to form the pub/sub channel name
# a task result is awaited on.
CELERY_TASK_PREFIX = b'celery-task-meta-'
class PubSubBackend(RedisBackend):
    """Redis result backend whose ``set`` publishes the value on a channel.

    The connection target always comes from the Django settings
    (``REDIS_HOST``, ``REDIS_PORT``, ``REDIS_DB``); any ``host``, ``port`` or
    ``db`` keyword supplied by the caller is overwritten.
    """

    def __init__(self, *argz, **kwargs):
        """Force the settings-defined Redis location, then defer to the parent."""
        kwargs.update(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
        )
        super().__init__(*argz, **kwargs)

    def set(self, key, value, **retry_policy):
        """Publish ``value`` on the channel named ``key``.

        ``retry_policy`` is accepted to keep the signature but is not used.

        NOTE(review): this override never calls the parent's ``set``, so
        nothing in this method writes the value under ``key``; presumably a
        subscriber has to be listening already when this runs -- confirm that
        this is intended.

        Returns:
            Whatever ``self.client.publish`` returns.
        """
        publish = self.client.publish
        return publish(key, value)
class RemoteTaskException(Exception):
    """Raised when the awaited task reports a ``FAILURE`` status."""
class RemoteTaskTimeout(Exception):
    """Raised when no task result arrives before the wait times out."""
# Maximum time, in seconds, that _read_task_result() waits for a result message.
TIMEOUT = 10
async def _read_task_result(channel, future):
    """Wait for the first message on ``channel`` and hand it to ``future``.

    Args:
        channel: A subscribed pub/sub object exposing an awaitable
            ``get_message(ignore_subscribe_messages=..., timeout=...)``.
        future: Future that receives the outcome. It is resolved with the
            first non-``None`` message; if none arrives within ``TIMEOUT``
            seconds it is resolved with a synthetic message whose ``data`` is
            the JSON document ``{"status": "TIMEOUT"}`` encoded as bytes.
    """
    import time

    # Use a monotonic clock so wall-clock adjustments cannot stretch or
    # shorten the wait.
    deadline = time.monotonic() + TIMEOUT
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Let get_message wait for the remaining budget. Calling it with its
        # default zero timeout returns immediately and turns this loop into a
        # busy poll for the whole TIMEOUT window.
        message = await channel.get_message(
            ignore_subscribe_messages=True,
            timeout=remaining,
        )
        if message is not None:
            future.set_result(message)
            return
    future.set_result({'data': json.dumps({'status': 'TIMEOUT'}).encode()})
async def get_task_result(task_id):
    """Wait for the published result of task ``task_id`` and return it.

    Subscribes to the channel named ``CELERY_TASK_PREFIX`` + ``task_id``,
    lets ``_read_task_result`` wait for the first message (or its timeout),
    then interprets the message's JSON payload.

    NOTE(review): the subscription only starts when this function runs; a
    result published before that point would presumably be missed and end in
    a timeout -- verify against how callers dispatch the task.

    Args:
        task_id: Id of the task whose result is awaited.

    Returns:
        The payload's ``result`` field when its status is ``SUCCESS``.

    Raises:
        RemoteTaskException: The payload status is ``FAILURE``.
        RemoteTaskTimeout: The reader produced the ``TIMEOUT`` status.
        Exception: The payload carries any other status.
    """
    future = asyncio.Future()
    conn = redis.asyncio.Redis(connection_pool=APOOL)
    async with conn.pubsub() as pubsub:
        queue = f'{CELERY_TASK_PREFIX.decode()}{task_id}'.encode()
        await pubsub.subscribe(queue)
        # Await the reader directly: wrapping it in a Task that is awaited on
        # the very next line adds nothing.
        await _read_task_result(pubsub, future)

    result = future.result()
    result_data = json.loads(result['data'].decode())
    status = result_data['status']

    if status == 'SUCCESS':
        return result_data['result']
    if status == 'FAILURE':
        failure = result_data.get('result')
        if isinstance(failure, dict):
            detail = failure.get('exc_message')
            if isinstance(detail, (list, tuple)):
                # Indexing [0] blindly raises IndexError on an empty list;
                # fall back to the exception type name in that case.
                detail = detail[0] if detail else failure.get('exc_type')
            # A plain string is passed through whole rather than being
            # indexed down to its first character.
        else:
            detail = failure
        raise RemoteTaskException(detail)
    if status == 'TIMEOUT':
        raise RemoteTaskTimeout()
    raise Exception(f'Unknown task status {status}')
class AsyncTask:
    """Wrap a callable as a Celery task whose dispatch methods are awaitable.

    ``delay`` and ``apply_async`` submit the task and then await its result
    through ``get_task_result``; ``s`` and direct calls are forwarded to the
    wrapped task object unchanged.
    """

    def __init__(self, sub):
        # Build the underlying task by handing the callable to app.task.
        self.task = app.task(sub)

    def __call__(self, *args, **kwargs):
        """Call the wrapped task object directly with the given arguments."""
        return self.task(*args, **kwargs)

    def s(self, *args, **kwargs):
        """Return ``self.task.s(...)`` for the given arguments."""
        return self.task.s(*args, **kwargs)

    async def delay(self, *args, **kwargs):
        """Submit via ``self.task.delay`` and await the task's result."""
        submitted = self.task.delay(*args, **kwargs)
        outcome = await get_task_result(submitted.id)
        return outcome

    async def apply_async(self, *args, **kwargs):
        """Submit via ``self.task.apply_async`` and await the task's result."""
        submitted = self.task.apply_async(*args, **kwargs)
        outcome = await get_task_result(submitted.id)
        return outcome
# Task decorator
def async_task(sub):
    """Wrap ``sub`` in an ``AsyncTask`` and return the wrapper.

    Intended to be applied as a bare decorator on the task function.
    """
    wrapped = AsyncTask(sub)
    return wrapped
Then, in settings.py:
# Dotted path to the PubSubBackend class, so it is used as the result backend.
# NOTE(review): 'myapp.somewhere' looks like a placeholder -- replace it with
# the module that actually defines PubSubBackend.
CELERY_RESULT_BACKEND = 'myapp.somewhere.PubSubBackend'