How do you access Airflow variables with task decorators using jinja templating?

I'm currently accessing an Airflow variable as follows:

from airflow.models import Variable

s3_bucket = Variable.get('bucket_name')

It works, but I'm being asked not to use the Variable module and to use Jinja templating instead, e.g.:

s3_bucket = '{{ var.value.bucket_name }}'

The problem is that Jinja works when I use it in a templated field of a classic operator (e.g., PythonOperator/BashOperator), but I'm having trouble getting it to work with the TaskFlow API: the variable is read as a string literal. Example:

from airflow.decorators import task

# Pretend DAG defined here

@task
def example_task():
    s3_bucket = '{{ var.value.bucket_name }}'  # plain Python string: Jinja never renders it
    print(s3_bucket)

example_task()

The above would print "{{ var.value.bucket_name }}" instead of the bucket_name value.

Erhart asked 5/4, 2022 at 15:19

"It works but I'm being asked to not use the Variable module and use jinja templating instead"

This is not an accurate recommendation, and I'll explain why.

There is absolutely no problem with doing:

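from airflow.decorators import task
from airflow.models import Variable

@task
def example_task():
    # Runs only when the task executes, not when the DAG file is parsed
    s3_bucket = Variable.get('bucket_name')
    print(s3_bucket)

example_task()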

You should avoid using Variable.get() in top-level code. Using it in a Python callable invoked by PythonOperator (or inside a @task-decorated function, which is built on the same operator) is perfectly safe.
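
To make the difference concrete, here is a hypothetical pure-Python toy model (not Airflow code; FakeVariableStore, parse_dag_file and simulate are made-up names) that counts lookups against a fake variable store. It assumes only what the answer states: top-level code runs on every parse, while code inside the callable runs once per task execution.

```python
# Hypothetical toy model, NOT Airflow code: it only mimics *when* code runs.


class FakeVariableStore:
    """Stand-in for the metadata database behind Variable.get()."""

    def __init__(self, values):
        self._values = dict(values)
        self.queries = 0

    def get(self, key):
        self.queries += 1  # each lookup counts as one database round trip
        return self._values[key]


def parse_dag_file(store, top_level_lookup):
    """Simulate one parse of a DAG file and return the task callable it defines."""
    if top_level_lookup:
        bucket = store.get("bucket_name")  # top-level code: runs on EVERY parse

        def example_task():
            return bucket
    else:
        def example_task():
            return store.get("bucket_name")  # runs only when the task executes
    return example_task


def simulate(top_level_lookup, parses, task_runs):
    """Parse the file `parses` times, run the task `task_runs` times, count lookups."""
    store = FakeVariableStore({"bucket_name": "my-bucket"})
    tasks = [parse_dag_file(store, top_level_lookup) for _ in range(parses)]
    for _ in range(task_runs):
        tasks[-1]()  # the scheduler runs the most recently parsed definition
    return store.queries


print(simulate(top_level_lookup=True, parses=100, task_runs=1))   # 100
print(simulate(top_level_lookup=False, parses=100, task_runs=1))  # 1
```

With 100 simulated parses and a single task run, the top-level version makes 100 lookups while the in-callable version makes just one.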

Airflow constantly parses your .py files in search of changes to DAGs. This means that any top-level code you write is executed every time the parsing process runs. Since each file is parsed every 30 seconds by default (the default value of min_file_process_interval), top-level Variable.get() calls put constant load on your backend metadata database. Now consider that your instance grows, with more and more DAGs using the same approach: you might end up unable to reach the database because of the heavy volume. You are practically "attacking" your own database.

This led to the recommendation to use macros: with macros you are NEVER at risk of stressing the database this way, since macros are evaluated only at run time. However, this doesn't mean that you should avoid Variable.get() where it's useful. And if you use a macro somewhere it isn't rendered, you won't get the value you expect (as you experienced).
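
A rough back-of-envelope sketch of that load, assuming (for illustration only) that every DAG file is re-parsed once per min_file_process_interval and makes a fixed number of top-level Variable.get() calls. Actual parse frequency also depends on the number of files and parsing processes, so read the result as roughly an upper bound. The function name and the example numbers are hypothetical.

```python
# Back-of-envelope estimate of Variable lookups caused purely by DAG parsing.
# Assumption (illustrative, not measured): each file is re-parsed once per
# interval_s seconds and makes `lookups_per_file` top-level Variable.get() calls.

SECONDS_PER_DAY = 24 * 60 * 60


def parse_time_queries_per_day(num_dag_files, lookups_per_file, interval_s=30):
    """Hypothetical helper: daily lookups caused by top-level code alone."""
    parses_per_file = SECONDS_PER_DAY // interval_s
    return num_dag_files * lookups_per_file * parses_per_file


print(parse_time_queries_per_day(1, 1))    # 2880
print(parse_time_queries_per_day(100, 3))  # 864000
```

Even a single top-level lookup in one file adds up to 2,880 queries a day at the 30-second default, and the figure scales linearly with the number of files and lookups.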

To clarify: it's OK to use Variable.get() in any part of the code that is not top-level code.

Doth answered 5/4, 2022 at 21:29. Comments (2):
This is incredibly useful information, thanks. I'm surprised at how many top-level vars are defined in our DAGs, given what you said. – Erhart
Thanks for this answer; very helpful. To be clear, when you say "top level code", this includes code within the context manager of your DAG definition that is outside of a task, correct? So really, "any code that is outside of a task" might be another way to phrase that? – Apsis

I found out you can do this:

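from airflow.decorators import task

@task
def example_task(s3_bucket):
    print(s3_bucket)

# Arguments are stored in the operator's op_args, a templated field,
# so this string is rendered by Jinja when the task runs
example_task('{{ var.value.bucket_name }}')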

Erhart answered 7/4, 2022 at 3:28. Comments (2):
Do we know whether the variable will be fetched during DAG parsing in this case? – Paryavi
@Paryavi did you find an answer to this? My understanding is that the variable is fetched at run time of that job. – Consubstantial
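
On the question raised in these comments: as far as I know, arguments passed to a @task-decorated function are stored in the operator's op_args/op_kwargs, which are templated fields that Airflow renders when the task instance runs, not when the file is parsed. The sketch below is a hypothetical toy model of that ordering only (ToyDecoratedTask and render are made-up names, and a regex stands in for Jinja); it is not Airflow's implementation.

```python
import re

# Hypothetical toy model, NOT Airflow's implementation: template strings passed
# as arguments are stored as-is at parse time and rendered just before execution.

_VAR_PATTERN = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render(template, variables):
    """Replace {{ var.value.NAME }} with variables[NAME] (a tiny Jinja stand-in)."""
    return _VAR_PATTERN.sub(lambda m: variables[m.group(1)], template)


class ToyDecoratedTask:
    def __init__(self, func, *args):
        self.func = func
        self.op_args = list(args)  # parse time: stored, NOT rendered

    def run(self, variables):
        # Run time: only the arguments (the "templated fields") are rendered;
        # string literals inside the function body are never touched.
        rendered = [render(a, variables) if isinstance(a, str) else a for a in self.op_args]
        return self.func(*rendered)


def from_argument(s3_bucket):
    return s3_bucket


def from_body():
    return "{{ var.value.bucket_name }}"  # plain string inside the body


variables = {"bucket_name": "my-bucket"}
t1 = ToyDecoratedTask(from_argument, "{{ var.value.bucket_name }}")
t2 = ToyDecoratedTask(from_body)
print(t1.op_args)         # ['{{ var.value.bucket_name }}']
print(t1.run(variables))  # my-bucket
print(t2.run(variables))  # {{ var.value.bucket_name }}
```

In this model the stored argument is still the raw template after "parsing", it is resolved only inside run(), and a template written in the function body comes back unchanged, which matches the behaviour described in the question.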

© 2022 - 2024 — McMap. All rights reserved.