Getting all task IDs from nested chains and chords

I'm using Celery 3.1.9 with a Redis backend. The job that I'm running is made of several subtasks which run in chords and chains. The structure looks like this:

  1. prepare
  2. download data (a chord of 2 workers)
  3. parse and store downloaded data
  4. long-running chord of 4 workers
  5. finalize
  6. generate report

Each item in the list is a subtask, and they are all chained together. Steps 2 and 4 are chords. The whole thing is wired up from the inside out: first a chord is created for step 4, whose callback is the chain 5 -> 6; then a chord is created for step 2, whose callback is 3 -> first chord; finally, a chain 1 -> second chord is created. This chain is then started with delay() and its ID is stored in the database.

The problem is two-fold. First, I want to be able to revoke the whole thing. Second, I want a custom on_failure on my Task class that does some cleanup and reports the failure to the user.

Currently I store the chain's task ID. I thought I could use this to revoke the chain. Also, in case of an error I wanted to walk the chain to its root (in the on_failure handler) to retrieve the relevant record from the database. This doesn't work, because when you re-create an instance of AsyncResult with just the ID of the task, its parent attribute is None.

The second thing I tried was to store the result of serializable() called on the outer chain's result. This, however, does not return the entire tree of AsyncResult objects; it only returns the IDs of the first level in the chain (so not the IDs of the children in the chords).

The third thing I tried was to implement serializable() myself, but as it turns out, the reason the original method doesn't go further than 2 levels is that the chain's children are celery.canvas.chord objects instead of AsyncResult instances.

An illustration of the problem:

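from pprint import pprint

from celery import chord

# foo and bar are assumed to be registered Celery tasks
workflow = chord([
    foo.si(),
    foo.si(),
    foo.si(),
], bar.si() | bar.si())
res = workflow.apply_async()
pprint(res.serializable())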

Prints the following:

(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
  ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
   [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
    (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
    (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
 None)

The first ID is the ID of the callback chain, the second ID is from the chord task itself, and the last three are the actual tasks inside the chord. But I can't get at the result from the task inside the callback chain (i.e. the ID of the two bar.si() calls).
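To pull every ID out of a structure like the one printed above, a small recursive walk is enough. This is only a sketch: `collect_ids` is a hypothetical helper name, and it assumes the structure contains nothing but tuples, lists, None and ID strings, as in the output shown. It cannot recover IDs that serializable() never returned, such as those of the tasks inside the callback chain.

```python
def collect_ids(node):
    """Return every ID string in a nested tuple/list structure, depth-first."""
    if node is None:
        return []
    if isinstance(node, str):
        return [node]
    ids = []
    for item in node:
        ids.extend(collect_ids(item))
    return ids


# The structure printed by pprint(res.serializable()) in the question.
serialized = (
    ('50c9eb94-7a63-49dc-b491-6fce5fed3713',
     ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
      [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
       (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
       (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
    None,
)

for task_id in collect_ids(serialized):
    print(task_id)
```

The walk yields five IDs: the callback, the chord's group, and the three header tasks, in that order.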

Is there any way to get at the actual task IDs?

Delbert answered 10/4, 2014 at 14:17 Comment(2)
No, it is impossible. The reason is that the nested chord is just a header inside the parent task, not actually a separate task inside the queue. When the parent finishes it schedules the other tasks. - Delbert
Can you traverse the tasks on the chord after it's done? See github.com/celery/celery/blob/master/celery/canvas.py#L1242, def _traverse_tasks(self, tasks, value=None): - Eristic

One hacky way is to call the tasks with apply_async, save the task IDs, and wait for them manually. This gives you complete control over what happens, but you should only wait on async tasks as a last resort. You can then access the task ID, return value, etc. For example, something like this:

 task1 = a_task.apply_async()
 task2 = b_task.apply_async()
 task3 = c_task.apply_async()

 tasks = [task1, task2, task3]

 for task in tasks:
     task.wait()
Wace answered 18/12, 2019 at 2:9 Comment(0)

I have a nested dag which is a mixture of groups and chains. The following recursive method works well to get the task_ids and their result:

from celery.result import AsyncResult, GroupResult


def get_task_id_result_tuple_list(run_dag, with_result=True):
    # entries are AsyncResult objects (use .id for the task ID),
    # paired with their result when with_result is True
    task_id_result_list = []

    # for groups, the parent comes first, then iterate over the children
    if isinstance(run_dag, GroupResult):
        # a group that is not preceded by another task has no parent
        if run_dag.parent is not None:
            entry = (run_dag.parent, run_dag.parent.result) if with_result else run_dag.parent
            task_id_result_list.append(entry)
        for child in run_dag.children:
            task_id_result_list.extend(get_task_id_result_tuple_list(child, with_result))

    # for AsyncResults, append parents in reverse
    elif isinstance(run_dag, AsyncResult):
        ch = run_dag
        ch_list = [(ch, ch.result)] if with_result else [ch]
        while ch.parent is not None:
            ch = ch.parent
            entry = (ch, ch.result) if with_result else ch
            ch_list.append(entry)

        # remember to reverse the list to get the calling order
        task_id_result_list.extend(reversed(ch_list))

    return task_id_result_list
    
# dag is the nested celery structure of chains and groups
run_dag = dag.apply_async()

task_id_result_tuples = get_task_id_result_tuple_list(run_dag)
task_id_only = get_task_id_result_tuple_list(run_dag, False)

NOTE: I have not tested this with chords yet but I imagine it would either work as is or would maybe need another conditional branch to handle that.
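For chords, the serializable() output in the question suggests the returned result is the callback's result with the header group as its parent, so a walker would have to follow parent into a group and then visit the group's members. Below is a duck-typed sketch of that idea. `collect_result_ids` is a hypothetical name, it only relies on the attributes id, parent and results, and it is exercised here with SimpleNamespace stand-ins rather than real Celery results, so treat it as untested against a live chord.

```python
from types import SimpleNamespace


def collect_result_ids(result, seen=None):
    """Return IDs reachable via .parent and .results, ancestors first."""
    if seen is None:
        seen = []
    if result is None:
        return seen
    # Visit ancestors first so the list follows the calling order.
    collect_result_ids(getattr(result, "parent", None), seen)
    if result.id not in seen:
        seen.append(result.id)
    # Group-like results expose their members as .results.
    for member in getattr(result, "results", None) or []:
        collect_result_ids(member, seen)
    return seen


# Stand-ins shaped like the chord result in the question:
# a callback result whose parent is a group of header tasks.
header = SimpleNamespace(id="group-1", parent=None, results=[
    SimpleNamespace(id="task-a", parent=None),
    SimpleNamespace(id="task-b", parent=None),
])
body = SimpleNamespace(id="callback-1", parent=header)

print(collect_result_ids(body))
# ['group-1', 'task-a', 'task-b', 'callback-1']
```

Note that the list includes group IDs as well as task IDs, so filter them out if only task IDs are wanted (for example before revoking).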

Weaver answered 17/12, 2021 at 7:19 Comment(0)
