Dynamically generate multiple tasks based on output dictionary from task in Airflow

I have a task whose output is a dictionary with a list as the value for each key:

```python
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator

@task(task_id="gen_dict")
def generate_dict():
    ...
    return output_dict  # output looks like this: {"A": ["aa", "bb", "cc"], "B": ["dd", "ee", "ff"]}

# My DAG (DAG creation and its properties omitted)
start = DummyOperator(task_id="st")
end = DummyOperator(task_id="ed")
output = generate_dict()
for keys, values in output.items():
    for v in values:
        dm = DummyOperator(task_id=f"dm_{keys}_{v}")
        dm >> end

start >> output
```

For the sample output above, it should create six dummy tasks: dm_A_aa, dm_A_bb, dm_A_cc, dm_B_dd, dm_B_ee, and dm_B_ff.

But right now I'm getting this error when the DAG is imported:

AttributeError: 'XComArg' object has no attribute 'items'

Is what I'm trying to do possible? If not, could I do it with a flat list like ["aa", "bb", "cc", "dd", "ee", "ff"] instead?

Authenticate answered 25/3, 2022 at 13:21

The code in the question won't work as-is: the loop runs when the DAG file is parsed (when the scheduler starts up, and periodically thereafter), but the data it loops over isn't known until the task that generates it actually runs. At parse time, calling generate_dict() returns an XComArg, a reference to the task's future return value rather than the dict itself, which is why .items() fails.
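To make the parse-time problem concrete, here is a toy model in plain Python (no Airflow required). The `FutureValue` class and `generate_dict_at_parse_time` function are hypothetical: they only stand in for the reference object a decorated task hands back while the DAG is being defined, and are not Airflow's actual `XComArg` implementation.

```python
class FutureValue:
    """Hypothetical stand-in for a value that only exists after a task runs.

    Toy model only; Airflow's real XComArg is more involved.
    """

    def __init__(self, task_id):
        self.task_id = task_id  # which task will eventually produce the value


def generate_dict_at_parse_time():
    # While the DAG file is parsed, "calling" the task yields only a reference,
    # not the dict that the task will return when it actually runs.
    return FutureValue("gen_dict")


output = generate_dict_at_parse_time()
try:
    output.items()  # the dict does not exist yet, so there is nothing to iterate
except AttributeError as err:
    message = str(err)
    print(message)  # 'FutureValue' object has no attribute 'items'
```

Any loop that needs the real data has to happen at runtime instead, which is what the mapping features below provide.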

There are ways to do something similar, though.

AIP-42 (Dynamic Task Mapping) added the ability to map list data into task kwargs in Airflow 2.3:

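```python
from airflow import DAG
from airflow.decorators import task

@task
def generate_lists():
    # presumably the data below would come from a query executed at runtime
    return [["aa", "bb", "cc"], ["dd", "ee", "ff"]]

@task
def use_list(the_list):
    for item in the_list:
        print(item)

with DAG(...) as dag:
    # one mapped task instance is created per element of the returned list
    use_list.expand(the_list=generate_lists())
```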

The code above creates two mapped task instances (one per inner list), which print:

aa
bb
cc
dd
ee
ff
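For the flat-list fallback mentioned in the question, the reshaping is ordinary Python that has to run inside a task, because the dict only exists at runtime. A minimal sketch, assuming a hypothetical helper `flatten_dict_values`; in a DAG it would be decorated with `@task` and its result passed to something like `some_task.expand(item=...)` (both names are illustrative). Each element then becomes its own mapped task instance, distinguished by its map index rather than by a custom task_id such as dm_A_aa.

```python
def flatten_dict_values(output_dict):
    """Turn {"A": ["aa", "bb"], "B": ["dd"]} into ["aa", "bb", "dd"].

    Hypothetical helper: in a DAG this would be a @task so it runs at
    runtime, and its return value would feed a mapped task via expand().
    """
    return [value for values in output_dict.values() for value in values]


output_dict = {"A": ["aa", "bb", "cc"], "B": ["dd", "ee", "ff"]}
flat = flatten_dict_values(output_dict)
print(flat)  # ['aa', 'bb', 'cc', 'dd', 'ee', 'ff']
```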

Airflow 2.4 added expand_kwargs, an alternative to expand (shown above) that operates on dicts instead of lists.

It takes an XComArg referencing a list of dicts whose keys are the names of the arguments that you're mapping the data into. So the following code...

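```python
from airflow import DAG
from airflow.decorators import task

@task
def generate_dicts():
    # presumably the data below would come from a query made at runtime
    return [{"foo": 6, "bar": 7}, {"foo": 8, "bar": 9}]

@task
def two_things(foo, bar):
    print(foo, bar)

with DAG(...) as dag:
    # each dict supplies the kwargs for one mapped task instance
    two_things.expand_kwargs(generate_dicts())
```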

... gives two tasks with output:

6 7

...and...

8 9

expand only lets you create tasks from the Cartesian product of the input lists, whereas expand_kwargs lets you decide at runtime which values are paired together in each task's kwargs.
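Applied to the question's dict, expand_kwargs needs one kwargs dict per (key, value) pair. A minimal sketch of that reshaping in plain Python, assuming a hypothetical helper `to_kwargs_list`; in a DAG it would be a `@task` whose result goes to a mapped task's `expand_kwargs(...)`, and the kwarg names `key` and `value` are illustrative (they would have to match that task's parameters). The second half shows why mapping keys and values as two independent lists would not work: the Cartesian product pairs every key with every value.

```python
from itertools import product


def to_kwargs_list(output_dict):
    """Reshape {"A": ["aa", ...], ...} into one kwargs dict per (key, value).

    Hypothetical helper: the names "key" and "value" must match the
    parameters of whatever task is mapped with expand_kwargs.
    """
    return [{"key": k, "value": v} for k, values in output_dict.items() for v in values]


output_dict = {"A": ["aa", "bb", "cc"], "B": ["dd", "ee", "ff"]}
paired = to_kwargs_list(output_dict)
print(len(paired))  # 6: one per desired task (dm_A_aa ... dm_B_ff)

# Mapping keys and all values as two separate lists crosses every key with
# every value, which is the Cartesian-product behaviour described above.
all_values = [v for values in output_dict.values() for v in values]
crossed = list(product(output_dict.keys(), all_values))
print(len(crossed))  # 12, including unwanted pairs such as ("A", "dd")
```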

Malaguena answered 27/3, 2022 at 20:35
Should be the accepted answer. Amazing stuff. Thank you for linking the Confluence page for AIP-42 as well. It was exactly what I needed to see how to make this work with my DockerOperatorTemplated. It's a pretty amazing feeling when you start using a tool and the feature you need was just added! - Matilda
Thanks for the feedback. Now that it's released, I made some edits. You might be interested in the new stuff in 2.4 as well. I find expand_kwargs to be more flexible than expand. - Malaguena
