I have a dag that has been triggered by another dag. I have passed through to this dag some configuration variables via the DagRunOrder().payload
dictionary in the same way the official example has done.
Now in this dag I have another TriggerDagRunOperator
to start a second dag and would like to pass those same configuration variables through.
I have succesfully accessed the payload variables in a PythonOperator
like so:
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for message and {} for day".format(
kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
)
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag
)
But the same pattern does not work in the TriggerDagRunOperator
:
def trigger(context, dag_run_obj, **kwargs):
dag_run_obj.payload = {
"message": kwargs["dag_run"].conf["message"],
"day": kwargs["dag_run"].conf["day"]
}
return dag_run_obj
trigger_step = TriggerDagRunOperator(
task_id="trigger_modelling",
trigger_dag_id="Dummy_Modelling",
provide_context=True,
python_callable=trigger,
dag=dag
)
It yields a warning regarding the use of provide_context
:
INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask: category=PendingDeprecationWarning
And this error suggesting I haven't passed the conf :
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask: dro = self.python_callable(context, dro)
INFO - Subtask: File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask: "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'
A second pattern that I've tried which also hasn't worked is using the params
argument like so:
def trigger(context, dag_run_obj):
dag_run_obj.payload = {
"message": context['params']['message'],
"day": context['params']['day']
}
return dag_run_obj
trigger_step = TriggerDagRunOperator(
task_id="trigger_modelling",
trigger_dag_id="Dummy_Modelling",
python_callable=trigger,
params={
"message": "{{ dag_run.conf['message'] }}",
"day": "{{ dag_run.conf['day'] }}"
},
dag=dag
)
This pattern does not yield an error but instead passes the parameters through to the next dag as strings ie it doesn't evaluate the expressions.
How can I access the configuration variables in the TriggerDagRunOperator
of the second dag?