How to route a chain of tasks to a specific queue in Celery?

When I route a task to a particular queue it works:

task.apply_async(queue='beetroot')

But if I create a chain:

chain = task | task

And then I write

chain.apply_async(queue='beetroot')

It seems to ignore the queue keyword and assigns to the default 'celery' queue.

It would be nice if Celery supported routing in chains, with all tasks executed sequentially on the same queue.

Bookstall answered 19/2, 2013 at 9:03
Actually it works now on a fresh Django install (probably was fixed). – Reason

OK, I got this one figured out.

You have to add the required execution options, like queue= or countdown=, to the subtask definition, or through a partial:

subtask definition:

from celery import subtask

chain = subtask('task', queue='beetroot') | subtask('task', queue='beetroot')

partial:

chain = task.s().apply_async(queue='beetroot') | task.s().apply_async(queue='beetroot')

Then you execute the chain through:

chain.apply_async()

or,

chain.delay()

And the tasks will be sent to the 'beetroot' queue. Extra execution arguments in this last command will not do anything. It would have been kind of nice to apply all of those execution arguments at the Chain (or Group, or any other Canvas primitives) level.

Bookstall answered 19/2, 2013 at 13:47
Hmmm, that partial example didn't work for me; I got back the following error: TypeError: unsupported operand type(s) for |: 'AsyncResult' and 'AsyncResult' (using 3.0.23). – Wickedness
I was having issues of my own in trying to get the chain to execute the second task. Question: if you're calling apply_async on both tasks, is that really a chain still? Won't both tasks execute of their own accord? I tried out your syntax and it failed because in my case the first subtask returns a value which is used by the second. – Foreyard

I do it like this:

import celery

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
Copilot answered 10/6, 2014 at 9:49
So it works if the queue is specified on the signature but not when it is passed to apply_async? Do you know if there is some good documentation for this feature? – Instar
Can different subtasks in the same chain be assigned different queues? – Steinman

This is rather late, but I don't think the code provided by @mpaf is entirely correct.

Context: In my case, I have two subtasks, the first of which produces a return value that is passed to the second as its input argument. I was having trouble getting the second task to execute: I saw in the logs that Celery would acknowledge the second task as a callback of the first, but it would never execute it.

This was my non-working chain code:

from celery import chain

chain(
    module.task1.s(arg),
    module.task2.s()
).apply_async(countdown=0.1, queue='queuename')

Using the syntax provided in @mpaf's answer, I got both tasks to execute, but the execution order was haphazard and the second subtask was not acknowledged as a callback of the first. That gave me the idea to browse the docs on how to explicitly set a queue on a subtask.

This is the working code:

chain(
    module.task1.s(arg).set(queue='queuename'),
    module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
Foreyard answered 30/7, 2016 at 13:24

© 2022 - 2024 — McMap. All rights reserved.