Using Airflow template files and template_searchpath in Google Cloud Composer
I'm using the BigQueryOperator extensively in my Airflow DAGs on Google Cloud Composer.

For longer queries, it's better to put each query in its own .sql file rather than cluttering up the DAG with it. Airflow seems to support this for all SQL Query operators, including the BigQueryOperator, as you can see in the documentation.

My question: after I've written my SQL statement in a .sql template file, how do I add it to Google Cloud Composer and reference it in a DAG?

Honorarium answered 8/6, 2018 at 12:51 Comment(0)

I found an ideal fix for this question. In your DAG declaration you can set template_searchpath, a list of folders in which Airflow looks up Jinja-templated files in addition to the DAG file's own folder.

To make this work in your Cloud Composer instance, set it as follows:

dag = DAG(
    ...
    template_searchpath=["/home/airflow/gcs/plugins"],
)

Note that I used the plugins folder for this example. You can use your data folder instead or any folder you want to have inside your bucket.
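To make the lookup concrete, here is a minimal sketch of how a template name might be resolved against a list of folders. It is not Airflow's code: as far as I understand, Airflow hands the DAG file's folder plus the template_searchpath entries to Jinja's file loader, and this standard-library stand-in only imitates that order. The function name resolve_template and the temporary folders are made up for illustration.

```python
import os
import tempfile


def resolve_template(name, dag_folder, template_searchpath):
    """Return the first existing file for `name`, or None if nothing matches.

    Illustrative only: checks the DAG folder first, then each
    template_searchpath entry in the order given.
    """
    # Assumption of this sketch: "/query.sql" and "query.sql" are treated alike.
    relative = name.lstrip("/")
    for base in [dag_folder] + list(template_searchpath):
        candidate = os.path.join(base, relative)
        if os.path.isfile(candidate):
            return candidate
    return None


if __name__ == "__main__":
    # Temporary directories stand in for the Composer dags and plugins folders.
    with tempfile.TemporaryDirectory() as root:
        dags = os.path.join(root, "dags")
        plugins = os.path.join(root, "plugins")
        os.makedirs(dags)
        os.makedirs(plugins)
        with open(os.path.join(plugins, "query.sql"), "w") as handle:
            handle.write("SELECT 1")
        print(resolve_template("query.sql", dags, [plugins]))
```

With this picture in mind, an operator's sql='query.sql' would be found in the plugins folder once that folder is listed in template_searchpath.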

Quita answered 29/6, 2021 at 14:25 Comment(1)
If the path I entered does not work for you, you will need to find the path for your Cloud Composer instance. This is not hard to find: in any DAG you could log the sys.path variable and read the path from the output. - Quita
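A rough sketch of that check, which could be run from a DAG file or a task: it prints sys.path and reports which candidate folders actually exist on the worker. The helper name existing_directories is made up, and the candidate list is only a guess based on the /home/airflow/gcs layout used in this thread.

```python
import os
import sys


def existing_directories(candidates):
    """Return the candidates that exist as directories, preserving order."""
    return [path for path in candidates if os.path.isdir(path)]


if __name__ == "__main__":
    # Print the interpreter's module search path, as suggested in the comment.
    for entry in sys.path:
        print("sys.path entry:", entry)

    # Candidate template folders; adjust to whatever your environment shows.
    candidates = [
        "/home/airflow/gcs/plugins",
        "/home/airflow/gcs/data",
        "/home/airflow/gcs/dags",
    ]
    print("existing:", existing_directories(candidates))
```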

After googling around and finding this related question, I've found a way to make this work (although it's not the ideal solution, as we'll see). Here is a working example with three pieces:

  1. the sql template file with a bit of jinja templating,
  2. the DAG, and
  3. the gcloud command needed to upload the template to the right place.

(1) The SQL template file. This is just a text file whose filename ends with the .sql extension. Let's say this file is called my-templated-query.sql and contains:

SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
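To see what the query looks like after templating, here is a small stand-in for the rendering step. Airflow itself uses Jinja and fills in ds with the run's date as YYYY-MM-DD; the render function below is a deliberately tiny imitation written for this example, and the date is arbitrary.

```python
import re

TEMPLATE = """SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')"""


def render(template, context):
    """Replace each {{ name }} placeholder with context[name].

    Not Jinja: handles plain variable names only, with no filters,
    macros, or expressions.
    """
    def substitute(match):
        return str(context[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


if __name__ == "__main__":
    # An arbitrary date stands in for the run's execution date.
    print(render(TEMPLATE, {"ds": "2018-06-08"}))
```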

(2) Referencing the template in the DAG file. To reference this template, create an operator like the following:

count_task = BigQueryOperator(
  task_id='count_rows',
  sql='/my-templated-query.sql')

(3) Adding the template file to Google Cloud Composer. It turns out that by default, Airflow looks for template files in the dags folder. To upload our templated file to the dags folder, we run

gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql

You'll have to replace the env name, location, and source path accordingly.
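If you upload templates from a script, the same command can be assembled in Python. This is only a sketch: the flags are the ones from the command above, the helper names are made up, and upload assumes the gcloud CLI is installed and authenticated.

```python
import subprocess


def build_dags_import_command(environment, location, source):
    """Return the gcloud invocation from above as an argument list."""
    return [
        "gcloud", "beta", "composer", "environments", "storage", "dags", "import",
        "--environment", environment,
        "--location", location,
        "--source", source,
    ]


def upload(environment, location, source):
    """Run the import; raises CalledProcessError if gcloud exits non-zero."""
    subprocess.run(
        build_dags_import_command(environment, location, source),
        check=True,
    )


if __name__ == "__main__":
    # Only print the command here; call upload(...) to actually run it.
    command = build_dags_import_command(
        "my-env-name", "us-central1", "path/to/my-templated-query.sql"
    )
    print(" ".join(command))
```

Passing the arguments as a list avoids shell quoting problems when a source path contains spaces.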

It doesn't really seem right to upload all these templates to the dag folder. A better Airflow practice is to put your templates in their own folder, and specify the template_searchpath parameter to point to it when you create your DAG. However, I'm not sure how to do this with Google Cloud Composer.

Update: I've realized it's possible to put subfolders in the DAG folder, which is useful for organizing large numbers of SQL templates. Say I put a SQL template file in DAG_FOLDER/dataset1/table1.sql. In the BigQueryOperator, I can then refer to it using sql='/dataset1/table1.sql'. If you have a subfolder containing many files and further subfolders, you can also use the dags import command shown above to upload the entire subfolder recursively; just point --source at the subfolder.
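With many templates it can help to list the references you would pass as sql=. The sketch below walks a local copy of the DAG folder and builds paths in the '/dataset1/table1.sql' form used above. The function name sql_references and the temporary folder are made up for the example.

```python
import os
import tempfile


def sql_references(dag_folder):
    """Map each .sql file under dag_folder to a '/sub/dir/file.sql' reference."""
    references = []
    for root, _dirs, files in os.walk(dag_folder):
        for filename in files:
            if filename.endswith(".sql"):
                full_path = os.path.join(root, filename)
                relative = os.path.relpath(full_path, dag_folder)
                # Always use forward slashes, whatever the local OS uses.
                references.append("/" + relative.replace(os.sep, "/"))
    return sorted(references)


if __name__ == "__main__":
    # A temporary directory stands in for a local copy of the DAG folder.
    with tempfile.TemporaryDirectory() as dag_folder:
        os.makedirs(os.path.join(dag_folder, "dataset1"))
        with open(os.path.join(dag_folder, "dataset1", "table1.sql"), "w") as handle:
            handle.write("SELECT 1")
        print(sql_references(dag_folder))
```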

Honorarium answered 8/6, 2018 at 13:4 Comment(0)

We recently solved this using a similar strategy. The steps are:

  1. Put all of your SQL files into a Google Cloud Source Repository
  2. At the beginning of each DAG run, clone the files into the "data" directory in the Cloud Storage Bucket that is automatically shared with your Airflow environment.
  3. Read the queries in at execution using templates within the BigQueryOperator.

Here's a minimal solution:

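import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.contrib.operators import bigquery_operator

# Placeholder default arguments; the original answer does not show them.
default_dag_args = {
    'start_date': datetime.datetime(2020, 1, 1),
}

with models.DAG(
        'bigquery_dag',
        schedule_interval=None,
        # Also look for templates in the cloned repo's queries folder.
        template_searchpath=['/home/airflow/gcs/data/repo/queries/'],
        default_args=default_dag_args
        ) as dag:

    # Remove any stale copy of the repo from the shared data directory.
    t1_clean_repo = bash_operator.BashOperator(
        task_id='clean_repo',
        bash_command='rm -rf /home/airflow/gcs/data/repo'
    )

    # Clone the repo, then copy it into the shared data directory.
    clone_command = """
        gcloud source repos clone repo --project=project_id
        cp -R repo /home/airflow/gcs/data
    """

    t2_clone_repo = bash_operator.BashOperator(
        task_id='clone_repo',
        bash_command=clone_command
    )

    # 'query.sql' is looked up in the template_searchpath folder set above.
    t3_query = bigquery_operator.BigQueryOperator(
        task_id='query',
        sql='query.sql',
        use_legacy_sql=False,
        bigquery_conn_id='conn_id'
    )

    # Run the tasks in order: clean, clone, then query.
    t1_clean_repo >> t2_clone_repo >> t3_query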

We're taking advantage of a few important concepts here:

  1. The data directory in the Cloud Storage Bucket is automatically shared with your Airflow instance via FUSE. Anything put in here is accessible by most operators.
  2. As long as your Google Cloud Source repository is in the same project as Cloud Composer, your Airflow instance doesn't need additional permissions to git clone the files.
  3. We're setting the template_searchpath in the DAG arguments, expanding the search scope to include the data directory in the Cloud Storage Bucket.
Dignadignified answered 7/7, 2020 at 23:48 Comment(2)
You wrote about Google Cloud Storage, but where is that service in your solution (the DAG)? You store your SQL queries in the Google repo service, clone them to a VM in the Composer cluster, and use the path to them in the DAG. Where is GCS here? - Virescence
@Virescence everything inside of /home/airflow/gcs actually lives inside Cloud Storage and is effectively "mirrored" into your Airflow env. - Dignadignified
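To illustrate that mirroring, here is a small sketch that translates a path under /home/airflow/gcs into the matching object prefix in the environment's bucket. The function name gcs_uri_for and the bucket name example-composer-bucket are made up; your environment's real bucket name will differ.

```python
import posixpath

# Local mount point of the environment's bucket, as used in this thread.
MOUNT_ROOT = "/home/airflow/gcs"


def gcs_uri_for(local_path, bucket):
    """Translate a path under MOUNT_ROOT into a gs:// URI in `bucket`."""
    normalized = posixpath.normpath(local_path)
    inside_mount = (
        normalized == MOUNT_ROOT or normalized.startswith(MOUNT_ROOT + "/")
    )
    if not inside_mount:
        raise ValueError("path is not under " + MOUNT_ROOT + ": " + local_path)
    relative = normalized[len(MOUNT_ROOT):].lstrip("/")
    return "gs://{}/{}".format(bucket, relative)


if __name__ == "__main__":
    # "example-composer-bucket" is a hypothetical bucket name.
    print(gcs_uri_for("/home/airflow/gcs/data/repo/queries/", "example-composer-bucket"))
```

So the queries folder in the template_searchpath above corresponds to the data/repo/queries prefix in the bucket.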
