Cannot install additional requirements to apache airflow

I am using the following docker-compose file, which I got from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml

version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on: &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test:
        [
          "CMD-SHELL",
          'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
        ]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  ######################################################
  # SPARK SERVICES
  ######################################################

  jupyterlab:
    image: andreper/jupyterlab:3.0.0-spark-3.0.0
    container_name: jupyterlab
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: andreper/spark-master:3.0.0
    container_name: spark-master
    ports:
      - 8081:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: andreper/spark-worker:3.0.0
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8083:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master

volumes:
  postgres-db-volume:
  shared-workspace:
    name: "jordi_airflow"
    driver: local
    driver_opts:
      type: "none"
      o: "bind"
      device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"

I am trying to run the following DAG:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator

from datetime import datetime, timedelta
import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)

    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )

    downloading_rates >> forex_processing

But I see this error in the Airflow UI:

Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'

I have specified the additional requirements to install in the docker-compose file:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}

Am I writing it wrong? How should I specify the additional requirements I want to install in Airflow? Can I pass a requirements.txt? If so, how do I specify its path?

Intention answered 5/6, 2021 at 15:53 Comment(1)
I think this answer may help you. – Responsiveness

Support for the _PIP_ADDITIONAL_REQUIREMENTS environment variable has not been released yet. It is only supported by the developer/unreleased version of the Docker image. This feature is planned for Airflow 2.1.1. For more information, see: Adding extra requirements for build and runtime of the PROD image.

For older versions, you should build a new image and set that image in docker-compose.yaml. To do this, follow these steps:

  1. Create a new Dockerfile with the following content:
    FROM apache/airflow:2.0.0
    RUN pip install --no-cache-dir apache-airflow-providers-apache-spark
    
  2. Build a new image:
    docker build . --tag my-company-airflow:2.0.0
    
  3. Set this image in docker-compose.yaml file:
    echo "AIRFLOW_IMAGE_NAME=my-company-airflow:2.0.0" >> .env
    

For more information, see the official guide about running Airflow in a docker-compose environment.

In particular, I recommend this fragment, which describes what to do when you need to install a new pip package.

ModuleNotFoundError: No module named 'XYZ'

The Docker Compose file uses the latest Airflow image (apache/airflow). If you need to install a new Python library or system library, you can customize and extend it.

I recommend you check out the guide about building the Docker image. It explains how to install even more complex dependencies.
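
To also cover the requirements.txt part of the question: a minimal Dockerfile sketch, assuming you keep a requirements.txt next to the Dockerfile that lists apache-airflow-providers-apache-spark (the file name and its contents are assumptions here, not something from your original setup):

    # Hypothetical Dockerfile: extend the image used by the compose file and
    # install everything listed in a requirements.txt copied into the image.
    FROM apache/airflow:2.0.0-python3.7
    COPY requirements.txt /requirements.txt
    RUN pip install --no-cache-dir -r /requirements.txt

Build and reference the resulting image exactly as in steps 2 and 3 above.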

I also recommend only using docker-compose files from the official website that are intended for your specific version. Docker-compose files for newer versions may not work with older versions of Airflow, because we are making many improvements to these files all the time to improve stability, reliability, and user experience.

Excitant answered 5/6, 2021 at 22:58 Comment(1)
Shouldn't it be --no-cache-dir instead of --no-cache-user in the Dockerfile? – Comfort

I used:

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- package1 package2 package3 }

# Example
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}

# or:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql}

(note the space after :- and the spaces separating the package names)

Then, when the Airflow containers start for the first time, they will automatically pip install the listed packages.
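
For example, instead of editing docker-compose.yaml directly, the same default can be overridden from a .env file placed next to it. A sketch, using just the package from the question:

    # Hypothetical: append the variable to the .env file that docker-compose
    # reads when substituting ${_PIP_ADDITIONAL_REQUIREMENTS:-...}
    echo "_PIP_ADDITIONAL_REQUIREMENTS=apache-airflow-providers-apache-spark" >> .env

Keep in mind that this installs the packages on every container start, so it is only suitable for local testing; for anything permanent, build a custom image as described in the other answer.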

[Reference]

  • Running Airflow in Docker — Airflow Documentation
  • _PIP_ADDITIONAL_REQUIREMENTS
    • If not empty, airflow containers will attempt to install requirements specified in the variable. example: lxml==4.6.3 charset-normalizer==1.4.1. Available in Airflow image 2.1.1 and above.
Tucky answered 1/8, 2021 at 3:44 Comment(1)
This is very helpful, thank you for sharing your insights. – Jowett

This almost works for me, but I get this when I run docker-compose up:

 Container airflow-redis-1  Healthy
 Container airflow-airflow-init-1  service "airflow-init" didn't complete successfully: exit 1
 Container airflow-airflow-init-1  service "airflow-init" didn't complete successfully: exit 1
 Container airflow-airflow-init-1  service "airflow-init" didn't complete successfully: exit 1
 Container airflow-airflow-init-1  service "airflow-init" didn't complete successfully: exit 1
service "airflow-init" didn't complete successfully: exit 1

When I look at the container logs, I see this:

!!!!!  Installing additional requirements: ' kafka kafka-python confluent_kafka ' !!!!!!!!!!!!

WARNING: This is a development/test feature only. NEVER use it in production!
         Instead, build a custom image as described in

         https://airflow.apache.org/docs/docker-stack/build.html

         Adding requirements at container startup is fragile and is done every time
         the container starts, so it is onlny useful for testing and trying out
         of adding dependencies.


You are running pip as root. Please use 'airflow' user to run pip!

See: https://airflow.apache.org/docs/docker-stack/build.html#adding-a-new-pypi-package
Minorite answered 13/3 at 13:34 Comment(0)
