Running from a local IDE against a remote Spark cluster
We have a kerberized cluster with Spark running on Yarn. At the moment, we write our Spark code in Scala locally, then build a fat JAR which we copy over to the cluster and then run spark-submit. I would instead like to write Spark code on my local PC and have it run against the cluster directly. Is there a straightforward way to do this? The Spark docs don't seem to have any such pattern.

FYI, my local machine is running Windows and the cluster is running CDH.

Starlike answered 14/2, 2017 at 19:18 Comment(0)

While cricket007's answer works for spark-submit, here is what I did to run against a remote cluster using IntelliJ:

First, make sure the JARs on the client and server sides are identical. Since we are using CDH 5.7.1, I made sure all my JARs came from that specific distribution.

Set HADOOP_CONF_DIR and YARN_CONF_DIR as described in cricket007's answer. Set "spark.yarn.principal" and "spark.yarn.keytab" as appropriate in the Spark conf.
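As a rough sketch of the step above, the Kerberos settings can be collected as plain key/value pairs and handed to SparkConf.setAll. The object name, the helper name, and the example principal and keytab path are hypothetical, and the "yarn-client" master value is my assumption for Spark 1.6 (the answer does not state which master it used).

```scala
// Sketch only: names and example values are placeholders, not from the answer.
object RemoteYarnSettings {
  /** Settings for a Kerberized YARN cluster, as pairs suitable for SparkConf.setAll. */
  def kerberosYarnSettings(principal: String, keytabPath: String): Seq[(String, String)] =
    Seq(
      "spark.master"         -> "yarn-client", // assumption: Spark 1.6 client mode; Spark 2.x and later use "yarn"
      "spark.yarn.principal" -> principal,     // Kerberos principal to log in as
      "spark.yarn.keytab"    -> keytabPath     // keytab file for that principal
    )
}

// Usage, assuming Spark is on the classpath:
//   val conf = new org.apache.spark.SparkConf()
//     .setAppName("remote-ide-test")
//     .setAll(RemoteYarnSettings.kerberosYarnSettings("user@EXAMPLE.COM", "C:/keys/user.keytab"))
```

Keeping the settings in a plain Seq means they can be inspected or logged before a SparkContext is created.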

If connecting to HDFS, make sure the following exclusion rule is set in build.sbt:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0-cdh5.7.1" excludeAll ExclusionRule(organization = "javax.servlet")

Make sure the spark-launcher and spark-yarn JARs are listed in build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-launcher" % "1.6.0-cdh5.7.1"

libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.6.0-cdh5.7.1"
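Put together, the dependency lines above might sit in a build.sbt like the following sketch. The scalaVersion, the resolver URL, and the spark-core line are my assumptions and are not part of the answer; substitute whatever repository your organisation uses for CDH artifacts.

```scala
// build.sbt (sketch). scalaVersion, the resolver and spark-core are assumptions.
scalaVersion := "2.10.6" // the Spark JAR names in this answer carry the _2.10 suffix

// Assumed location of the CDH artifacts; replace with your own repository if needed.
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  // Exclude javax.servlet, as the answer recommends when connecting to HDFS.
  ("org.apache.hadoop" % "hadoop-client" % "2.6.0-cdh5.7.1")
    .excludeAll(ExclusionRule(organization = "javax.servlet")),
  "org.apache.spark" %% "spark-core"     % "1.6.0-cdh5.7.1",
  "org.apache.spark" %% "spark-launcher" % "1.6.0-cdh5.7.1",
  "org.apache.spark" %% "spark-yarn"     % "1.6.0-cdh5.7.1"
)
```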

Find the CDH JARs on the server and copy them to a known location on HDFS. Add the following lines to your code:

final val CDH_JAR_PATH = "/opt/cloudera/parcels/CDH/jars"

final val hadoopJars: Seq[String] = Seq[String](
"hadoop-annotations-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-archive-logs-2.6.0-cdh5.7.1.jar"
, "hadoop-archives-2.6.0-cdh5.7.1.jar"
, "hadoop-auth-2.6.0-cdh5.7.1.jar"
, "hadoop-aws-2.6.0-cdh5.7.1.jar"
, "hadoop-azure-2.6.0-cdh5.7.1.jar"
, "hadoop-capacity-scheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-common-2.6.0-cdh5.7.1.jar"
, "hadoop-core-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-datajoin-2.6.0-cdh5.7.1.jar"
, "hadoop-distcp-2.6.0-cdh5.7.1.jar"
, "hadoop-examples-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-examples.jar"
, "hadoop-extras-2.6.0-cdh5.7.1.jar"
, "hadoop-fairscheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-hdfs-2.6.0-cdh5.7.1.jar"
, "hadoop-hdfs-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-kms-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-app-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-common-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-core-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-jobclient-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-nativetask-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-shuffle-2.6.0-cdh5.7.1.jar"
, "hadoop-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-openstack-2.6.0-cdh5.7.1.jar"
, "hadoop-rumen-2.6.0-cdh5.7.1.jar"
, "hadoop-sls-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-tools-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-yarn-api-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-distributedshell-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-client-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-registry-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-nodemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-resourcemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-web-proxy-2.6.0-cdh5.7.1.jar"
, "hbase-hadoop2-compat-1.2.0-cdh5.7.1.jar"
, "hbase-hadoop-compat-1.2.0-cdh5.7.1.jar")

final val sparkJars: Seq[String] = Seq[String](
"spark-1.6.0-cdh5.7.1-yarn-shuffle.jar",
"spark-assembly-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-avro_2.10-1.1.0-cdh5.7.1.jar",
"spark-bagel_2.10-1.6.0-cdh5.7.1.jar",
"spark-catalyst_2.10-1.6.0-cdh5.7.1.jar",
"spark-core_2.10-1.6.0-cdh5.7.1.jar",
"spark-examples-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-graphx_2.10-1.6.0-cdh5.7.1.jar",
"spark-hive_2.10-1.6.0-cdh5.7.1.jar",
"spark-launcher_2.10-1.6.0-cdh5.7.1.jar",
"spark-mllib_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-common_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-shuffle_2.10-1.6.0-cdh5.7.1.jar",
"spark-repl_2.10-1.6.0-cdh5.7.1.jar",
"spark-sql_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume-sink_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-kafka_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming_2.10-1.6.0-cdh5.7.1.jar",
"spark-unsafe_2.10-1.6.0-cdh5.7.1.jar",
"spark-yarn_2.10-1.6.0-cdh5.7.1.jar")

// Joins the JAR names into a single ':'-separated classpath string under pathPrefix.
def getClassPath(jarNames: Seq[String], pathPrefix: String): String = {
  jarNames.map(name => s"$pathPrefix/$name").mkString(":")
}
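The hard-coded lists could probably be generated instead of typed out. The sketch below assumes you have a local copy (or a mounted view) of the cluster's JAR directory to scan; JarListing and listJars are hypothetical names, and the prefixes are only an example.

```scala
import java.io.File

// Sketch only: lists JAR file names in a directory by name prefix.
object JarListing {
  /** Sorted names of the .jar files in `dir` whose name starts with one of `prefixes`. */
  def listJars(dir: File, prefixes: Seq[String]): Seq[String] = {
    // File.list() returns null when the directory does not exist or cannot be read.
    val names = Option(dir.list()).map(_.toSeq).getOrElse(Seq.empty[String])
    names.filter(n => n.endsWith(".jar") && prefixes.exists(p => n.startsWith(p))).sorted
  }
}

// Usage, with a hypothetical local copy of the CDH JAR directory:
//   val jars = JarListing.listJars(new File("C:/cdh-jars"), Seq("hadoop-", "hbase-hadoop", "spark-"))
//   val cp   = getClassPath(jars, CDH_JAR_PATH)
```

Only the file names are taken from the local directory; the path prefix passed to getClassPath should still be the location of the JARs on the cluster nodes.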

Add these lines when creating a SparkConf:

.set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.yarn.jar", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*") // Spark 1.6 property name; Spark 2.x renamed it to spark.yarn.jars

Your program should work now.

Starlike answered 21/2, 2017 at 23:51 Comment(2)
Note: You could probably programmatically generate that JAR file listing - Bolshevik
Also, you can set the --jars param from the "Edit Configurations" dialog in IntelliJ - Bolshevik

Assuming you have the correct packages on your classpath (easiest to set up with SBT, Maven, etc.), you should be able to spark-submit from anywhere. The --master flag is the main piece that determines how the job is distributed. One thing to check is whether your local machine is blocked off from the YARN cluster by a firewall or other network restriction (because you don't want people randomly running applications on your cluster).

On your local machine, you'll need the Hadoop configuration files from your cluster, and you'll need to set up the $SPARK_HOME/conf directory with some Hadoop-related settings.

From the Spark on YARN page:

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration

These variables are set in $SPARK_HOME/conf/spark-env.sh.
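Before launching anything, it may help to confirm that the configuration directory is actually visible to the JVM. The following is a minimal sketch: HadoopConfCheck is a hypothetical name, and the list of expected files is my assumption about what a typical YARN/HDFS client needs, so adjust it for your cluster.

```scala
import java.io.File

// Sketch only: checks that the Hadoop client config directory looks usable.
object HadoopConfCheck {
  // Assumed set of client-side files; not taken from the Spark documentation.
  val expectedFiles: Seq[String] = Seq("core-site.xml", "hdfs-site.xml", "yarn-site.xml")

  /** Returns the expected files that are not present in `confDir`. */
  def missingFiles(confDir: File): Seq[String] =
    expectedFiles.filterNot(name => new File(confDir, name).isFile)

  def main(args: Array[String]): Unit = {
    // Either variable may point at the config directory, per the quoted documentation.
    sys.env.get("HADOOP_CONF_DIR").orElse(sys.env.get("YARN_CONF_DIR")) match {
      case None =>
        println("Neither HADOOP_CONF_DIR nor YARN_CONF_DIR is set")
      case Some(dir) =>
        val missing = missingFiles(new File(dir))
        if (missing.isEmpty) println(s"$dir contains all expected files")
        else println(s"$dir is missing: ${missing.mkString(", ")}")
    }
  }
}
```

When running from an IDE, the environment variables have to be set in the run configuration (or system-wide), since spark-env.sh is only read by the Spark launch scripts.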

Since your cluster is Kerberized, see Long-Running Spark Applications:

For long-running applications, such as Spark Streaming jobs, to write to HDFS, you must configure Kerberos authentication for Spark, and pass the Spark principal and keytab to the spark-submit script using the --principal and --keytab parameters.
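To make the flags concrete, here is a sketch that only assembles a spark-submit argument list with --master, --principal and --keytab; it does not run anything. SubmitCommand, the main class, the JAR name and the example values are all hypothetical.

```scala
// Sketch only: builds the spark-submit arguments as a Seq, without executing them.
object SubmitCommand {
  def build(master: String,
            principal: String,
            keytab: String,
            mainClass: String,
            appJar: String): Seq[String] =
    Seq(
      "spark-submit",
      "--master", master,       // decides where the job runs
      "--principal", principal, // Kerberos principal
      "--keytab", keytab,       // keytab file for that principal
      "--class", mainClass,
      appJar
    )
}

// Usage (hypothetical values); scala.sys.process can run the resulting Seq:
//   import scala.sys.process._
//   SubmitCommand.build("yarn-client", "user@EXAMPLE.COM", "user.keytab",
//                       "com.example.Main", "app.jar").!
```

On Windows the launcher script is spark-submit.cmd, so the first element would need to change there.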

Bolshevik answered 14/2, 2017 at 19:22 Comment(7)
Is it possible to run locally without using spark-submit? Ideally, I would like to write some Spark code in IntelliJ and have it run on the cluster without any intermediate steps. - Starlike
In theory, I guess you could call SparkConf.setMaster("yarn"), for example, but that isn't recommended for production apps. - Bolshevik
I did try that and pretty much set everything as you described. However, now I get a 'java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.YarnSparkHadoopUtil' exception, which leads me to believe spark-submit comes with several JARs that I am missing. - Starlike
Sounds to me like you didn't submit the fat JAR. - Bolshevik
I would like to avoid building a fat JAR if I can. My Scala code does not need any non-Spark libraries. I was able to get all local exceptions resolved by adding the spark-launcher and spark-yarn JARs to my classpath. However, now the YARN application dies on the server side with this exception: 'Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging'. This is odd because I'm using the CDH 5.7.1 JARs on both the server and client side, but 'org.apache.spark.deploy.yarn.ExecutorLauncher' is looking for a class that doesn't exist on Spark 1.6 and above. - Starlike
Finally have it working. Added instructions as an edit. - Starlike
You should post that as your own answer rather than as an edit to the question. - Bolshevik
