I'm using Celery 3.1.9 with a Redis backend. The job I'm running is made up of several subtasks that run in chords and chains. The structure looks like this:
1. prepare
2. download data (a chord of 2 workers)
3. parse and store downloaded data
4. long-running chord of 4 workers
5. finalize
6. generate report
Each item in the list is a subtask, and they are all chained together. Steps 2 and 4 are chords. The whole thing is wired up by first creating a chord for step 4 whose callback is the chain 5 -> 6. Then a chord is created for step 2 whose callback is the chain 3 -> (first chord). Finally, a chain 1 -> (second chord) is created. This chain is started with delay(), and its ID is stored in the database.
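To make the nesting easier to follow, here is a plain-Python model of the wiring described above. The helpers model_chain and model_chord are hypothetical stand-ins that only record structure; they are not Celery's chain/chord and nothing is sent to a broker. The sketch assumes a chord's header starts before its callback and flattens the tree into the order in which steps start.

```python
from typing import List, Tuple, Union

# Hypothetical stand-ins for Celery's canvas primitives: they only record
# structure so the nesting can be inspected. Nothing is dispatched.
Node = Union[str, Tuple]


def model_chain(*steps: Node) -> Tuple:
    return ("chain", list(steps))


def model_chord(header: List[Node], callback: Node) -> Tuple:
    return ("chord", list(header), callback)


def start_order(node: Node) -> List[str]:
    """Flatten the tree: a chord's header members (which run in parallel,
    listed left to right) come before its callback."""
    if isinstance(node, str):
        return [node]
    if node[0] == "chain":
        return [name for step in node[1] for name in start_order(step)]
    if node[0] == "chord":
        header = [name for member in node[1] for name in start_order(member)]
        return header + start_order(node[2])
    raise ValueError(f"unknown node kind: {node[0]!r}")


# Step 4 is a chord whose callback is the chain 5 -> 6.
first_chord = model_chord(["long_running"] * 4, model_chain("finalize", "generate_report"))
# Step 2 is a chord whose callback is the chain 3 -> first_chord.
second_chord = model_chord(["download"] * 2, model_chain("parse_and_store", first_chord))
# The outer chain: 1 -> second_chord.
workflow = model_chain("prepare", second_chord)

print(start_order(workflow))
```

The flattened order comes out as prepare, the two downloads, parse_and_store, the four long-running workers, finalize and generate_report, which matches the numbered list.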
The problem is twofold. First, I want to be able to revoke the whole thing; second, I want a custom on_failure on my Task class that does some cleanup and reports the failure to the user.
Currently I store the chain's task ID, and I thought I could use it to revoke the chain. Also, in case of an error, I wanted to walk the chain to its root (in the on_failure handler) to retrieve the relevant record from the database. This doesn't work, because when you re-create an AsyncResult instance from just the task ID, its parent attribute is None.
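The parent problem can be illustrated without Celery. In this sketch, FakeResult and walk_to_root are hypothetical names: FakeResult stands in for AsyncResult, and the parent link exists only on the object built at submission time. An object re-created from the stored ID alone therefore cannot be walked back to the root. This mirrors the behaviour described above; it is not Celery's implementation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for AsyncResult: an ID plus an in-memory parent link."""
    id: str
    parent: Optional["FakeResult"] = None


def walk_to_root(result: FakeResult) -> List[str]:
    """Follow .parent links and return IDs from this result up to the root."""
    ids = []
    node: Optional[FakeResult] = result
    while node is not None:
        ids.append(node.id)
        node = node.parent
    return ids


# The object returned at submission time carries the parent chain in memory...
root = FakeResult("prepare-id")
last = FakeResult("report-id", parent=FakeResult("finalize-id", parent=root))

# ...but rebuilding from a stored ID alone has nothing to link back to.
rebuilt = FakeResult(last.id)

print(walk_to_root(last))     # the full path to the root
print(walk_to_root(rebuilt))  # only the ID itself
```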
The second thing I tried was to store the result of calling serializable() on the outer chain's result. This, however, does not return the entire tree of AsyncResult objects; it only returns the IDs of the first level of the chain (not the IDs of the children in the chords).
The third thing I tried was to implement serializable() myself, but it turns out the original method doesn't go further than two levels because the chain's children are celery.canvas.chord objects rather than AsyncResult instances.
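Roughly why a hand-written traversal stops early: it can only collect IDs from result objects, while a canvas object such as a chord sitting in the tree has no result ID to report. ResultNode, SignatureNode and collect below are hypothetical stand-ins (not Celery classes), assuming the tree mixes the two kinds of node as described above; the walk records each signature it hits instead of descending into it.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class ResultNode:
    """Hypothetical stand-in for an AsyncResult: a task ID plus child nodes."""
    id: str
    children: List["Node"] = field(default_factory=list)


@dataclass
class SignatureNode:
    """Hypothetical stand-in for a canvas object (e.g. a chord) with no result ID."""
    kind: str


Node = Union[ResultNode, SignatureNode]


def collect(node: Node, ids: List[str], stopped_at: List[str]) -> None:
    """Depth-first walk: gather IDs from results, note where a signature ends a branch."""
    if isinstance(node, SignatureNode):
        stopped_at.append(node.kind)  # no ID to read here, so this branch ends
        return
    ids.append(node.id)
    for child in node.children:
        collect(child, ids, stopped_at)


tree = ResultNode("outer-chain-id", [ResultNode("prepare-id"), SignatureNode("chord")])
ids: List[str] = []
stopped_at: List[str] = []
collect(tree, ids, stopped_at)
print(ids, stopped_at)
```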
An illustration of the problem:
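```python
from pprint import pprint

from celery import chord

# foo and bar are ordinary Celery tasks defined elsewhere in the project.
workflow = chord([
    foo.si(),
    foo.si(),
    foo.si(),
], bar.si() | bar.si())
res = workflow.apply_async()
pprint(res.serializable())
```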
Prints the following:
```
(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
  ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
   [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
    (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
    (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
 None)
```
The first ID is the ID of the callback chain, the second ID is from the chord task itself, and the last three are the actual tasks inside the chord. But I can't get at the results of the tasks inside the callback chain (i.e., the IDs of the two bar.si() calls).
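For reference, a small helper (flatten_ids is a hypothetical name) that flattens a nested structure like the printed output above into a plain list of IDs. It assumes only what that output shows: IDs are strings, missing parents are None, and everything else is a tuple or list. Running it on the output confirms it holds exactly five IDs, matching the breakdown above.

```python
from typing import Any, List


def flatten_ids(node: Any) -> List[str]:
    """Depth-first walk collecting every string from nested tuples/lists, skipping None."""
    if node is None:
        return []
    if isinstance(node, str):
        return [node]
    ids: List[str] = []
    for child in node:
        ids.extend(flatten_ids(child))
    return ids


# The structure printed by res.serializable() in the example above.
serialized = (('50c9eb94-7a63-49dc-b491-6fce5fed3713',
               ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
                [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
                 (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
                 (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
              None)

for task_id in flatten_ids(serialized):
    print(task_id)
```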
Is there any way to get at the actual task IDs?