How should I implement a callback for a taskset in Celery?

Question

I use Celery to launch task sets that look like this:

  1. I run a batch of tasks that can execute in parallel; the number of tasks in a batch varies from tens to a couple of thousand.
  2. I aggregate the results of these tasks into a single answer, then do something with it, such as storing it in the database or saving it to a special result file. Basically, after the tasks finish executing, I have to call a function with one of the following signatures:

    def callback(result_file_name, task_result_list):
        # store the aggregated results in a file
        ...


    def callback(entity_key, task_result_list):
        # or, alternatively: store them in the database
        ...
    

For now, step 1 runs in a Celery queue and step 2 runs outside Celery:

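    from celery import group

    tasks = []

    # add task signatures (subtasks) to the tasks list

    task_group = group(tasks)

    result = task_group.apply_async()

    # blocks the calling thread until every task in the group has finished
    res = result.join()

    # aggregate results

    # save results to a file, the database, etc.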

This approach is cumbersome because a thread has to block until all tasks have finished, which can take a couple of hours.

I would like to move step 2 into Celery as well. Essentially, I need either to add a callback to the entire taskset (as far as I know, Celery does not support this) or to submit a task that executes after all of these subtasks have finished.

Does anyone have an idea how to do this? I use Celery in a Django environment, so I can store some state in the database.

To sum up my recent findings:

Chords won't do

I can't use chords directly, because chords only let me create callbacks that look like this:

    def callback(task_result_list):
        # store in file
        ...

There is no obvious way to pass additional parameters to the callback (especially because these callbacks can't be local functions).

Using the database won't do either

I can store results using TaskSetMeta, but this entity has no status field, so even if I added a signal to TaskSetMeta I'd have to poll task results, which could add significant overhead.

Allometry answered 26/5, 2012 at 15:51 Comment(2)
Have you tried chords? - Affricative
I looked into chords and I think they won't do. I'd like to store results in the database, a file, or wherever. In that case the chord callback would need two arguments: the first is the report file name or some details of the task entity, and the second is the concatenated results. As of today, Celery chords take only the list of results. Or am I wrong? - Allometry

Well, the answer was really straightforward: I can indeed use chords, and additional parameters (like the report file name) must be passed as kwargs.

Here is the chord callback task:

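    from celery import shared_task  # the original used the older @task decorator

    @shared_task
    def print_and_sum(to_sum, file_name):
        # to_sum: list of results from the chord's header tasks
        # file_name: bound in advance as a keyword argument
        total = sum(to_sum)
        print(file_name)
        print(total)
        return file_name, total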

Here is how to build and launch the chord:

    from celery import chord

    subtasks = [...]  # signatures of the tasks to run in parallel
    result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name': 'report_file.csv'}))
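Why this works: a chord waits for all header tasks, collects their return values into a list (in header order), and calls the callback with that list as its first positional argument, followed by any arguments already bound to the callback's signature. The plain-Python simulation below mimics that ordering without Celery. It is only a sketch: run_chord_locally and the squaring header tasks are hypothetical, and the header runs sequentially here rather than in parallel on workers.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple


def run_chord_locally(
    header: Sequence[Callable[[], Any]],
    callback: Callable[..., Any],
    callback_args: Tuple[Any, ...] = (),
    callback_kwargs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Hypothetical stand-in for a chord: run every header task, collect the
    results in order, then call the callback with the results list first,
    followed by the callback's own pre-bound arguments."""
    callback_kwargs = callback_kwargs or {}
    results: List[Any] = [task() for task in header]
    return callback(results, *callback_args, **callback_kwargs)


def print_and_sum(to_sum, file_name):
    total = sum(to_sum)
    print(file_name)
    print(total)
    return file_name, total


# Hypothetical header tasks: each one returns the square of its index.
header = [lambda i=i: i * i for i in range(1, 5)]

# file_name is bound ahead of time, like kwargs= on the callback subtask.
result = run_chord_locally(
    header, print_and_sum, callback_kwargs={"file_name": "report_file.csv"}
)
print(result)
```

This is why print_and_sum declares the results list as its first parameter and receives file_name through the bound keyword arguments.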
Allometry answered 22/6, 2012 at 13:41 Comment(2)
Note: result = chord(subtasks)(print_and_sum.s(kwargs = {'file_name' : 'report_file.csv'})) does not work. It says that kwargs is an unidentified argument. It works well when using subtask instead of s. I don't know the reason. - Sclerenchyma
Thanks @Siddharth. Both the question and the answer are pretty old, and I'm not really using Celery now. Feel free to post your findings as another answer, as they might be useful to others. - Allometry
