TemplateNotFound when using Airflow's PostgresOperator with Jinja templating and SQL

When trying to use Airflow's templating capabilities (via Jinja2) with the PostgresOperator, I've been unable to get things to render. It's quite possible I'm doing something wrong, but I'm pretty lost as to what the issue might be. Here's an example to reproduce the TemplateNotFound error I've been getting:

airflow.cfg

airflow_home = /home/gregreda/airflow
dags_folder = /home/gregreda/airflow/dags

relevant DAG and variables

default_args = {
    'owner': 'gregreda',
    'start_date': datetime(2016, 6, 1),
    'schedule_interval': None,
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift'
dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    default_args=default_args
)

/example_csv_to_redshift/csv_to_redshift.py

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql=this_dag_path + '/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)

/example_csv_to_redshift/copy_to_redshift.sql

COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv'
CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;

Calling airflow render example_csv_to_redshift load_table 2016-06-14 throws the exception below. Note that I'm hitting this issue with another DAG as well, which is why you also see the example_redshift_query_to_csv path in the output.

[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags
[2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql
[2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files
    setattr(self, attr, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render
    ti.render_templates()
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates
    rendered_content = rt(attr, content, jinja_context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template
    cache_key = self.loader.get_source(self, name)[1]
  File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql

Any ideas towards a fix are much appreciated.

Brasserie answered 14/6, 2016 at 21:24

Standard PEBCAK error.

The problem was how the path to the SQL template was specified in the Airflow task: it needs to be given relative to the DAG file's location (which Jinja uses as its template search path by default), not as an absolute path.

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='/copy_to_redshift.sql',
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)

Additionally, the SQL template needed a small change: values passed through the operator's params argument are exposed to the template under the params key, so they have to be referenced as {{ params.AWS_ACCESS_KEY_ID }} rather than {{ AWS_ACCESS_KEY_ID }}:

COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv'
CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}'
CSV
NULL as 'null'
IGNOREHEADER as 1;
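
To make the params behaviour concrete, here is a minimal sketch using plain Jinja2 directly, as a stand-in for what Airflow does when it renders the sql field. The loader path is the DAG folder from above and the credential values are placeholders:

from jinja2 import Environment, FileSystemLoader

# Stand-in for Airflow's rendering step: the operator's params dict is
# made available to the template under the `params` key.
env = Environment(loader=FileSystemLoader('/home/gregreda/airflow/dags/example_csv_to_redshift'))
template = env.get_template('copy_to_redshift.sql')

print(template.render(params={
    'AWS_ACCESS_KEY_ID': 'AKIA_PLACEHOLDER',        # placeholder, not a real key
    'AWS_SECRET_ACCESS_KEY': 'SECRET_PLACEHOLDER',  # placeholder, not a real key
}))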
Brasserie answered 14/6, 2016 at 22:17

For a bit more control, instantiate your DAG with the template_searchpath param, then just use the filename in the operator.

:param template_searchpath: This list of folders (non-relative)
    defines where jinja will look for your templates. Order matters.
    Note that jinja/airflow includes the path of your DAG file by
    default
:type template_searchpath: string or list of strings

As @yannicksse suggested, applying this practice to your original DAG would look like this:

dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    template_searchpath=[this_dag_path],  # here
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # and here
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)

Although, personally, I'd put all the templates in a subfolder; a rough sketch of that layout follows.
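
For example, assuming a hypothetical sql/ subdirectory under the DAG folder (and reusing this_dag_path and default_args from the question):

dag = DAG(
    dag_id='example_csv_to_redshift',
    schedule_interval=None,
    template_searchpath=[this_dag_path + '/sql'],  # templates live in the sql/ subfolder
    default_args=default_args
)

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table',
    sql='copy_to_redshift.sql',  # resolved via template_searchpath
    params=dict(
        AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'),
        AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY')
    ),
    postgres_conn_id='postgres_redshift',
    autocommit=False,
    dag=dag
)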

Benzofuran answered 11/7, 2017 at 12:47