How to dynamically generate Airflow tasks in a loop and run them in parallel?

I have a use case in which I am downloading some JSON files and parsing them. Depending on the files that are downloaded, the program needs to populate data in different tables. Once the data is loaded into the tables, an email notification must be sent.

For example, if the program needs to populate tables a and b (obtained from table_list), then the workflow should look like download >> parse >> [load_table_a, load_table_b] >> send_email.

If tables a, b, c and d are obtained from table_list, then the workflow should look like download >> parse >> [load_table_a, load_table_b, load_table_c, load_table_d] >> send_email.

Here is what I am trying; can someone please help out?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

from datetime import datetime

from download_script import download
from parse_script import parse
from load_2_sf_script import get_table_list, insert_into_sf
from airflow.utils.email import send_email_smtp


default_args = {
    'start_date': datetime(2021, 5, 18)
}


with DAG(
    'Test DAG',
    default_args = default_args,
    catchup = False
) as dag:

    download = PythonOperator(
        task_id = 'download',
        python_callable = download,
        email_on_failure = True,
        email = '[email protected]'
    )

    parse = PythonOperator(
        task_id = 'parse',
        python_callable = parse,
        email_on_failure = True,
        email = '[email protected]'
    )


    table_list = get_table_list()
    task_list = []
    for table in table_list:
        task_list.append(
            PythonOperator(
                task_id = 'load_table_{}'.format(table),
                python_callable = insert_into_sf,
                email_on_failure = True,
                email = '[email protected]',
                op_kwargs = {'category': table}
            )
        )


    send_email = EmailOperator(
        task_id = 'send_email',
        to = ['[email protected]'],
        subject = 'Airflow: Success',
        html_content = 'DAG run completed successfully.'
    )

    download >> parse >> [task for task in task_list] >> send_email

    
Schramke answered 28/5, 2021 at 22:25 Comment(1)
get_table_list will be evaluated every time the DAG is parsed, so if it's a heavy process (DB query, HTTP requests, etc.) it will impact Airflow scheduler performance. In such cases it's recommended to consider some form of lazy evaluation.Sunfish
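One possible form of lazy evaluation (an illustrative sketch only; the Variable name table_list is an assumption, not something from the original post) is to keep the table list in an Airflow Variable that a separate job refreshes, so parsing the DAG file never runs the heavy query itself:

from airflow.models import Variable

def get_table_list():
    # Read the JSON-encoded list stored in the "table_list" Variable,
    # e.g. '["a", "b", "c"]'; fall back to [] until the Variable is set.
    return Variable.get('table_list', default_var=[], deserialize_json=True)

Variable.get still touches the Airflow metadata database on every parse, but that is far cheaper than querying the source system or calling external APIs each time the file is parsed.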

If this is the DAG shape you expect (download_task >> parse_task >> one load_table_* task per table >> send_email):

Then this will work:

with DAG(
    'medical_device',
    default_args=default_args,
    catchup=False
) as dag:
    download_task = PythonOperator(
        task_id='download_task',
        python_callable=download,
        email_on_failure=True,
        email='[email protected]'
    )

    parse_task = PythonOperator(
        task_id='parse_task',
        python_callable=parse,
        email_on_failure=True,
        email='[email protected]'
    )


    send_email = EmailOperator(
        task_id='send_email',
        to=['[email protected]'],
        subject='Airflow: Success',
        html_content='DAG run completed successfully.'
    )

    download_task >> parse_task

    table_list = get_table_list()
    for table in table_list:
        op = PythonOperator(
                task_id='load_table_{}'.format(table),
                python_callable=insert_into_sf,
                email_on_failure=True,
                email='[email protected]',
                op_kwargs={'category': table}
            )
        parse_task >> op >> send_email

You don't need to construct the list; you can set the upstream and downstream relations dynamically inside the for loop with parse_task >> op >> send_email.
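If you do want to keep a list (for example to reuse it elsewhere), an equivalent fan-out/fan-in can be written in two statements inside the same with DAG block. This is just a sketch reusing the names from the question, not part of the accepted answer:

    load_tasks = [
        PythonOperator(
            task_id='load_table_{}'.format(table),
            python_callable=insert_into_sf,
            op_kwargs={'category': table}
        )
        for table in get_table_list()
    ]

    parse_task >> load_tasks   # parse fans out to every load task
    load_tasks >> send_email   # every load task fans back into send_email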

Tip: try to keep your task_id in line with the names of the task variables; that's not strictly necessary, but it is good practice.

Sunfish answered 29/5, 2021 at 7:34 Comment(3)
Thanks, this works exactly as intended. However, there is one new issue: since the DAG changes every time it runs, the graph view for previous runs also gets updated. Is there a way to retain the shape of previous DAG runs?Schramke
@Schramke Unfortunately it's not possible, as Airflow does not natively support DAG versioning. If versioning the DAG is crucial for you, then take a look at that question.Sunfish
Is it possible using dynamic task mapping?Minervamines
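For reference, on Airflow 2.3+ the same fan-out can also be expressed with dynamic task mapping, where the number of mapped task instances is decided at run time instead of at parse time. The sketch below is illustrative only; the TaskFlow function names and the hard-coded table list are placeholders, not part of the original post:

from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id='load_tables_mapped', start_date=datetime(2021, 5, 18),
     schedule_interval=None, catchup=False)
def load_tables_mapped():

    @task
    def get_table_list():
        # placeholder; in practice this would come from the parse step
        return ['a', 'b', 'c']

    @task
    def load_table(category):
        # placeholder for insert_into_sf(category=...)
        print('loading table', category)

    @task
    def notify():
        print('all loads finished')

    # expand() creates one mapped load_table instance per element of the
    # list returned by get_table_list, at run time rather than parse time
    load_table.expand(category=get_table_list()) >> notify()

load_tables_mapped()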

I'm trying to implement something very similar, but we use the pattern where dag = DAG(...) and the tasks are defined at the top level with the dag injected via dag=dag. A PythonOperator task calls a function that contains the for loop. The function executes, but the dynamic tasks never run. Is there some nuance to getting this to work with this pattern? Pseudo code:

dag = DAG(
    ...
)

def polling_fn(ti, **kwargs):
    for ec2Id in ['a', 'b','c']:
        stop_instances_task = EC2StopInstanceOperator(task_id=f"stop_instances_{ec2Id}", aws_conn_id='aws_default', instance_id=ec2Id, dag=dag)
        polling_task >> stop_instances_task >> end_task

load_variables_task = PythonOperator(
        task_id="load_variables",
        python_callable=load_variables_fn,
        provide_context=True
)

start_task = DummyOperator(  
    task_id='start',  
    dag=dag,  
)

polling_task = PythonOperator(
    task_id='polling_task',
    python_callable=polling_fn,
    dag=dag,
)

end_task = DummyOperator(  
    task_id='done',  
    dag=dag,  
) 

start_task >> polling_task >> end_task
Nuclear answered 4/1 at 16:21 Comment(0)
