I am using the following docker-compose image, I got this image from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on: &airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test:
[
"CMD-SHELL",
'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: "true"
_AIRFLOW_WWW_USER_CREATE: "true"
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
######################################################
# SPARK SERVICES
######################################################
jupyterlab:
image: andreper/jupyterlab:3.0.0-spark-3.0.0
container_name: jupyterlab
ports:
- 8888:8888
- 4040:4040
volumes:
- shared-workspace:/opt/workspace
spark-master:
image: andreper/spark-master:3.0.0
container_name: spark-master
ports:
- 8081:8080
- 7077:7077
volumes:
- shared-workspace:/opt/workspace
spark-worker-1:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-1
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8082:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
spark-worker-2:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-2
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8083:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
volumes:
postgres-db-volume:
shared-workspace:
name: "jordi_airflow"
driver: local
driver_opts:
type: "none"
o: "bind"
device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"
I am trying to run the following DAG:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import csv
import requests
import json
default_args = {
"owner": "airflow",
"email_on_failure": False,
"email_on_retry": False,
"email": "[email protected]",
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def printar():
print("success!")
with DAG(
"forex_data_pipeline",
start_date=datetime(2021, 1, 1),
schedule_interval="@daily",
default_args=default_args,
catchup=False,
) as dag:
downloading_rates = PythonOperator(task_id="test1", python_callable=printar)
forex_processing = SparkSubmitOperator(
task_id="spark1",
application="/opt/airflow/dags/test.py",
conn_id="spark_conn",
verbose=False,
)
downloading_rates >> forex_processing
But I see this error in the airflow ui:
Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
I have specified to install additional requirements in the docker-compose file:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
I am writing it wrong? how I should specify the additional requirements I want to install in airflow? can I pass a requirements.txt? if so, how I specify the path?