Airflow MySQL operator trying to execute script path string as SQL, rather than using template
I've got a confusing issue on Airflow which I don't understand.

I have a SQL scripts folder at DML/analytics/my_script.sql. The MySQL operator works perfectly in normal circumstances, but does not when I try to call it from a Python operator as follows. This is necessary because I need to pass XCom values in from another task:

def insert_func(**kwargs):
    run_update = MySqlOperator(
        sql='DML/analytics/my_script.sql',
        task_id='insert_func',
        mysql_conn_id="bi_mysql",
        params={
            "table_name": table_name, 
            'ts': kwargs['task_instance'].xcom_pull(key='return_value',task_ids='get_existing_data')
        },
    )
    run_update.execute(context=kwargs['task_instance'])

with DAG("my_dag", **dag_params) as dag:
    with TaskGroup(group_id='insert') as insert:
        get_existing_data = PythonOperator(
            task_id='get_existing_data',
            python_callable=MySQLGetRecord,
            op_kwargs={
                'target_db_conn_id':'bi_mysql', 
                'target_db':'analytics', 
                'sql': f'SELECT invoice_date FROM analytics.{table_name} ORDER BY 1 DESC'
            }
        )
        insert = PythonOperator(
            task_id='insert',
            python_callable=insert_func
        )
        get_existing_data >> insert

The error I get is: MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'DML/analytics/my_script.sql' at line 1")

Clearly it is executing the literal string passed in the `sql` parameter rather than treating it as a file location. Why is this happening? Again, this works if I move the `run_update` task directly into the `with DAG(...)` block, but I need to do it this way to get the XCom value from `get_existing_data`, correct...?

Faucal answered 25/4, 2022 at 10:43

When you use an operator as intended (i.e., scheduled and run by Airflow), Airflow is responsible for the whole task lifecycle. This means Airflow handles the templating, calls pre_execute(), calls execute(), handles on_failure/retries, etc.

What you did is use an operator inside an operator: a PythonOperator that contains a MySqlOperator. In this case the inner operator (MySqlOperator) is just a regular Python class. Although it is named an Operator, it is not a "real" operator here, and none of the lifecycle steps you might expect are applied to it.

You may have already realised this, since in your own example you had to trigger execute() explicitly:

run_update.execute(context=kwargs['task_instance'])

Notice you didn't need to do this for the PythonOperator.

You can see in the code base that Airflow invokes render_templates before it invokes pre_execute() and before it invokes execute().

This means that if you want the MySqlOperator to be templated, you need to call the templating step yourself before you invoke execute().
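To illustrate the ordering only (this is a stdlib-only toy, not Airflow's actual code; `MiniOperator` and its methods are made-up names): the render step is what turns a `.sql` path into SQL text, so calling execute() without it sends the literal path string to the database.

```python
from pathlib import Path

class MiniOperator:
    """Toy stand-in for an Airflow operator (illustrative only)."""
    def __init__(self, sql):
        self.sql = sql  # may be literal SQL or a path to a .sql file

    def render_templates(self):
        # Airflow's task runner performs a step like this before execute():
        # templated fields ending in .sql are read from disk and rendered.
        if isinstance(self.sql, str) and self.sql.endswith(".sql"):
            self.sql = Path(self.sql).read_text()

    def execute(self):
        # The "database" receives whatever is currently in self.sql.
        return f"EXECUTING: {self.sql}"

# A script file standing in for DML/analytics/my_script.sql
Path("demo_script.sql").write_text("SELECT 1")

op = MiniOperator("demo_script.sql")
print(op.execute())   # EXECUTING: demo_script.sql  <- the literal path, your error
op.render_templates()
print(op.execute())   # EXECUTING: SELECT 1         <- after rendering
```

Skipping `render_templates()` reproduces exactly the 1064 syntax error above: the path string reaches MySQL as if it were SQL.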

That said, I strongly encourage you: do not use an operator inside an operator.

From your code I don't see a reason why you can't just use MySqlOperator directly, without the PythonOperator. Should there be a reason, the proper way to handle it is to create a CustomMySqlOperator that handles the logic you need. By doing so you will not have problems with using .sql files.
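For instance, a sketch of the direct approach (assumes Airflow 2.x with the MySQL provider installed; connection ids and file paths are from your question): because `sql` is a templated field, the `.sql` file itself can pull the XCom value with Jinja, so no PythonOperator wrapper is needed.

```python
# Sketch only -- requires apache-airflow plus the mysql provider at run time.
from airflow.providers.mysql.operators.mysql import MySqlOperator

run_update = MySqlOperator(
    task_id='run_update',
    mysql_conn_id='bi_mysql',
    sql='DML/analytics/my_script.sql',   # rendered by Airflow before execution
    params={'table_name': table_name},
)
# Inside my_script.sql, fetch the XCom value directly with Jinja, e.g.:
#   ... WHERE invoice_date > '{{ ti.xcom_pull(task_ids="get_existing_data") }}'
get_existing_data >> run_update
```

Here Airflow owns the full lifecycle again, so the file is read and templated before execute() ever runs.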

Indreetloire answered 25/4, 2022 at 11:7
