Providing context in TriggerDagRunOperator
I have a dag that has been triggered by another dag. I have passed through to this dag some configuration variables via the DagRunOrder().payload dictionary in the same way the official example has done.

Now in this dag I have another TriggerDagRunOperator to start a second dag and would like to pass those same configuration variables through.

I have successfully accessed the payload variables in a PythonOperator like so:

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for message and {} for day".format(
        kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
    )

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag
)

But the same pattern does not work in the TriggerDagRunOperator:

def trigger(context, dag_run_obj, **kwargs):
    dag_run_obj.payload = {
        "message": kwargs["dag_run"].conf["message"],
        "day": kwargs["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    provide_context=True,
    python_callable=trigger,
    dag=dag
)

It yields a warning regarding the use of provide_context:

INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask:   category=PendingDeprecationWarning

And this error, suggesting I haven't passed the conf:

INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask:     dro = self.python_callable(context, dro)
INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'

A second pattern I've tried, which also hasn't worked, is using the params argument like so:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context['params']['message'],
        "day": context['params']['day']
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    params={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)

This pattern does not yield an error, but instead passes the parameters through to the next dag as literal strings, i.e. the Jinja expressions are never evaluated.


How can I access the configuration variables in the TriggerDagRunOperator of the second dag?

Bucky answered 12/2, 2018 at 12:31 Comment(0)
In Airflow 2.0.x, the equivalent of @efbbrown's answer is:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)

The pull request is described here on GitHub.

See the documentation for external-triggers and for trigger_dagrun.

Here is a YouTube video on the topic that shows the correct imports.

Hite answered 25/4, 2021 at 14:36 Comment(4)
Thank you @Sawan VaidyaBucky
Hi, is it not possible to create the dag_run_obj payload or the conf parameter dynamically when triggering the TriggerDagRunOperator? This used to be possible before the python_callable parameter was deprecated in Airflow 2.0.Whitsun
@mathee, Yes, it appears that you can't do that using TriggerDagRunOperator. I solved that by adding an extra PythonOperator right before I called the TriggerDagRunOperator and I set up the dag_run_obj thereHite
@taari, I tried setting the dag_run_obj in a PythonOperator but it didn't work for me; can you share the snippet?Exuviae

Solved:

The dag_run object is stored in the context and so the configuration variables can be accessed in the python_callable of the TriggerDagRunOperator with this pattern:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    dag=dag
)
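To see why the first pattern raised KeyError: 'dag_run', note from the traceback that dagrun_operator.py calls dro = self.python_callable(context, dro), so the context arrives as the first positional argument rather than as keyword arguments. Here is a minimal pure-Python sketch of that call convention (no Airflow required; FakeDagRun and FakeDagRunOrder are hypothetical stand-ins for Airflow internals, not real Airflow classes):

```python
# Hypothetical stand-ins for Airflow's DagRun and DagRunOrder objects.
class FakeDagRun:
    def __init__(self, conf):
        self.conf = conf

class FakeDagRunOrder:
    def __init__(self):
        self.payload = None

def trigger(context, dag_run_obj):
    # context is passed positionally, so the conf must be read from it;
    # it never lands in **kwargs, which is why kwargs["dag_run"] failed.
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"],
    }
    return dag_run_obj

# Mimic dagrun_operator.py: dro = self.python_callable(context, dro)
context = {"dag_run": FakeDagRun({"message": "hello", "day": "monday"})}
dro = trigger(context, FakeDagRunOrder())
print(dro.payload)  # {'message': 'hello', 'day': 'monday'}
```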
Bucky answered 12/2, 2018 at 14:5 Comment(6)
How did you end up reading the passed parameters in the triggered DAG? I need to do the same, and while I can confirm the parameter I want to pass is part of the dag run object's payload, I can't seem to "read" it in the triggered DAG.Lamia
I could access the parameters in my triggered dag with "{{ dag_run.conf['message'] }}" and "{{ dag_run.conf['day'] }}". This relies on the fields in the operator through which you are trying to read the parameters being template_fields. If the "{{ dag_run.conf['day'] }}" pattern doesn't work for you because the fields aren't template_fields, you will be able to extend the operator class which you are using to make those fields template_fields. Let me know if this doesn't make sense and I will include it in my answer.Bucky
Hmmm.. I generate a file in the first DAG, and then read the file and dynamically generate tasks based on the data in the file, in the triggered DAG. Currently, I have no good way of "sending" the filename to the DAG. I can make it part of the payload, but can't assign it to a variable in the triggered DAG. I had already tried the methods in the examples and the ones you've used too.Lamia
I programatically create the filename in the first DAG and then use the same logic in the triggered DAG to read it. It is dangerous and error-prone and all that, but till I can figure out a way to pass the filename to the triggered DAG, I have no option, it seems.Lamia
@efbbrown this solution is not working in Airflow v2.0.1; I'm getting this error: Invalid arguments were passed to TriggerDagRunOperator. Do you know how we could pass context in TriggerDagRunOperator in Airflow version 2?Raleighraley
@Raleighraley no sorry I don't, I haven't yet used airflow 2.0. If you find a solution please add it as an answer :)Bucky

In addition to @taari's answer, if you need arguments other than strings in your config, you can render the template as a native object. You can also use params to add a default config (used when you trigger without a config, and displayed by default when you trigger with one). Here is an example using the TaskFlow syntax and the params argument.

from datetime import datetime

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.decorators import dag

dag_params = {
    "message": "Hello world",
    "day": "Monday",
    "dict": {"option1": "something", "option2": "something else"}
}

@dag(
    dag_id="dummy",
    start_date=datetime(2023, 5, 9),
    schedule=None,
    params=dag_params,
    render_template_as_native_obj=True,
)
def dummy():
    trigger_step = TriggerDagRunOperator(
        task_id="trigger_modelling",
        trigger_dag_id="Dummy_Modelling",
        conf={
            "message": "{{ params['message'] }}",
            "day": "{{ params['day'] }}",
            "dict": "{{ params['dict'] }}"
        })

dummy()
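The effect of render_template_as_native_obj can be illustrated without Airflow. With plain rendering, a templated dict reaches the triggered DAG as the string repr of the dict; with native rendering, Jinja's NativeEnvironment rebuilds the original Python object. A stdlib-only sketch, using ast.literal_eval as a stand-in for Jinja's native parsing (an approximation of the mechanism, not Airflow code):

```python
import ast

# What a templated dict looks like after plain (string) rendering:
plain = "{'option1': 'something', 'option2': 'something else'}"
assert isinstance(plain, str)  # downstream code would receive a string

# Native rendering instead yields the Python object itself;
# ast.literal_eval stands in for that parsing step here.
native = ast.literal_eval(plain)
print(type(native).__name__)   # dict
print(native["option1"])       # something
```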
Camellia answered 9/5, 2023 at 13:57 Comment(0)
