How to implement an Airflow DAG in a loop

I just started with Airflow. I want to set up DAGs in a loop, where the next DAG starts when the previous one has completed. Here is the workflow I want to achieve:

list_of_files = [......]
for file in list_of_files:
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    t1 = BashOperator(task_id='copy_this_file', ....)
    t2 = BashOperator(task_id='process_this_file', ....)
    t1.set_downstream(t2)

If I run airflow backfill pipeline -s 2019-05-01, all the DAGs are started simultaneously.

Lovettalovich asked 3/5, 2019 at 16:15

DAGs can't depend on each other; they are separate workflows. You want to configure tasks to depend on each other instead. You can have a single DAG with multiple execution branches, one per file, something like this (not tested). Note that task ids must be unique within a DAG, so build the file name into them:

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
        t1 = BashOperator(task_id=f'copy_{file}', ....)
        t2 = BashOperator(task_id=f'process_{file}', ....)
        t1.set_downstream(t2)
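
For reference, here is a complete version of the same idea that runs as-is; the file names and echo commands are made-up stand-ins for the real copy/process steps, and the imports assume Airflow 2.x:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# hypothetical file list, stand-in for the real one
list_of_files = ['a.csv', 'b.csv', 'c.csv']

with DAG(
    'pipeline',
    start_date=datetime(2019, 5, 1),
    schedule_interval=None,
) as dag:
    for file in list_of_files:
        t1 = BashOperator(
            task_id=f'copy_{file}',  # unique task id per file
            bash_command=f'echo copying {file}',
        )
        t2 = BashOperator(
            task_id=f'process_{file}',
            bash_command=f'echo processing {file}',
        )
        t1.set_downstream(t2)

Note that the per-file branches run in parallel. If you want the files handled one after another inside this single DAG, chain each file's process task to the next file's copy task instead.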
Outland answered 6/5, 2019 at 21:19

Even though DAGs are separate workflows, you can chain them with the TriggerDagRunOperator. You just need the dag id of the next DAG, so edit your loop to look ahead, or simply use an index. Easy fix. Example of the operator below:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

list_of_files = [......]
for file in list_of_files:
    # each DAG in the loop needs its own dag id, e.g. built from an index
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    with dag:
        t1 = BashOperator(task_id='copy_this_file', ....)
        t2 = BashOperator(task_id='process_this_file', ....)
        trigger_run_task = TriggerDagRunOperator(
            task_id='next_dag_trigger_run',
            trigger_dag_id='next_dag_id',  # dag id of the DAG to trigger next
        )
        t1 >> t2 >> trigger_run_task
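
To make the look-ahead concrete, here is a minimal runnable sketch of the daisy chain using an index; the file names and echo commands are made up, and the imports assume Airflow 2.x:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# hypothetical file list, stand-in for the real one
list_of_files = ['a.csv', 'b.csv', 'c.csv']

for i, file in enumerate(list_of_files):
    dag_id = f'pipeline_{i}'
    with DAG(dag_id, start_date=datetime(2019, 5, 1),
             schedule_interval=None) as dag:
        t1 = BashOperator(task_id='copy_this_file',
                          bash_command=f'echo copying {file}')
        t2 = BashOperator(task_id='process_this_file',
                          bash_command=f'echo processing {file}')
        t1 >> t2
        # every DAG except the last triggers its successor when it finishes
        if i + 1 < len(list_of_files):
            t2 >> TriggerDagRunOperator(
                task_id='next_dag_trigger_run',
                trigger_dag_id=f'pipeline_{i + 1}',
            )
    # `dag` is rebound on every iteration, so register each DAG at module
    # level where the scheduler can discover it
    globals()[dag_id] = dag

Triggering pipeline_0 (manually or from the CLI) then runs the whole chain one DAG at a time, which is the "next starts when the previous completes" behaviour from the question.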
Howells answered 24/4 at 13:59
