How to chain a Celery task that returns a list into a group?

I want to create a group from a list returned by a Celery task, so that for each item in the task result set, one task will be added to the group.

Here's a simple code example to explain the use case. The ??? should be the result from the previous task.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    # do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

I'm probably not approaching this correctly, but I'm pretty sure it's not safe to call tasks from within tasks:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

I don't need the result from the second task.

Hamza asked 7/11, 2012 at 13:45. Comment (1):
Annapurna: Indeed, do not call a task from a task and then wait on its result; this can cause deadlocks. Say you have one worker. You call your task, which ties up worker 1, and it then calls a second task and waits for it. There's no free worker to process that second task, so everything hangs. This gets slightly better as you add workers, but you'll always be tying up multiple workers with a single task (and losing parallelism).
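The hang described in that comment can be reproduced without Celery. The sketch below is only an analogy, not Celery code: a thread pool with a single worker stands in for a one-worker Celery deployment, and `outer_task`/`inner_task` are hypothetical names. The outer job submits an inner job to the same pool and blocks on its result, much like `get_list.delay().get()` inside a task; a timeout is used so the demo reports the hang instead of blocking forever.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

# One worker thread plays the role of a single Celery worker process.
pool = ThreadPoolExecutor(max_workers=1)


def inner_task():
    return [1, 2, 3]


def outer_task():
    # Submit a sub-task to the same, already occupied pool and wait for it,
    # analogous to calling .delay().get() from inside a Celery task.
    future = pool.submit(inner_task)
    try:
        return future.result(timeout=0.5)
    except FutureTimeout:
        # The only worker is busy running outer_task, so inner_task never starts.
        return "deadlocked"


result = pool.submit(outer_task).result()
print(result)  # deadlocked

# The queued inner_task finally runs here, once the worker is free again.
pool.shutdown(wait=True)
```

Adding workers (raising `max_workers`) makes the demo succeed, which matches the comment's point that more workers only mask the problem while each such task still occupies two of them.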

You can get this kind of behavior using an intermediate task. Here's a demonstration of a "map"-like task that works the way you've suggested.

from celery import shared_task, signature, group

@shared_task
def get_list(amount):
    return list(range(amount))

@shared_task
def process_item(item):
    # do stuff
    pass

@shared_task
def dmap(it, callback):
    # Map a callback over an iterable and dispatch the calls as a group.
    # signature() rebuilds the callback from the dict it arrives as after serialization.
    callback = signature(callback)
    # clone((arg,)) prepends the item to any partial args already on the callback.
    return group(callback.clone((arg,)) for arg in it)()

# runs process_item for each item in the return of get_list
process_list = (get_list.s(10) | dmap.s(process_item.s()))
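To see what `dmap` does with `callback.clone((arg,))`, here is a toy in-process model (plain Python, no broker or Celery). `Sig` is a hypothetical stand-in for a Celery signature, and the loop runs each clone inline where the real `dmap` dispatches a `group`. It also shows what happens when `process_item` takes extra arguments: the item is prepended to the signature's existing partial args, so the extra arguments end up after it.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class Sig:
    """Hypothetical stand-in for a Celery signature: a function plus partial args."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()

    def clone(self, args=()):
        # Mirrors the documented behaviour of Celery's Signature.clone:
        # the new args are prepended to the signature's existing partial args.
        return Sig(self.func, tuple(args) + self.args)

    def __call__(self):
        return self.func(*self.args)


def get_list(amount):
    return list(range(amount))


def process_item(item, factor=1):
    return item * factor


def dmap(it, callback):
    # One clone per item; the real dmap task dispatches these as a group
    # instead of calling them inline.
    return [callback.clone((arg,))() for arg in it]


plain = dmap(get_list(4), Sig(process_item))
scaled = dmap(get_list(4), Sig(process_item, (10,)))
print(plain)   # [0, 1, 2, 3]
print(scaled)  # [0, 10, 20, 30]
```

With `Sig(process_item, (10,))`, each clone calls `process_item(item, 10)`, which is the pattern the last comment below describes for `process_item.s(additional_argument1, additional_argument2)`.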

Credit to Ask Solem for giving me this suggestion when I asked him for help on a similar issue.

Dygert answered 26/11, 2012 at 17:20. Comments (8):
Shevlo: Note that clone only does a shallow copy. If you want to clone a "complex" signature (like a chain, group or chord), you will need to either (ab)use Python's deepcopy, as mentioned in Celery issue 2251, or move callback = subtask(callback) into the for-loop that creates the signatures and drop the clone.
Annapurna: I've read the above comment about a dozen times and I don't get it. Could you provide an example, @LuisNell?
Shevlo: @Annapurna Given the above code, here is what I mean. Assume "callback" is not a single task but a complex workflow (a group or a chord). Groups and chords can be very complex (a group of groups, etc.), and in that case you can't simply use .clone(), because it only creates a shallow copy of your callback signature. This means that arguments won't be passed on correctly. To make sure everything works as expected, you need to use deepcopy, as mentioned in my original comment. Does that make it clearer? If not, I'll try again.
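The shallow-versus-deep distinction in these comments can be illustrated with plain dictionaries (Celery signatures are `dict` subclasses). This is a simplified model, not Celery's actual clone logic, and the `callback` structure below is hypothetical: a shallow copy of a group-like structure still shares its nested task entries, so filling in arguments on one copy leaks into the original, while `copy.deepcopy` gives the copy its own nested tasks.

```python
import copy

# Hypothetical model: a "group" holding nested task signatures,
# each a dict with its own args list.
callback = {"task": "group", "tasks": [{"task": "process_item", "args": []}]}

shallow = copy.copy(callback)
# Mutates the nested dict that is shared with callback.
shallow["tasks"][0]["args"].append(1)

deep = copy.deepcopy(callback)
# Only touches the deep copy's own nested dict.
deep["tasks"][0]["args"].append(2)

print(callback["tasks"][0]["args"])  # [1]
print(deep["tasks"][0]["args"])      # [1, 2]
```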
Rabassa: If I understand this correctly, the dmap function will execute the group synchronously, so two tasks go through the broker, whereas normally group() would cause the individual process_item calls to run in parallel. If that's right, is there any difference from return [process_item(i) for i in it]?
Calorific: Nitpick: isn't [i for i in range(n)] equivalent to just range(n)? And should the list [arg,] be a tuple (arg,)?
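On the first point: in Python 3, `range(n)` is a lazy sequence rather than a list, so the shorter equivalent of the comprehension is `list(range(n))`. The difference matters for a task's return value, because the result has to be serialized. A minimal sketch with the standard `json` module, assuming the JSON serializer (Celery's default since 4.0):

```python
import json


def get_list(amount):
    # Same value as [i for i in range(amount)], but shorter.
    return list(range(amount))


print(json.dumps(get_list(3)))  # [0, 1, 2]

try:
    json.dumps(range(3))
except TypeError as exc:
    # A bare range object is not JSON serializable.
    print("TypeError:", exc)
```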
Calenture: I've tried to do a two-level version of this and it's not working. I've opened a new question at https://mcmap.net/q/451677/-how-to-recursively-chain-a-celery-task-that-returns-a-list-into-a-group/3189; any insights appreciated.
Andreasandree: The given answer works for me, but I need to add one more task after process_item: https://mcmap.net/q/451678/-celery-chain-task-on-group. Any pointers?
Militate: The list [arg,] should be changed to a tuple (arg,). Otherwise you would not be able to pass additional arguments to process_item(item, add_argum1, add_argum2), which you might really need to process your data. Then you could write process_list = (get_list.s(10) | dmap.s(process_item.s(additional_argument1, additional_argument2))).
