Why does Spark application fail with “ClassNotFoundException: Failed to find data source: kafka” as uber-jar with sbt assembly?
I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.

My code is

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

I added the following sbt files:

build.sbt:

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

I also added project/assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

This creates an uber-jar that includes the non-provided jars.

I submit with the following line:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

but I get this runtime error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

Is there a way to know which class is not found, so that I can search the maven.org repo for that class?

The lookupDataSource source code seems to be at line 543 of https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link to the Kafka data source...

Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

Encomiast answered 23/12, 2016 at 14:13 Comment(2)
complete source code is here: <github.com/boontadata/boontadata-streams/tree/…>Encomiast
Hi @jithinpt, please see comments in the answer marked as THE answer.Encomiast
I tried it like this and it works for me. Submit like this and let me know if you have any issues:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
Lowe answered 6/1, 2017 at 6:7 Comment(3)
Hi, the weird thing is that benjguin's sbt already has "libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"". Why do you need to manually add "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0"? I know it works, but I just don't know why it does not work without that.Fairground
I'd call it a workaround; it does not explain why the uber-jar does not work. The root cause is the assemblyMergeStrategy in build.sbt that discards all META-INF files, incl. registrations. See my answer below.Histogram
The docs aren't clear on which dependencies should be marked provided. Seems spark-sql-kafka is not one of them. Remove the provided tag from pom.xml/build.sbt and it worksAuster
The issue is the following section in build.sbt:

// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.

But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.

For kafka alias to work Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.

Just to check that the real code is indeed available while the "code" that registers the alias is not, you could use the kafka data source by its fully-qualified name (not the alias) as follows:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

You will see other problems due to missing options like kafka.bootstrap.servers, but...we're digressing.

A solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (that would create an uber-jar with all data sources, incl. the kafka data source).

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
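Putting it together, the complete merge strategy in build.sbt might look like this (a sketch; the exact DSL depends on your sbt-assembly version):

```scala
// Concatenate data source registrations first, then fall back to the old rules
assemblyMergeStrategy in assembly := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
```

The order matters: the DataSourceRegister case has to come before the catch-all META-INF discard, or it will never match.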
Histogram answered 2/1, 2018 at 13:16 Comment(5)
Actually, sounds like I managed to make it work with your suggestion. I just had to reorder the cases in the merge strategy and put that one at the top.Fact
Should I still retain the case PathList("META-INF" , ...) after applying case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat?Anabiosis
They're different patterns so it depends on your app actually.Histogram
I was able to run just by replacing kafka with org.apache.spark.sql.kafka010.KafkaSourceProvider, along with making an uber-jar with the maven shade plugin. Thanks a lot @JacekLaskowskiLi
Great! You don't need "replacing kafka by org.apache.spark.sql.kafka010.KafkaSourceProvider". Switch it back.Histogram
A
4

This builds on Jacek Laskowski's answer.

Those of you building your project with Maven can try this out. Add the line mentioned below to your maven-shade-plugin configuration.

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

I've put down the plugin code for the pom file as an example to show where to add the line.


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>
                            META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                        </resource>
                    </transformer>
                </transformers>
                <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>

Please excuse my formatting skills.

Abomination answered 8/5, 2018 at 16:15 Comment(0)
S
3

In my case I also got this error while compiling with sbt, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact as part of the fat jar.

(Comments are very welcome here. The dependency was not given a scope, so it should not have been assumed to be "provided".)

So I changed to deploying a normal (slim) jar and including the dependencies with the --jars parameter to spark-submit.

In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can, in the sbt console, issue:

> set retrieveManaged := true
> package

That should bring all dependencies to the lib_managed folder.

Then you can reference all those jars; with bash you can, for example, use something like this:

cd /path/to/your/project

JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)

spark-submit --jars "$JARLIST" [other-args] target/your-app-1.0-SNAPSHOT.jar
Sherleysherline answered 5/1, 2017 at 12:38 Comment(1)
Yes, this happened on Spark 2.1.Sherleysherline
I'm using Spark 2.1 and facing the very same problem. My workaround is:

1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

2) cd ~/.ivy2/jars — all the needed jars are in this folder now

3) copy all the jars in this folder to all the nodes (you can create a specific folder to hold them)

4) add the folder name to spark.driver.extraClassPath and spark.executor.extraClassPath, e.g. spark.driver.extraClassPath=/opt/jars/*:your_other_jars

5) spark-submit --class ClassNm --Other-Options YourJar.jar works fine now
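Steps 4 and 5 can also be combined on the spark-submit command line with --conf flags (a sketch; /opt/jars, ClassNm, and YourJar.jar are placeholders, and the folder must exist on every node):

```shell
# /opt/jars is an assumed folder holding the Kafka data source jars on all nodes
spark-submit \
  --conf spark.driver.extraClassPath='/opt/jars/*' \
  --conf spark.executor.extraClassPath='/opt/jars/*' \
  --class ClassNm --Other-Options YourJar.jar
```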

Wringer answered 8/10, 2017 at 15:33 Comment(0)
I am using Gradle as a build tool and the shadowJar plugin to create the uber-jar. The solution was simply to add the file

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister  

to the project.

In this file you need to put, line by line, the class names of the data sources you use; in this case it would be org.apache.spark.sql.kafka010.KafkaSourceProvider (you can find that class name, for example, here).

The reason is that Spark uses the Java ServiceLoader in its internal data source resolution mechanism.

Full example here.
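To see the mechanism in isolation, here is a small self-contained Java sketch (the interface and provider names are stand-ins, not Spark's real classes): ServiceLoader only discovers a provider when a META-INF/services registry file names it, which is exactly what the discard merge strategy was throwing away.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class ServiceLoaderDemo {
    // Stand-ins for Spark's DataSourceRegister interface and the Kafka provider
    public interface Register { String shortName(); }
    public static class KafkaProvider implements Register {
        public String shortName() { return "kafka"; }
    }

    // Returns true when the provider is discoverable through a
    // META-INF/services registry file, the way Spark resolves "kafka"
    public static boolean aliasResolves() throws Exception {
        Path root = Files.createTempDirectory("svc-demo");
        Path dir = root.resolve("META-INF").resolve("services");
        Files.createDirectories(dir);
        // The registry file is named after the service interface
        // and lists the provider classes, one per line
        Files.write(dir.resolve(Register.class.getName()),
                KafkaProvider.class.getName().getBytes("UTF-8"));

        try (URLClassLoader cl = new URLClassLoader(
                new URL[]{ root.toUri().toURL() },
                ServiceLoaderDemo.class.getClassLoader())) {
            for (Register r : ServiceLoader.load(Register.class, cl)) {
                if ("kafka".equals(r.shortName())) return true;
            }
        }
        return false; // registry file missing or discarded: alias not found
    }

    public static void main(String[] args) throws Exception {
        System.out.println(aliasResolves()
                ? "kafka provider discovered via META-INF/services"
                : "kafka provider NOT discovered");
    }
}
```

Delete the registry file (as MergeStrategy.discard effectively does) and the loop finds nothing, mirroring the ClassNotFoundException above.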

Cairo answered 24/2, 2019 at 11:5 Comment(0)
I solved it by downloading the jar file to the driver system. From there, I supplied the jar to spark-submit with the --jars option.

Also to be noted is that I was packaging the whole Spark 2.1 environment in my uber-jar (since my cluster is still on 1.6.1). For some reason, it's not picked up when included in the uber-jar.

spark-submit --jars /ur/path/spark-sql-kafka-0-10_2.11-2.1.0.jar --class ClassNm --Other-Options YourJar.jar

Collin answered 12/5, 2017 at 19:47 Comment(0)
Although this is an old thread, I faced this issue with PySpark 2.3.3 on Hortonworks 3.1.5, so I thought maybe it can help others. The following jars are required for Spark streaming integration with Kafka 2.

Note: Please download appropriate jars according to Spark & Kafka's version.

Primateship answered 23/6, 2020 at 12:2 Comment(0)
