I've run into an Airflow issue that I don't understand.
I have a SQL script at DML/analytics/my_script.sql. The MySqlOperator works perfectly when it is used as a normal task, but fails when I instantiate it inside a PythonOperator callable as follows. I'm doing it this way because I need to pass in XCom values from another task:

    def insert_func(**kwargs):
        run_update = MySqlOperator(
            sql='DML/analytics/my_script.sql',
            task_id='insert_func',
            mysql_conn_id="bi_mysql",
            params={
                "table_name": table_name,
                'ts': kwargs['task_instance'].xcom_pull(key='return_value', task_ids='get_existing_data')
            },
        )
        run_update.execute(context=kwargs['task_instance'])

    with DAG("my_dag", **dag_params) as dag:
        with TaskGroup(group_id='insert') as insert:
            get_existing_data = PythonOperator(
                task_id='get_existing_data',
                python_callable=MySQLGetRecord,
                op_kwargs={
                    'target_db_conn_id': 'bi_mysql',
                    'target_db': 'analytics',
                    'sql': f'SELECT invoice_date FROM analytics.{table_name} ORDER BY 1 DESC'
                }
            )
            insert = PythonOperator(
                task_id='insert',
                python_callable=insert_func
            )
            get_existing_data >> insert

The error I get is:

    MySQLdb._exceptions.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'DML/analytics/my_script.sql' at line 1")

Clearly it is trying to run the literal string passed in the sql parameter rather than treating it as a file location. Why is this happening? Again, this works if I move the run_update task into the my_dag with clause, but I need to do it this way to get the XCom value from get_existing_data, correct?
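
A likely explanation, offered as a sketch rather than a confirmed diagnosis: sql is a templated field, and when Airflow runs a task itself it renders the templated fields (which includes loading files whose names end in .sql) before it calls execute(). Calling execute() by hand on an operator created inside a Python callable skips that step, so the path string would reach MySQL unchanged. The toy model below imitates this using only the standard library. ToyOperator, load_template_file and demo are hypothetical names for illustration, not Airflow APIs.

```python
import os
import tempfile


class ToyOperator:
    """Simplified stand-in for an operator with a templated `sql` field.

    This is NOT Airflow code; it only models the order of the two steps.
    """

    # File extensions that mark a field value as "path to a template file".
    template_ext = (".sql",)

    def __init__(self, sql, search_path):
        self.sql = sql
        self.search_path = search_path

    def load_template_file(self):
        # Hypothetical pre-execute step: if the value looks like a template
        # file name, replace it with the contents of that file.
        if self.sql.endswith(self.template_ext):
            with open(os.path.join(self.search_path, self.sql)) as handle:
                self.sql = handle.read()

    def execute(self):
        # execute() just uses whatever self.sql holds at this point.
        return self.sql


def demo():
    """Return (sql seen when execute() is called directly, sql seen after loading)."""
    with tempfile.TemporaryDirectory() as dag_folder:
        with open(os.path.join(dag_folder, "my_script.sql"), "w") as handle:
            handle.write("SELECT 1")

        # Case 1: execute() called by hand, the pre-execute step never runs.
        direct = ToyOperator("my_script.sql", dag_folder).execute()

        # Case 2: the pre-execute step runs first, as it would for a normal task.
        managed = ToyOperator("my_script.sql", dag_folder)
        managed.load_template_file()
        rendered = managed.execute()

    return direct, rendered
```

If this is what is happening, one option is to keep MySqlOperator as a regular task and pull the value inside the SQL file itself with Jinja, for example {{ ti.xcom_pull(task_ids='insert.get_existing_data') }} (tasks inside a TaskGroup get the group id as a prefix by default). Another is to read the file in the callable and substitute the value yourself, passing the SQL text rather than the path.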