Like in this other question, I want to create a celery group from a list that's returned by a celery task. The idea is that the first task will return a list, and the second task will explode that list into concurrent tasks for every item in the list.
The plan is to use this for downloading content. The first task gets the links from a website, and the second task is a chain that downloads each page, processes it, and then uploads it to S3. Finally, once all the subpages are done, the website is marked as done in our DB. Something like:
chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() |
        process_sub_page.s() |
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)
The solution I've seen so far does an adequate job of this, but it fails when the second task is a chain, because clone does not do a deepcopy (see the comments on this answer for details):
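from celery import group, shared_task, signature

@shared_task
def dmap(it, callback):
    # Map a callback over an iterable and launch the results as a group.
    # signature() rebuilds the serialized callback (subtask is the older alias).
    callback = signature(callback)
    return group(callback.clone([arg]) for arg in it)()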
It also has the problem that if the iterable is 10,000 items long, it will create a group with 10,000 items. That is blowing up our memory usage, as you can imagine.
So, what I'm looking for is a way to do dmap that:
- Doesn't blow up RAM by creating monstrous groups (maybe there's a way to chunk through the iterable?)
- Works on celery chains without issues with deepcopy.
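To make the chunking idea in the first bullet concrete, here is a rough sketch in plain Python, with no Celery involved. The helper name `chunked` and its `size` parameter are made up for illustration; the point is only that the list of links could be walked in fixed-size batches, so that each batch becomes its own small group instead of one 10,000-item group.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`.

    Hypothetical helper, not part of Celery.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        # Pull the next batch lazily so the full input is never copied at once.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    links = ["page-%d" % i for i in range(5)]
    for batch in chunked(links, 2):
        print(batch)
```

How each batch would then be wired into a group (and how the deepcopy problem with chains is avoided) is the part I don't know how to do.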