How do we trigger multiple Airflow DAGs using TriggerDagRunOperator?

I have a scenario where a particular DAG, upon completion, needs to trigger multiple DAGs. I have used TriggerDagRunOperator to trigger a single DAG; is it possible to pass multiple DAGs to TriggerDagRunOperator so that it triggers multiple DAGs?

And is it possible to trigger them only upon successful completion of the current DAG?

Riancho asked 28/6, 2017 at 15:34 Comment(0)

I have faced the same problem. There is no out-of-the-box solution, but we can write a custom operator for it.

Here is the code of a custom operator that takes python_callable and trigger_dag_id as arguments (the import paths assume Airflow 1.x):

from datetime import datetime

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        # python_callable returns DagRunOrder objects, one per DAG run to create
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            if not dro or not isinstance(dro, DagRunOrder):
                break

            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            # Look up the target DAG and create an externally triggered run for it
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created is True:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()

trigger_dag_id is the ID of the DAG we want to run multiple times.

python_callable is a function that should return a list of DagRunOrder objects, one object per instance of the DAG with dag_id trigger_dag_id that should be scheduled.
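
For illustration, here is a minimal usage sketch of the operator above, assuming Airflow 1.x import paths; the DAG ID target_dag and the helper gen_dag_run_orders are hypothetical names, not part of the original answer:

# Hypothetical usage sketch for the TriggerMultiDagRunOperator defined above (Airflow 1.x style)
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import DagRunOrder


def gen_dag_run_orders(context, num_runs=3):
    # Return one DagRunOrder per desired run; the payload becomes that run's dag_run.conf
    return [DagRunOrder(payload={"chunk": i}) for i in range(num_runs)]


dag = DAG("fan_out_example", start_date=datetime(2018, 1, 1), schedule_interval=None)

fan_out = TriggerMultiDagRunOperator(
    task_id="fan_out",
    trigger_dag_id="target_dag",  # hypothetical DAG to run once per DagRunOrder
    python_callable=gen_dag_run_orders,
    op_kwargs={"num_runs": 3},
    dag=dag,
)

Each DagRunOrder returned by gen_dag_run_orders results in one run of target_dag.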

Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun

A little more description of this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13

Grisham answered 10/8, 2018 at 16:35 Comment(2)
Even I haven't been able to find an out-of-the-box solution for this, so your answer seems to be the solution for this use case. – Riancho
Is it possible to trigger the DAGs sequentially using this operator? – Ypsilanti

In Airflow 2, you can use dynamic task mapping. For example:

import uuid
import random
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


dag_args = {
    "start_date": datetime(2022, 9, 9),
    "schedule_interval": None,
    "catchup": False,
}

@task
def define_runs():
    num_runs = random.randint(3, 5)
    runs = [str(uuid.uuid4()) for _ in range(num_runs)]
    return runs


@dag(**dag_args)
def dynamic_tasks():

    runs = define_runs()
    run_dags = TriggerDagRunOperator.partial(
        task_id="run_dags",
        trigger_dag_id="hello_world",
        conf=None,
    ).expand(
        trigger_run_id=runs,
    )

    run_dags

dag = dynamic_tasks()

Docs here.

Allotrope answered 29/8, 2022 at 20:53 Comment(2)
Did you run this code? I tried this on my local machine with Airflow 2.3.0 and it's failing. – Atheism
I've updated the answer, try now. – Allotrope

You can try looping it. For example:

# dag_ids_to_trigger is your list of target DAG IDs
for dag_id in dag_ids_to_trigger:
    trigger_dag = TriggerDagRunOperator(
        task_id='trigger_' + dag_id,
        trigger_dag_id=dag_id,
        python_callable=conditionally_trigger_non_indr,
        dag=dag,
    )

Set this as dependent on whichever task is required, as in the sketch below. I have automated something like this with PythonOperator. You could try whether this works for you!
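
A rough sketch of that wiring, assuming Airflow 2.x; the DAG IDs dag_a, dag_b, dag_c and the task final_task are hypothetical placeholders:

# Sketch only: attach each trigger downstream of the task that must finish first
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG("fan_out_loop", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    final_task = EmptyOperator(task_id="final_task")  # stands in for the last real task

    for target in ["dag_a", "dag_b", "dag_c"]:
        trigger = TriggerDagRunOperator(
            task_id=f"trigger_{target}",
            trigger_dag_id=target,
        )
        # With the default trigger_rule, each trigger runs only after final_task succeeds
        final_task >> trigger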

Kokura answered 29/6, 2017 at 7:12 Comment(2)
OK, will try this out. Can we modify the python_callable ("conditionally_trigger") function to return the DagRun object only on successful completion of all tasks in the DAG? – Riancho
Why would you need that? You can make the trigger_dag task wait for all tasks to complete and then run on completion of all tasks, in the sense that you should set your dependencies such that it runs after all tasks. – Kokura

As the API docs state, the method accepts a single dag_id. However, if you want to unconditionally kick off downstream DAGs upon completion, why not just put those tasks in a single DAG and set your dependencies/workflow there? You would then be able to set depends_on_past=True where appropriate.
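
A minimal sketch of that single-DAG layout, assuming Airflow 2.x; all task names here are hypothetical placeholders:

# Sketch only: formerly separate DAGs become ordinary downstream tasks of one DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("combined_workflow", start_date=datetime(2023, 1, 1), schedule_interval="@daily") as dag:
    upstream_work = EmptyOperator(task_id="upstream_work")

    downstream_a = EmptyOperator(task_id="downstream_a")
    downstream_b = EmptyOperator(task_id="downstream_b", depends_on_past=True)

    # Both downstream branches run only after upstream_work completes
    upstream_work >> [downstream_a, downstream_b]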

EDIT: An easy workaround, if you absolutely need them in separate DAGs, is to create multiple TriggerDagRunOperators and set their dependencies to the same task.

Exteroceptor answered 28/6, 2017 at 15:45 Comment(3)
Just curious: can't we use a BashOperator to execute the command "airflow trigger_dag <<dag_id>>" instead of using a TriggerDagRunOperator? How would these two methods differ? – Riancho
There are all sorts of workarounds, but I think you need to properly evaluate your business logic. If your only dependency is a simple completion, then it would be more appropriate to structure these processes as tasks within a DAG and not as separate DAGs. Is there a reason you want to split them out into DAGs? – Exteroceptor
We need to have it in a separate DAG since it needs to have a different start_date (time, to be precise). – Riancho

Expanding on the reply by https://stackoverflow.com/users/14647868/matias-lopez, for cases where you need a dynamic payload.

For example:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

run_dags = TriggerDagRunOperator.partial(
    task_id='test_07_few_opt_ins_triggered_dag',
    trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
    conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)

Above we have 3 runs, so expand must be given a conf list with the same number of entries, one per run.
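
If the payload is not known up front, one option (assuming Airflow 2.3+; build_confs is a hypothetical helper, not from the original answer) is to build the conf list in an upstream TaskFlow task and expand over its output:

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@task
def build_confs():
    # Each dict becomes the conf of one triggered DAG run
    return [{"line": str(i)} for i in range(1, 4)]


run_dags = TriggerDagRunOperator.partial(
    task_id="trigger_with_dynamic_conf",
    trigger_dag_id="test_07_few_opt_ins_triggered_dag",
).expand(
    conf=build_confs(),
)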

Then, in the triggered DAG:

from airflow.decorators import task

@task
def start(dag_run=None):
    # dag_run.conf carries the payload provided for this run via expand()
    print(f"consuming line {dag_run.conf.get('line')}")

start()
Glanti answered 13/7, 2023 at 11:20 Comment(0)
