How to install dependency modules for an Airflow DAG task (or Python code)? "Failed to import module" in an Airflow DAG when using KubernetesExecutor

I have an Airflow DAG "example_ml.py" with a task "train_ml_model" that calls/runs a Python script, "training.py".

- Dags/example_ml.py
- Dags/training.py

When I run the DAG, it fails to import the modules required for the training script to execute (the error is on importing the sklearn module).

Code snippet for DAG task:

    train_model = PythonOperator(
        task_id='train_model',
        python_callable=training,
        dag=dag,
    )

PS: I'm using a k8s cluster. Airflow is running in the k8s cluster, and the executor is set to KubernetesExecutor, so each time a DAG is triggered a new pod is assigned to complete the task.

Rotunda answered 31/3, 2021 at 16:18 Comment(0)

I have encountered the same issue with two dependencies (praw and supabase).

Here's my solution:

  1. Add dependencies to a requirements.txt file:

    To do this, list your dependencies using pip freeze. Then, select the target dependencies you want to install in Airflow. Finally, copy the dependency and its version. For example: supabase==2.3.3

  2. Modify your Dockerfile by adding the following to copy the requirements and install them (a complete Dockerfile sketch follows this list):

    COPY requirements.txt /requirements.txt
    RUN pip install --user --upgrade pip
    RUN pip install --no-cache-dir --user -r /requirements.txt
    
    
  3. Build a new Docker image:

    docker build . --tag extending_airflow:latest

  4. Finally, bring up your environment:

    docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler
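
For reference, here is what a complete Dockerfile along these lines might look like. This is a minimal sketch that assumes you are extending the official apache/airflow base image; the tag is illustrative, so match it to the Airflow version you actually deploy:

    # Extend the official Airflow image (pick the tag you actually run)
    FROM apache/airflow:2.8.1

    # Copy the pinned requirements into the image and install them for the airflow user
    COPY requirements.txt /requirements.txt
    RUN pip install --user --upgrade pip
    RUN pip install --no-cache-dir --user -r /requirements.txt

With, for example, scikit-learn pinned in requirements.txt, every container started from this image can import sklearn.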

I hope this helps you!

Sharell answered 15/1 at 0:12 Comment(1)
The answer runs the solution in docker-compose, but the question specifically states that Kubernetes is the Airflow environment. A better answer would suggest trying the PythonVirtualenvOperator, since they are already using the PythonOperator; or, if they are using the KubernetesPodOperator (unclear from the question), deploying the built container image to a container registry and then referring to that image in the KubernetesPodOperator.Documentary
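
To illustrate that suggestion: with the PythonVirtualenvOperator, each task can declare its own pip requirements, which are installed into a throwaway virtualenv when the task runs. This is only a hedged sketch (the callable body and the requirement pin are illustrative, and `dag` is assumed to exist as in the question):

    from airflow.operators.python import PythonVirtualenvOperator

    def train():
        # Imports must live inside the callable: it is serialized and run in a fresh virtualenv
        from sklearn.linear_model import LogisticRegression
        model = LogisticRegression()
        # ... fit the model, save artifacts, etc.

    train_model = PythonVirtualenvOperator(
        task_id='train_model',
        python_callable=train,
        requirements=['scikit-learn'],  # installed into the task's virtualenv at runtime
        system_site_packages=True,      # keep access to packages already baked into the image
        dag=dag,
    )

The trade-off is that the virtualenv is rebuilt on every task run, which adds startup time compared to baking the dependency into the image.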

Best practice (to my limited knowledge) is to build and provide your own custom Docker image that has all the required dependencies. Once you have that, push it to a registry of your choice; there is then a specific set of options you can use in your DAG file to declare which Docker image to use for each task in your DAG, as in:

import os

from airflow.operators.python import PythonOperator

# `dag` is assumed to be defined earlier in the DAG file, as in the question

def print_stuff():
    print("stuff")

def use_airflow_binary():
    rc = os.system("airflow -h")
    assert rc == 0

# You don't have to use any special KubernetesExecutor configuration if you don't want to
start_task = PythonOperator(
    task_id="start_task", python_callable=print_stuff, dag=dag
)

# But you can if you want to
one_task = PythonOperator(
    task_id="one_task", python_callable=print_stuff, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)

# Use the airflow -h binary
two_task = PythonOperator(
    task_id="two_task", python_callable=use_airflow_binary, dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)

The interesting bit here is:

executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}

This is where you can use your custom-built Docker image.

There are even more options available (resource allocation, affinity, etc.).

All credit goes to Marc Lamberti.
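
Note that the executor_config form shown above is the older Airflow 1.10-style syntax. On Airflow 2 with the KubernetesExecutor, a per-task image is usually set through a pod_override instead; here is a minimal sketch with an illustrative image name, reusing the question's training callable and dag object:

    from kubernetes.client import models as k8s
    from airflow.operators.python import PythonOperator

    train_model = PythonOperator(
        task_id="train_model",
        python_callable=training,
        dag=dag,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        # "base" is the name of the main task container created by the executor
                        k8s.V1Container(name="base", image="my-registry/airflow-ml:latest"),
                    ]
                )
            )
        },
    )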

Blockbuster answered 11/7, 2022 at 20:50 Comment(1)
I think this option only works for operators that accept the executor_config param or that support the DAG decorator; for example, this won't work for GreatExpectationsOperator.Aspidistra

I found an easy (and ugly) workaround for this problem in my use-case.

The problem I am solving is simply installing a package in an environment that is hard to find and manage because it is located deep within containers and paths.

My reasoning for this solution is that we can run Python code inside this environment, which is otherwise unreachable with pip commands.

What I have done is install the package directly from the DAG itself; this is what my DAG function looks like:

def dag_function(**kwargs):

    import sys
    import subprocess

    # Install the package into the running worker's environment at task run time
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'packagename'])

    # Import only after the install has succeeded
    import packagename

    # ... Use package

Source for installing packages as from python script: https://www.activestate.com/resources/quick-reads/how-to-install-python-packages-using-a-script/

This violates various principles, one of which is keeping all of your requirements separate from the code, but it works without having to rebuild entire images, so that's nice.
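
A small refinement on the same idea (still a sketch, with packagename as a placeholder) is to shell out to pip only when the import actually fails, so the install is skipped whenever the package is already present in the image:

    def dag_function(**kwargs):
        import sys
        import subprocess

        try:
            import packagename
        except ImportError:
            # Install into the current interpreter's environment, then retry the import
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'packagename'])
            import packagename

        # ... Use package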

Lorinalorinda answered 9/2, 2023 at 19:9 Comment(0)

Could you give more details? Are you running this on your local computer? A container? Are you sure the package is installed? As you pointed out, the error seems to be related to a missing package. Creating a task to install it may not solve the issue. Ideally, just install the requirements on whatever environment you are running Airflow in.

Toque answered 31/3, 2021 at 16:47 Comment(2)
I'm running Airflow on a Kubernetes cluster and using the KubernetesExecutor. So when a DAG is run, it creates a pod and executes the tasks inside that pod. I'm using PythonOperator for running the training task.Rotunda
OK, when you create a pod you need to specify the image it will run, and you'll need to install the package on that image. You must have defined the Airflow image to run; if it's a custom image, just install the package on that image.Toque

I had the same issue and this is how I solved it:

Running the following Python code

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)

I get these paths

 '/home/.local/lib/python3.6/site-packages',
 '/usr/local/lib/python3.6/dist-packages',
 '/usr/lib/python3/dist-packages'

For me, airflow is listed under

'/usr/local/lib/python3.6/dist-packages'

Therefore, for the package to be found, it must be installed right there. I used this command to install my package:

sudo python3 -m pip install --system [package-name] -t $(pwd)
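
To double-check where the module resolves from afterwards (a quick verification, assuming sklearn is the missing package):

>>> import sklearn
>>> print(sklearn.__file__)
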
Boysenberry answered 1/7, 2021 at 6:32 Comment(0)
