Scheduling Spark Jobs Running on Kubernetes via Airflow

I have a Spark job that runs in a Kubernetes pod. Until now I have been using a YAML file to run my jobs manually. Now I want to schedule my Spark jobs via Airflow. This is the first time I am using Airflow, and I am unable to figure out how to add my YAML file to Airflow. From what I have read, I can schedule my jobs via a DAG in Airflow. Here is an example DAG:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'test',
    'start_date': datetime(2019, 4, 3),
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG('test_dag', default_args=args, catchup=False)

def print_text1():
    print('Hello-World1')

def print_text():
    print('Hello-World2')

t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2

In this case the two tasks above will get executed one after the other once I trigger the DAG. Now, if I want to run a spark-submit job instead, what should I do? I am using Spark 2.4.4.

Meador answered 24/10, 2019 at 9:28 Comment(0)

Airflow has a concept of operators, which represent Airflow tasks. Your example uses PythonOperator, which simply executes Python code and is most probably not the one you are interested in, unless you submit the Spark job within that Python code. There are several operators you can make use of:

  • BashOperator, which executes a given bash command for you. You can run kubectl or spark-submit with it directly (see the sketch below)
  • SparkSubmitOperator, the dedicated operator for calling spark-submit
  • KubernetesPodOperator, which creates a Kubernetes pod for you; you can launch your driver pod directly with it
  • Hybrid solutions, e.g. HttpOperator + Livy on Kubernetes: you spin up a Livy server on Kubernetes, which serves as a Spark job server and exposes a REST API that Airflow calls via HttpOperator

Note: for each of these operators you need to ensure that your Airflow environment contains all the dependencies required for execution, as well as the credentials configured to access the required services.
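
For example, here is a minimal sketch of the first two options for a Spark 2.4.4 job, written against Airflow 1.10-era imports. The master URL, the container image and the connection id spark_k8s are placeholders you would replace with your own values, and the two tasks are alternatives rather than a pipeline:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

args = {'owner': 'test', 'start_date': datetime(2019, 4, 3), 'retries': 2, 'retry_delay': timedelta(minutes=1)}
dag = DAG('spark_on_k8s', default_args=args, schedule_interval='@daily', catchup=False)

# Option 1: plain spark-submit against the Kubernetes API server via BashOperator.
# Requires spark-submit to be installed on the Airflow worker.
spark_submit_bash = BashOperator(
    task_id='spark_submit_bash',
    bash_command=(
        'spark-submit '
        '--master k8s://https://<k8s-apiserver-host>:443 '
        '--deploy-mode cluster '
        '--name spark-pi '
        '--class org.apache.spark.examples.SparkPi '
        '--conf spark.kubernetes.container.image=<your-spark-2.4.4-image> '
        'local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar'
    ),
    dag=dag,
)

# Option 2: SparkSubmitOperator, where the master URL comes from an Airflow
# Spark connection (the connection id 'spark_k8s' is hypothetical).
spark_submit_operator = SparkSubmitOperator(
    task_id='spark_submit_operator',
    conn_id='spark_k8s',
    application='local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar',
    java_class='org.apache.spark.examples.SparkPi',
    conf={'spark.kubernetes.container.image': '<your-spark-2.4.4-image>'},
    dag=dag,
)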

You can also refer to this existing thread:

Stumpy answered 31/10, 2019 at 11:24 Comment(2)
How to submit a Kubernetes Job? – Sst
What do you mean exactly, @Sst? – Stumpy

As of 2023, there is a new option to run Spark jobs on Kubernetes using SparkKubernetesOperator. Refer to: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

In Airflow we can use SparkKubernetesOperator and provide the Spark job details in a .yaml file. The YAML file defines a SparkApplication, which creates the driver and executor pods that run the Spark job.

Airflow Task:

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Submits the SparkApplication defined in spark_app_config.yaml (see the sample
# YAML below) to the "spark-apps" namespace via the spark-on-k8s operator.
spark_operator = SparkKubernetesOperator(
    dag=spark_operator_test,
    task_id="spark_operator_task",
    application_file='spark_app_config.yaml',
    namespace="spark-apps",
    on_success_callback=spark_success_alert,
    on_failure_callback=spark_fail_alert
)
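
For context, here is a minimal sketch of the DAG file this task assumes. Only the names spark_operator_test, spark_success_alert and spark_fail_alert come from the snippet above; the DAG id, schedule and callback bodies are illustrative:

from datetime import datetime

from airflow.models import DAG

def spark_success_alert(context):
    # Placeholder callback: notify when the Airflow task succeeds.
    print("Spark task %s succeeded" % context['task_instance'].task_id)

def spark_fail_alert(context):
    # Placeholder callback: notify when the Airflow task fails.
    print("Spark task %s failed" % context['task_instance'].task_id)

spark_operator_test = DAG(
    dag_id='spark_operator_test',
    start_date=datetime(2023, 4, 1),
    schedule_interval='@daily',
    catchup=False,
)

# The SparkKubernetesOperator task shown above attaches to this DAG via dag=spark_operator_test;
# the relative path 'spark_app_config.yaml' is looked up in the DAG folder (template search path).

This also assumes the apache-airflow-providers-cncf-kubernetes provider is installed in your Airflow environment and that the spark-on-k8s operator (which defines the SparkApplication CRD) is installed in the cluster.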

Sample YAML file:

#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-1
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "apache/spark-py:v3.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0.jar"
  sparkVersion: "3.4.0"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "/mnt/data/eventLogs"
    "spark.driver.log.persistToDfs.enabled": "true"
    "spark.driver.log.dfsDir": "/mnt/data/eventLogs"
  restartPolicy:
    type: Never
  volumes:
    - name: "data-volume"
      persistentVolumeClaim:
        claimName: pvc-bifrost-spark-data
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.4.0
    serviceAccount: spark-apps-spark
    # podSecurityContext:
      # fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.4.0
    deleteOnTermination: false
    # podSecurityContext:
      # fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
Axle answered 24/4, 2023 at 9:51 Comment(0)
