External files in Airflow DAG
Asked Answered
C

4

28

I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dag',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query from the external file
    with open('sql/queryfile.sql') as f:
        query = f.read()
    # run the query (execute is a placeholder in this snippet)
    execute(query)

task = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

The log states the following:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

I understand that I could simply copy and paste the query into the same file, but that's really not a neat solution. There are multiple queries and the text is really big; embedding it in the Python code would compromise readability.

Casanova answered 23/3, 2017 at 17:27 Comment(0)
C
24

Here is an example using a Variable to make it easy.

  • First, add a Variable via the Airflow UI -> Admin -> Variables, e.g. {key: 'sql_path', value: 'your_sql_script_folder'}

  • Then add the following code in your DAG to pull the Variable from Airflow.

DAG code:

import airflow
from airflow.models import Variable
from datetime import datetime

# folder that holds the SQL scripts, read from the Airflow Variable
tmpl_search_path = Variable.get("sql_path")

# minimal default_args (the original snippet assumed these were defined elsewhere)
default_args = {'start_date': datetime(2017, 1, 1)}

dag = airflow.DAG(
    'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • Now you can reference a SQL script by file name, or by a path relative to that folder, in any templated field (see the sketch after this list)

  • You can learn more in the Airflow templating documentation
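
For example, with template_searchpath set on the DAG, a templated field such as sql can name the script file directly and Airflow reads and renders it from that folder. A minimal sketch (the PostgresOperator and the connection id are assumptions, not part of the original answer):

from airflow.operators.postgres_operator import PostgresOperator

run_query = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my_postgres',  # hypothetical connection id
    sql='queryfile.sql',             # resolved against template_searchpath
    dag=dag,
)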

Chryso answered 7/9, 2017 at 8:49 Comment(6)
Please, could you provide a full example? What does defining template_searchpath change in the overall script behaviour; can I reference the file by its name now? For example, would this complete your example: ``` with open(query_file_name, 'r') as file: query_content = file.read() ``` ?Brokaw
I don't think this would work with the example DAG the OP uses with PythonOperator and Python's native open(). The PythonOperator runs in a pod which doesn't have access to the same set of locations as the process which parses the DAGs.Drat
@RicardoMS Hi, when you want to define your own airflow.models.Variable, the easiest way is via the Airflow UI, homepage -> Admin -> Variables, to create a new variable, e.g.: {'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}. After you're done, you can use the example code to load your variable with tmpl_search_path = Variable.get("RicardoMS_variable") instead of using '/opt/specific/path' directlyChryso
@Drat I'm having the issue that you're pointing out. The $AIRFLOW_HOME env var is set to /opt/*** and even if I use its value /opt/airflow directly in a file path it is changed to /opt/*** automagically and I get file not found errors.Mastermind
@Mastermind accessing files on disk during Task execution is tricky because the actual Python code you're trying to execute doesn't run on the same machine as your Airflow installation: it runs in its own infrastructure. So if you need your tasks to have access to files on disk when they run, you'll have to plan for that explicitly. Maybe load the file during the parsing of the DAG Definition file then pass the result to the task as a string?Drat
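A minimal sketch of that suggestion (the absolute path and the op_kwargs wiring are assumptions):

# this runs at DAG-parse time, on the machine that parses the DAG file
with open('/path/to/dags/sql/queryfile.sql') as f:
    query_text = f.read()

def run_query(query):
    execute(query)  # execute is the placeholder from the question

task = PythonOperator(
    task_id='run_query',
    dag=dag,
    python_callable=run_query,
    op_kwargs={'query': query_text},  # pass the SQL text as a string
)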
@Drat it turned out that there was a typo and the code was good, but I had mounted the folder from the container on my local machine. Is it coincidental that it worked, perhaps because it was a local installation on a single machine, or is that expected behaviour?Mastermind
T
9

Assuming that the sql directory is relative to the current Python file, you can figure out the absolute path to the sql file like this:

import os

# absolute path of the directory containing this file
CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    with open(f"{CUR_DIR}/sql/queryfile.sql") as f:
        query = f.read()
    # run the query
    execute(query)
Tajo answered 6/5, 2021 at 0:1 Comment(0)
E
8

All relative paths are resolved against the AIRFLOW_HOME environment variable. Try:

  • Giving an absolute path
  • Placing the file relative to AIRFLOW_HOME
  • Logging the current working directory in the Python callable and then deciding what path to use (best option; see the sketch below)
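
A minimal sketch of that last option (printing is an assumption; Airflow captures a callable's stdout in the task log):

import os

def run_query():
    # log where the worker is actually running this callable
    print(f"cwd: {os.getcwd()}")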
Effloresce answered 23/3, 2017 at 18:43 Comment(1)
Good comment, but unfortunately AIRFLOW_HOME is an optional environment variable - Airflow works just fine without it - and you can't guarantee that it will be set.Maladjusted
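If you do rely on AIRFLOW_HOME, a guarded lookup with Airflow's documented default of ~/airflow is safer, e.g.:

import os

# fall back to Airflow's default home when the env var is unset
airflow_home = os.environ.get('AIRFLOW_HOME', os.path.expanduser('~/airflow'))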
H
0

You can get the DAG directory as shown below.

from airflow.configuration import conf
import os

# path of the configured DAGs folder
dags_folder = conf.get('core', 'DAGS_FOLDER')

# open a file that lives inside the DAGs folder
open(os.path.join(dags_folder, 'something.json'), 'r')

ref: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder

Horrible answered 17/1, 2023 at 3:12 Comment(1)
conf.get('core', 'DAGS_FOLDER') NameError: name 'conf' is not definedForeshank