How to use DBT with AWS Managed Airflow?

Hope you are doing well. I wanted to check if anyone has gotten dbt up and running on AWS MWAA (Managed Workflows for Apache Airflow).

I have tried this one and this Python package without success; they fail for one reason or another (can't find the dbt path, etc.).

Has anyone managed to use MWAA (Airflow 2) and dbt without having to build a Docker image and place it somewhere?

Thank you!

Townsley answered 4/6, 2021 at 13:56 Comment(2)
I've heard that dbt may sometimes have dependencies conflicting with Airflow. Could you please add some of the errors that you get?Hydnocarpate
I will look into it, but basically I don't have access to $PATH because of the user that AWS exposes to me, and I also can't make a call to the main() object from dbt.Townsley

I've managed to solve this by doing the following steps:

  1. Add dbt-core==0.19.1 to your requirements.txt
  2. Add DBT cli executable into plugins.zip
#!/usr/bin/env python3
# EASY-INSTALL-ENTRY-SCRIPT: 'dbt-core==0.19.1','console_scripts','dbt'
__requires__ = 'dbt-core==0.19.1'
import re
import sys
from pkg_resources import load_entry_point

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(
        load_entry_point('dbt-core==0.19.1', 'console_scripts', 'dbt')()
    )

And from here you have two options:

  1. Setting the dbt_bin operator argument to /usr/local/airflow/plugins/dbt (see the sketch just after this list)
  2. Add /usr/local/airflow/plugins/ to the $PATH by following the docs
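
For option 1, here is a minimal sketch of a DAG that points the dbt_bin argument at the executable shipped in plugins.zip. The operator comes from the airflow-dbt package (one of the packages mentioned in the question), so it would need to be in requirements.txt; the DAG id, task id and project paths are hypothetical, so adjust them to your setup:

from datetime import datetime

from airflow import DAG
from airflow_dbt.operators.dbt_operator import DbtRunOperator

with DAG(dag_id="dbt_example", start_date=datetime(2021, 6, 1), schedule_interval=None) as dag:
    run_dbt = DbtRunOperator(
        task_id="run_dbt",
        dbt_bin="/usr/local/airflow/plugins/dbt",     # the CLI executable shipped in plugins.zip
        profiles_dir="/usr/local/airflow/dags/dbt/",  # hypothetical profiles.yml location
        dir="/usr/local/airflow/dags/dbt/",           # hypothetical dbt project location
    )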

Environment variable setter example:

from airflow.plugins_manager import AirflowPlugin
import os

os.environ["PATH"] = os.getenv(
    "PATH") + ":/usr/local/airflow/.local/lib/python3.7/site-packages:/usr/local/airflow/plugins/"


class EnvVarPlugin(AirflowPlugin):
    name = 'env_var_plugin'

The plugins zip content:

plugins.zip
├── dbt (DBT cli executable)
└── env_var_plugin.py (environment variable setter)
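
With option 2, once /usr/local/airflow/plugins/ is on $PATH via env_var_plugin.py, a plain BashOperator can invoke the CLI directly. A minimal sketch; the DAG id, task id and project/profiles paths are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="dbt_bash_example", start_date=datetime(2021, 6, 1), schedule_interval=None) as dag:
    run_dbt = BashOperator(
        task_id="run_dbt",
        # dbt is resolved via the PATH set by env_var_plugin.py above
        bash_command="cd /usr/local/airflow/dags/dbt && dbt run --profiles-dir /usr/local/airflow/dags/dbt",
    )
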
Solution answered 16/6, 2021 at 6:14 Comment(13)
Thanks for the answer, Yonatan. Can you explain which plugins.zip you are talking about in your answer above?Kosse
@Smit sure, have a look at MWAA docs docs.aws.amazon.com/mwaa/latest/userguide/…Solution
do you have any examples of the plugins folder?Punic
@AmitBaranes Edited the answer with the additional plugin file (the environment setter)Solution
I've done this, but when I create a simple BashOperator DAG with the following: bash_task = BashOperator(task_id='bash_task_1', bash_command="dbt", dag=dag), it fails with the error "command not found." When I replicate this on my local dockerized version of Airflow, it works.Regelate
Make sure you have set up the plugins path and the content of plugins.zip correctly.Solution
I made sure everything was wired up properly. Forgot some permissions locally and chmodding it worked, but it's still not recognizing it in MWAA. Just to verify, you're using dbt 0.19.1? Do you know if it works with 0.20.0? I'm currently running Airflow 2.0.2, attempted dbt 0.20.0 and 0.19.1, neither of which worked.Regelate
I'm sure it works on 0.19.1 with Airflow 2.0.2. How did you call the dbt executable? dbt, right? Not dbt.py?Solution
You mention that it works with Airflow 2.0.2, but I can't use the provided dbt because the Airflow 2.0.2 constraints conflict with dbt. I get "hologram 0.0.14 has requirement jsonschema<3.2,>=3.0, but you'll have jsonschema 3.2.0 which is incompatible." and plugins/dbt can't continue. Can you help?Loux
It's in the context of MWAA, which uses an older pip version that does not break on constraint conflicts.Solution
In my case it started working after adding ":/usr/local/airflow/.local/bin" to PATH. I followed the env_var_plugin.py mentioned above and no workaround for dbt cli was requiredNessa
In my situation, I included env_var_plugin.py in plugins.zip and then set up dbt_project.yml with the paths. Using a BashOperator, I exported Snowflake AWS secrets variables, which were then provided to profiles.yml. However, my DAG failed with a key error while reading secrets. I have not configured the dbt executable (the dbt CLI program). Is this necessary? If so, what is the right file name and location within plugins.zip? I configured the requirements.txt file with the latest versions: jsonschema = 4.21.1, dbt-core = 1.7.6, dbt-snowflake = 1.7.6.Hypogeum
I'm running my dbt model from the Airflow MWAA environment and it failed with "Database Error, 001003 (42000): SQL compilation error: syntax error line 1 at position 48 unexpected '<EOF>'; syntax error line 1 at position 47 unexpected '.'" in the Airflow DAG logs, and surprisingly the same dbt project ran fine without any issues in my local dbt-core environment. Did anybody face the same with dbt-core in an Airflow MWAA environment?Hypogeum

Using the PyPI airflow-dbt-python package has simplified the setup of dbt on MWAA for us, as it avoids needing to amend the PATH environment variable in the plugins file. However, I've yet to have a successful dbt run via either the airflow-dbt or airflow-dbt-python packages, as the MWAA worker filesystem appears to be read-only, so as soon as dbt tries to compile to the target directory, the following error occurs:

File "/usr/lib64/python3.7/os.py", line 223, in makedirs
    mkdir(name, mode)
OSError: [Errno 30] Read-only file system: '/usr/local/airflow/dags/dbt/target'
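
For context, this is roughly how the airflow-dbt-python route gets wired up. A sketch only: the import path and argument names are assumed from that package, and the DAG id, task id and project/profiles paths are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow_dbt_python.operators.dbt import DbtRunOperator  # import path assumed from the package docs

with DAG(dag_id="dbt_python_example", start_date=datetime(2021, 8, 1), schedule_interval=None) as dag:
    run_dbt = DbtRunOperator(
        task_id="run_dbt",
        project_dir="/usr/local/airflow/dags/dbt/",   # hypothetical dbt project location
        profiles_dir="/usr/local/airflow/dags/dbt/",  # hypothetical profiles.yml location
    )
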
Piliferous answered 16/8, 2021 at 11:3 Comment(2)
I am also in the same situation as you and hoping to find an answer.Crosspiece
The MWAA read-only filesystem problem can be overcome by setting the target-path in the dbt_project.yml file to /tmp (the only writeable area on the MWAA workers), i.e. target-path: "/tmp/dbt/target". However, we needed to move the dbt deps step into our CI/CD pipeline build so that the contents of dbt_modules are copied to the MWAA S3 bucket as part of the project and then propagated across all workers.Piliferous

This is how I managed to do it:

    import os

    from airflow.decorators import dag, task

    # default_args is assumed to be defined elsewhere in this DAG file
    @dag(**default_args)
    def dbt_dag():
        @task()
        def run_dbt():
            # dbt's internal entry point; imported inside the task so it is only loaded at run time
            from dbt.main import handle_and_check

            # point dbt at writable paths (picked up by env_var() in dbt_project.yml below)
            os.environ["DBT_TARGET_DIR"] = "/usr/local/airflow/tmp/target"
            os.environ["DBT_LOG_DIR"] = "/usr/local/airflow/tmp/logs"
            os.environ["DBT_PACKAGE_DIR"] = "/usr/local/airflow/tmp/packages"

            succeeded = True
            try:
                args = ['run', '--whatever', 'bla']  # replace with your dbt CLI arguments
                results, succeeded = handle_and_check(args)
                print(results, succeeded)
            except SystemExit as e:
                if e.code != 0:
                    raise e
            if not succeeded:
                raise Exception("DBT failed")

        run_dbt()

    dag = dbt_dag()

Note that my dbt_project.yml has the following paths; this is to avoid an OS exception when dbt tries to write to read-only paths:

    target-path: "{{ env_var('DBT_TARGET_DIR', 'target') }}"  # directory which will store compiled SQL files
    log-path: "{{ env_var('DBT_LOG_DIR', 'logs') }}"  # directory which will store dbt logs
    packages-install-path: "{{ env_var('DBT_PACKAGE_DIR', 'packages') }}"  # directory which will store dbt packages

Willett answered 15/3, 2022 at 10:6 Comment(0)

Combining the answers from @Yonatan Kiron & @Ofer Helman works for me. I just needed to fix these 3 files:

  • requirements.txt
  • plugins.zip
  • dbt_project.yml

In my requirements.txt I specify the versions I want; it looks like this:

airflow-dbt==0.4.0
dbt-core==1.0.1
dbt-redshift==1.0.0

Note that, as of v1.0.0, pip install dbt is no longer supported and will raise an explicit error. Since v0.13, the PyPI package named dbt was a simple "pass-through" to dbt-core (see https://docs.getdbt.com/dbt-cli/install/pip#install-dbt-core-only).

For my plugins.zip I add a file env_var_plugin.py that looks like this:

from airflow.plugins_manager import AirflowPlugin
import os

os.environ["DBT_LOG_DIR"] = "/usr/local/airflow/tmp/logs"
os.environ["DBT_PACKAGE_DIR"] = "/usr/local/airflow/tmp/dbt_packages"
os.environ["DBT_TARGET_DIR"] = "/usr/local/airflow/tmp/target"

class EnvVarPlugin(AirflowPlugin):
    name = 'env_var_plugin'

And finally I add this to my dbt_project.yml:

log-path: "{{ env_var('DBT_LOG_DIR', 'logs') }}"  # directory which will store dbt logs
packages-install-path: "{{ env_var('DBT_PACKAGE_DIR', 'dbt_packages') }}"  # directory which will store dbt packages
target-path: "{{ env_var('DBT_TARGET_DIR', 'target') }}"  # directory which will store compiled SQL files

And, as stated in the airflow-dbt GitHub README (https://github.com/gocardless/airflow-dbt#amazon-managed-workflows-for-apache-airflow-mwaa), configure the dbt task (e.g. a DbtRunOperator) like below:

  dbt_run = DbtRunOperator(
      task_id='dbt_run',
      dbt_bin='/usr/local/airflow/.local/bin/dbt',
      profiles_dir='/usr/local/airflow/dags/{DBT_FOLDER}/',
      dir='/usr/local/airflow/dags/{DBT_FOLDER}/'
  )
Sansbury answered 13/4, 2022 at 3:22 Comment(0)
