Why does Celery only run the first task in the chain?
G

2

6

I have a working chain, but when I add apply_async() it only executes the first task.

@task(name='run_a', delay=True)
def run_a(**kwargs):
    do_whatever(kwargs['var'])
    return

@task(name='run_b', delay=True)
def run_b(**kwargs):
    # ...
    return

@task(name='run_c', delay=True)
def run_c(**kwargs):
    # ...
    return

With a chain command:

ret = chain(
    run_a.s(**kwargs),
    run_b.s(**kwargs),
    run_c.s(**kwargs)
).apply_async()
  • Without the apply_async it all works (synchronously) as expected.
  • 'kwargs' is a dict.
Gnathic answered 9/4, 2016 at 8:39 Comment(2)
Can somebody confirm it's related to using the kwargs dict? #14968765 - Gnathic
When execution reaches run_b, I get a TypeError in the Celery worker log: TypeError: run_b() takes exactly 0 arguments (8 given) - Gnathic
T
3

Based on the documentation at http://docs.celeryproject.org/en/master/userguide/canvas.html#chains: "The linked task will be applied with the result of its parent task as the first argument." To stop the next linked task from receiving its parent's result as an argument, make the signature immutable with the .si() shortcut. The chain then has to be rewritten as follows:

In [29]: ret = chain(
    ...:     run_a.si(**kwargs),
    ...:     run_b.si(**kwargs),
    ...:     run_c.si(**kwargs)
    ...: ).apply_async()
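
To illustrate the rule quoted above, here is a minimal pure-Python sketch. It does not use Celery at all: `Sig` and `run_chain` are hypothetical names that only model the documented behaviour, where a mutable signature receives its parent's return value as the first positional argument and an immutable one is called exactly as written.

```python
class Sig:
    """Toy stand-in for a Celery signature (hypothetical, not Celery's API)."""

    def __init__(self, func, kwargs, immutable=False):
        self.func = func
        self.kwargs = kwargs
        self.immutable = immutable

    def call(self, parent_result=None, has_parent=False):
        # A mutable signature gets the parent's result prepended as the
        # first positional argument; an immutable one is called as written.
        if has_parent and not self.immutable:
            return self.func(parent_result, **self.kwargs)
        return self.func(**self.kwargs)


def run_chain(sigs):
    """Call each signature in order, handing results along like a chain."""
    result = None
    for index, sig in enumerate(sigs):
        result = sig.call(result, has_parent=index > 0)
    return result


def run_a(**kwargs):
    return None


def run_b(**kwargs):
    return "b done"


kwargs = {"var": 1}

# Mutable signatures: run_b is called as run_b(None, var=1) and fails.
try:
    run_chain([Sig(run_a, kwargs), Sig(run_b, kwargs)])
    mutable_error = None
except TypeError as exc:
    mutable_error = exc

# Immutable signatures: run_b is called as run_b(var=1) and succeeds.
immutable_result = run_chain(
    [Sig(run_a, kwargs, immutable=True), Sig(run_b, kwargs, immutable=True)]
)
```

In this model the first task always runs, and the failure only appears at the second task, which matches the symptom of "only the first task executes".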

Result

In [30]: print ret.parent.parent.graph
0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
     7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
70a6e66c-1ef9-4814-ae23-9c905ee1fcd5(2)
     0e1541f8-93c2-48c9-95b0-7a0a5971d74a(1)
          7b5e11e4-6ccf-49cc-a1dd-42bf407a37de(0)
Thadeus answered 10/7, 2018 at 10:29 Comment(0)
G
0

Although Celery validates your tasks before execution, a task function used in a chain is expected to accept both *args and **kwargs.

# Kwargs was filled, I added an empty args list
args = []
kwargs = {
    'some': 'intelligent data',
    }

When calling the functions with both, it works as expected:

ret = chain(
    run_a.s(*args, **kwargs),
    run_b.s(*args, **kwargs),
    run_c.s(*args, **kwargs)
).apply_async()
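
One reading of this fix, sketched in plain Python without Celery: unpacking an empty args list adds no positional arguments on its own, so what likely matters is that the task functions are declared with *args and can therefore absorb the parent's result. The function names below are hypothetical, and the sketch assumes the previous task returned None.

```python
def run_b_kwargs_only(**kwargs):
    # Accepts keyword arguments only; any positional argument is an error.
    return kwargs


def run_b_flexible(*args, **kwargs):
    # args collects whatever the previous task in the chain returned.
    return args, kwargs


parent_result = None  # assumption: run_a returns nothing
kwargs = {"some": "intelligent data"}

# Simulate a chain handing the parent's result to a kwargs-only function.
try:
    run_b_kwargs_only(parent_result, **kwargs)
    failed = False
except TypeError:
    failed = True

# The same call succeeds when the function also accepts *args.
args_seen, kwargs_seen = run_b_flexible(parent_result, **kwargs)
```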
Gnathic answered 9/4, 2016 at 14:39 Comment(0)
