Run multiple Celery tasks using a topic exchange
I'm replacing some homegrown code with Celery, but having a hard time replicating the current behaviour. My desired behaviour is as follows:

  • When creating a new user, a message should be published to the tasks exchange with the user.created routing key.
  • Two Celery tasks should be triggered by this message, namely send_user_activate_email and check_spam.

I tried implementing this by defining a user_created task with an ignore_result=True argument, plus a task each for send_user_activate_email and check_spam.

In my configuration, I added the following routes and queues definitions. While the message is delivered to the user_created queue, it is not delivered to the other two queues.

Ideally, the message would only be delivered to the send_user_activate_email and check_spam queues. With vanilla RabbitMQ, messages are published to an exchange, to which queues can bind, but Celery seems to deliver a message directly to a queue.

How would I implement the behaviour outlined above in Celery?

CELERY_QUEUES = {
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'send_user_activate_email': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'check_spam': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
}
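For reference, the fan-out the asker expects is standard AMQP topic-exchange behaviour: every queue whose binding key matches the routing key gets its own copy of the message. The matching rule can be sketched with a small standalone helper (illustrative only, not part of Celery's or Kombu's API):

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' matches exactly one dot-separated word,
    '#' matches zero or more words."""
    def match(b, r):
        if not b:
            return not r
        if b[0] == '#':
            # '#' can absorb any number of remaining words (including none)
            return any(match(b[1:], r[i:]) for i in range(len(r) + 1))
        return bool(r) and (b[0] == '*' or b[0] == r[0]) and match(b[1:], r[1:])
    return match(binding_key.split('.'), routing_key.split('.'))

# Every queue bound with a matching key receives its own copy of the message:
print(topic_matches('user.created', 'user.created'))  # True
print(topic_matches('user.*', 'user.created'))        # True
print(topic_matches('order.*', 'user.created'))       # False
```

With the configuration above, all three binding keys equal 'user.created', so at the broker level all three queues would receive a copy; the question is about why Celery does not behave that way.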
Twylatwyman answered 1/4, 2016 at 7:43 Comment(0)
It sounds like you are expecting a single message to be consumed by two queues, but this is not how Celery works. The exchange will post a task to eligible queues, but once it is consumed, the other queues ignore the message. You need one message per task you want to trigger.

There is often confusion among new Celery users because "Queue" means two things in this system: the Kombu Queue objects that Queue() and the documentation refer to, and the underlying AMQP queues, which hold messages directly and are consumed by workers. When we publish to queues, we tend to picture the AMQP ones, which is misleading (thanks to the answer linked below).

Back to your issue: if I understand correctly, when user_created is consumed, you want it to spawn two more tasks, send_user_activate_email and check_spam. Furthermore, these should not depend on each other; they can run in parallel on separate machines and do not need to know each other's status.

In this case, you want user_created to apply_async these two new tasks and return. This can be done directly, or you can use a Celery group containing check_spam and send_user_activate_email. The group gives you some nice shorthand and lends structure to your tasks, so personally I'd nudge you in that direction.

#pseudocode
from celery import group

group(
    check_spam.s(... checkspam kwargs ...),
    send_user_activate_email.s(... activate email kwargs ...),
).apply_async()

This setup would create four messages; one for each Task you want to execute plus one for the Group(), which itself will have a result.

In your case, I am not sure the Exchange or ignore_result is necessary, but I'd need to see the Task code and understand the system more to make that judgement.
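Putting the pieces above together, the fan-out could look roughly like this. The task names come from the question, but the decorator wiring, the broker URL, and the user_id argument are assumptions; treat it as a sketch, not the asker's actual code, and note it needs a running broker to execute:

```python
from celery import Celery, group

# Hypothetical app setup; the broker URL is a placeholder.
app = Celery('tasks', broker='amqp://localhost')

@app.task
def send_user_activate_email(user_id):
    ...  # send the activation email

@app.task
def check_spam(user_id):
    ...  # run the spam check

@app.task(ignore_result=True)
def user_created(user_id):
    # Fan out to the two independent follow-up tasks; they run in
    # parallel on whichever workers consume their queues.
    group(
        send_user_activate_email.s(user_id),
        check_spam.s(user_id),
    ).apply_async()
```

So instead of one message reaching three queues, consuming user_created publishes one new message per follow-up task.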

http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys
Why do CELERY_ROUTES have both a "queue" and a "routing_key"?

(if I am way off I'll delete/remove the answer...)

Factoring answered 10/4, 2016 at 15:49 Comment(2)
Thanks for your elaborate explanation. Gathering from your answer and the docs, Celery uses the routing_key for distributing tasks across workers, rather than having multiple tasks respond to a single message. This basically forces you to tightly couple the code that triggers a task and the code that processes it. Is this correct? – Twylatwyman
@joelcox, I think that's a great summary. The exceptions to this rule are Map() and Starmap(), which I believe execute a task for each element in a sequence but only send a single message. If you want tasks to respond to one another (say, waiting for another to succeed because it needs metadata to continue), you can also look into Chain() and Chord(). – Factoring
The easy way to design and solve your problem is to use Celery workflows.
But first of all, I'd change your queue definitions, setting a unique routing key per task and an exchange_type of 'direct'.

According to the Celery documentation, direct exchanges match on exact routing keys, so we use the same exchange for all custom tasks and consumer queues and map routing_key (for tasks) to binding_key (for queues), as in the following snippet:

CELERY_QUEUES = {
    'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'},
    'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'},
    'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user_created',
        'exchange': 'tasks',
        'exchange_type': 'direct',
    },
    'send_user_activate_email': {
        'queue': 'send_user_activate_email',
        'routing_key': 'send_user_activate_email',
        'exchange': 'tasks',
        'exchange_type': 'direct',
    },
    'check_spam': {
        'queue': 'check_spam',
        'routing_key': 'check_spam',
        'exchange': 'tasks',
        'exchange_type': 'direct',
    },
}

Once this change is done, you need to choose the proper workflow from the list of available primitives (http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives). Reading your problem, I think you need a chain, because order needs to be preserved.

from celery import chain

sequential_tasks = []
sequential_tasks.append(user_created.s(**user_created_kwargs))
sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs))
sequential_tasks.append(check_spam.s(**check_spam_kwargs))
# you can add more tasks to the chain
chain(*sequential_tasks)()

Celery will handle queue-related-work transparently.

Lenient answered 12/4, 2016 at 9:26 Comment(1)
Could you explain why I would need a separate exchange for each task? The send_user_activate_email and check_spam tasks can run in parallel, if that matters. – Twylatwyman
