I have a situation similar to the one outlined here, except that instead of chaining tasks with multiple arguments, I want to chain tasks that return a dictionary with multiple entries.
This is, very loosely and abstractly, what I'm trying to do:
tasks.py
from celery import task

@task()
def task1(item1=None, item2=None):
    item3 = ...  # do some stuff with item1 and item2 to yield item3
    return_object = dict(item1=item1, item2=item2, item3=item3)
    return return_object

@task()
def task2(item1=None, item2=None, item3=None):
    item4 = ...  # do something with item1, item2 and item3 to yield item4
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
    return return_object
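Stripped of Celery entirely, the end result I'm after is roughly just this, with task1's dict unpacked into task2's keyword arguments (something and something_else stand in for whatever the real inputs are):

result1 = task1(item1=something, item2=something_else)
result2 = task2(**result1)  # result1 is a dict, so ** unpacks it into keyword arguments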
Working from ipython, I am able to call task1 individually and asynchronously, with no problems.
I can ALSO call task2 individually, passing the result returned by task1 as double-starred keyword arguments:
>>> res1 = task1.s(item1=something, item2=something_else).apply_async()
>>> res1.status
'SUCCESS'
>>> res2 = task2.s(**res1.result).apply_async()
>>> res2.status
'SUCCESS'
However, what I ultimately want to achieve is the same end result as above, but via a chain. Here I can't figure out how to have task2 called not with positional arguments taken from task1's return value, but with task1.result unpacked as **kwargs:
chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  # THIS DOESN'T WORK!
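As far as I can tell, the chain hands task1's whole return value to task2 as a single positional argument, so (if my understanding is right, and with item3_value just a stand-in for whatever task1 computed) the call ends up looking like this:

# what the chain appears to do: the entire dict lands in item1
task2({'item1': something, 'item2': something_else, 'item3': item3_value})

# what I want it to do instead:
task2(item1=something, item2=something_else, item3=item3_value)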
I suspect I could go back and rewrite my tasks so that they return positional arguments instead of a dictionary, and that this might clear things up, but it seems to me there should be some way to access task1's return object in task2 with the equivalent of the double-star unpacking. I also suspect I'm missing something fairly obvious about either Celery's subtask implementation or *args vs. **kwargs.
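For what it's worth, the kind of rewrite I'm hoping to avoid would look something like this (a rough, untested sketch): task1 returns only the new value so the chain can pass it positionally, and item1/item2 have to be bound into task2's signature a second time.

@task()
def task1(item1=None, item2=None):
    item3 = ...  # same computation as before
    return item3  # return only the new value, to be passed positionally by the chain

@task()
def task2(item3, item1=None, item2=None):
    item4 = ...  # same computation as before
    return item4

chain_result = (task1.s(item1=something, item2=something_else) | task2.s(item1=something, item2=something_else)).apply_async()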
Hope this makes sense. And thanks in advance for any tips.