Airflow Python operator passing parameters

I'm trying to write a PythonOperator task in an Airflow DAG and pass certain parameters to the Python callable.

My code looks like this:

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)

But I keep getting the following error:

TypeError: my_sleeping_function() got an unexpected keyword argument 'dag_run'

I can't figure out why.

Moot answered 15/2, 2019 at 21:21 Comment(2)
Please format your code properly. It's hard to tell where my_sleeping_function ends. What is PythonOperator? - Tedford
Apparently related previous Q&A thread - Suffix
P
11

Add **kwargs to the parameter list of your callable, after the threshold parameter: my_sleeping_function(threshold, **kwargs).
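
A minimal sketch of that fix, written in plain Python so it runs without Airflow installed. The `fake_context` dict is a hypothetical stand-in for the context that Airflow passes when provide_context=True; the real context has many more keys.

```python
def my_sleeping_function(threshold, **kwargs):
    # **kwargs absorbs the extra context entries (dag_run, ds, ...)
    # so they no longer trigger a TypeError.
    print(threshold)
    return threshold


# Hypothetical stand-in for the Airflow context; only two keys shown.
fake_context = {"dag_run": None, "ds": "2019-02-15"}

# Mimics a call that mixes the op_kwargs value with context keyword arguments.
result = my_sleeping_function(threshold=100, **fake_context)
```

In the DAG itself the PythonOperator definition would stay unchanged; only the callable's signature changes.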

Paleography answered 16/2, 2019 at 0:50 Comment(4)
You mean like op_kwargs={'threshold': 100, **kwargs}? I have no kwargs var to expand like that. - Astral
Even if you don't have args that you created for the function, the PythonOperator will append a set of parameters to your function call. These parameters include some Airflow objects like context, etc. - Paleography
Ah, I was thinking it went in my DAG's PythonOperator, but it goes in the callable. In this case, my_sleeping_function(threshold, **kwargs). - Astral
This is correct. I did not see you were asking about putting the kwargs in the PythonOperator constructor. - Paleography
L
4

This is how you can pass arguments for a Python operator in Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def my_func(*op_args):
    # op_args arrives as a tuple of the positional values given to the operator
    print(op_args)
    return op_args[0]


with DAG(
    'python_dag',
    description='Python DAG',
    schedule_interval='*/5 * * * *',
    start_date=datetime(2018, 11, 1),
    catchup=False,
) as dag:
    dummy_task = DummyOperator(task_id='dummy_task', retries=3)
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=my_func,
        op_args=['one', 'two', 'three'],
    )

    dummy_task >> python_task
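
As a rough illustration of how the two argument options reach the callable: op_args is unpacked as positional arguments and op_kwargs as keyword arguments. The sketch below is plain Python; `call_like_operator` and `my_kw_func` are hypothetical names used only for illustration and are not part of Airflow.

```python
def call_like_operator(python_callable, op_args=None, op_kwargs=None):
    # Hypothetical helper: unpack the list positionally and the dict by keyword.
    return python_callable(*(op_args or []), **(op_kwargs or {}))


def my_func(*op_args):
    # Receives ('one', 'two', 'three') as a tuple.
    return op_args[0]


def my_kw_func(threshold):
    # Receives threshold by keyword from op_kwargs.
    return threshold


first = call_like_operator(my_func, op_args=['one', 'two', 'three'])
value = call_like_operator(my_kw_func, op_kwargs={'threshold': 100})
```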
Laconia answered 16/7, 2019 at 11:49 Comment(0)
M
2

This is an old question now, with a few solutions. None of them explains why you're getting the error, though.

It's because of provide_context=True. It passes an additional set of keyword arguments to the function. A list of what it passes can be found here: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-variables

As you can see, dag_run is there. You can either set provide_context=False or add **kwargs to your function to handle them. Hopefully this helps anyone else who stumbles on this question.
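
The failure can be reproduced in plain Python without Airflow. This is only a sketch: the `context` dict is a hypothetical two-key stand-in for the real context, and `strict_callable` / `tolerant_callable` are illustrative names.

```python
def strict_callable(threshold):
    # No **kwargs: any extra keyword argument raises TypeError.
    return threshold


def tolerant_callable(threshold, **kwargs):
    # **kwargs collects whatever extra keyword arguments are passed.
    return threshold


# Hypothetical stand-in for the context passed with provide_context=True.
context = {"dag_run": "placeholder", "ds": "2019-02-15"}
op_kwargs = {"threshold": 100}

try:
    strict_callable(**op_kwargs, **context)
    error_message = None
except TypeError as exc:
    # Same kind of error as in the question.
    error_message = str(exc)

ok_value = tolerant_callable(**op_kwargs, **context)
```

Dropping provide_context=True corresponds to calling `strict_callable(**op_kwargs)` alone, which also succeeds.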

Mccluskey answered 2/12, 2023 at 22:53 Comment(0)
