Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)?

I use HDP-2.6.3.0 with Spark2 package 2.2.0.

I'm trying to write a Kafka consumer using the Structured Streaming API, but I get the following error after submitting the job to the cluster:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22$$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22$$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more

Here is the spark-submit command:

$SPARK_HOME/bin/spark-submit \
     --master yarn \
     --deploy-mode client \
     --class com.example.KafkaConsumer \
     --executor-cores 2 \
     --executor-memory 512m \
     --driver-memory 512m \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar

My Java code:

package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaConsumer {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                  .builder()
                  .appName("kafkaConsumerApp")
                  .getOrCreate();

        Dataset<Row> ds = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
                  .option("subscribe", "my-topic")
                  .load();
    }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>sample-kafka-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

    <dependencies>

        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>


    </dependencies>  


    <repositories>
        <repository>
            <id>local-maven-repo</id>
            <url>file:///${project.basedir}/local-maven-repo</url>
        </repository>
    </repositories> 

    <build>

        <!-- Include resources folder in the .jar -->
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>

        <plugins>

            <!-- Plugin to compile the source. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>       

            <!-- Plugin to include all the dependencies in the .jar and set the main class. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>    
</project>

[UPDATE] UBER-JAR

Below is the configuration used in the pom.xml to generate the uber-jar:

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <!-- This filter is to workaround the problem caused by included signed jars.
                                     java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
                                -->
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.KafkaConsumer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
Slayton answered 28/12, 2017 at 17:41 Comment(3)
It's been a while since I worked with maven (I'm with sbt), but maven-shade-plugin is for shading, not uber-jars, isn't it? Shouldn't it be maven-assembly-plugin with the jar-with-dependencies configuration?Proclitic
I'm not an expert on the Java world, but from my research the maven-shade-plugin is the option used to generate the uber-jar file.Slayton
If you are using Scala's build.sbt and discarding META-INF files blindly as part of your assemblyMergeStrategy while building the uber-jar, that can cause the "kafka" alias to go unregistered. Check this SO answer: https://mcmap.net/q/496972/-why-does-spark-application-fail-with-classnotfoundexception-failed-to-find-data-source-kafka-as-uber-jar-with-sbt-assemblyLohman

The kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that's only the first step toward having it available in your Spark application.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

With that dependency in place, you have to decide whether you want to create a so-called uber-jar that bundles all the dependencies together (which results in a fairly big jar file and longer submission times) or use the --packages (or the less flexible --jars) option to add the dependency at spark-submit time.

(There are other options, like storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple.)

I'd recommend using --packages first, and only once that works consider the other options.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.
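
For example, a sketch based on the command in the question (the jar name and main class are the asker's; adjust them to your own application):

$SPARK_HOME/bin/spark-submit \
     --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
     --master yarn \
     --deploy-mode client \
     --class com.example.KafkaConsumer \
     sample-kafka-consumer-0.0.1-SNAPSHOT.jar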

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For the kafka data source (and data sources in general) to work, you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged (not handled with a replace, first, or similar discarding strategy).

The kafka data source ships its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister that registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.
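
As a quick sanity check (using the jar name from the question), you can print the service file from inside the assembled jar; if org.apache.spark.sql.kafka010.KafkaSourceProvider is not listed, the merge went wrong:

# Print the (merged) DataSourceRegister entries bundled in the uber-jar
unzip -p sample-kafka-consumer-0.0.1-SNAPSHOT.jar \
  META-INF/services/org.apache.spark.sql.sources.DataSourceRegister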

Proclitic answered 29/12, 2017 at 7:19 Comment(3)
Also, I've just updated the question with the UBER-JAR configuration. If you see any improvement on it, please let me know.Slayton
I need to correct my answer: even maven-assembly-plugin couldn't merge files under META-INF/services/ from different jars (dependencies). It does provide a way to handle this situation with a ContainerDescriptorHandler, but that didn't work. I could do the merge with maven-shade-plugin, though. @KleysonRios you are only missing the following in your Maven settings: <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>Petaliferous
@wyx Can you ask a separate question to help you out with the error? Please post the error, how you bundle the deps and how you execute the app. Ok?Proclitic

The top answer is correct; this solved the issue for me:

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
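
Note that the order of the cases matters here: the DataSourceRegister line must come before the catch-all META-INF discard, since the patterns are tried top to bottom and the discard would otherwise swallow the service file.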
Equate answered 20/8, 2020 at 7:48 Comment(1)
Welcome to Stack Overflow. Code-only answers are discouraged on Stack Overflow because they don't explain how it solves the problem. Please edit your answer to explain how this answers the question and what and how it improves on the existing answers, so that it is useful to users with similar issues.Perlie

I had a similar issue; it started when we upgraded the Cloudera Spark version from 2.2 to 2.3.

The issue was that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file in my uber-jar was getting overwritten by the similar file present in some other jars, so Spark could not find the Kafka source entry in the 'DataSourceRegister' file.

Resolution: modifying the pom.xml as follows helped me.

<configuration>
    <transformers>
        <transformer
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>
                META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
            </resource>
        </transformer>
    </transformers>
</configuration>
Femininity answered 10/9, 2018 at 5:25 Comment(0)

For an uber-jar, adding the ServicesResourceTransformer to the shade plugin works for me.

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
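
Unlike the AppendingTransformer above, which merges only the one resource you name, the ServicesResourceTransformer concatenates every META-INF/services file across dependencies, so DataSourceRegister is covered without spelling it out.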
Chervonets answered 26/8, 2019 at 10:23 Comment(1)
Better than spark-submit --packages; it works nicely for me.Ashlynashman

My solution was different: I specify the spark-sql-kafka package directly in the spark-submit command:

.\bin\spark-submit --master local --class "org.myspark.KafkaStream" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  <path_to_jar>

Related: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying
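
Note that the package coordinates have to match the Spark build you run against: the _2.12 suffix is the Scala version and 3.0.0 the Spark version, so pick the pair corresponding to your cluster (e.g. _2.11 and 2.2.0 for the setup in the question).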

Neddie answered 10/7, 2020 at 21:12 Comment(0)

I faced the same error, and it took me a couple of days to figure out. When you copy the dependency from the Maven repository, in particular "spark-sql-kafka", it contains the line:

<scope>provided</scope>

The solution was to remove this line so that the dependency runs in the default "compile" scope. The same thing is true if you use sbt. It's probably worth removing it from other dependencies as well if they have it, just in case.
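
For illustration, with the coordinates from the question the dependency would then look like this (no scope element, so Maven falls back to the default compile scope and the shade plugin bundles it into the uber-jar):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
    <!-- <scope>provided</scope> removed -->
</dependency>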

Pilewort answered 29/11, 2020 at 7:50 Comment(0)

I had the same problem, but with Gradle and shadowJar. It worked after adding:

shadowJar {
    mergeServiceFiles()
}
assemble.dependsOn shadowJar
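
mergeServiceFiles() is the Shadow plugin's analog of Maven's ServicesResourceTransformer: it concatenates the META-INF/services entries from all dependencies instead of letting one overwrite another.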

Ptolemaeus answered 10/8, 2021 at 6:31 Comment(0)

I faced the same error because I had excluded everything under META-INF in the shade plugin, to fix the overlapping resource warning:

<exclude>META-INF/**</exclude>

but the class loader needs the resource below to know which data sources are registered. After removing that exclude, it works fine for me.

 <resource>
      META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 </resource>

Hope it helps someone.

Aristocracy answered 7/9, 2021 at 4:43 Comment(2)
Can you please share your POM structure? I am not sure where to add the <resource>.Consuela
Actually I had forgotten that; it seems you can refer to @RajashekharC's answer.Aristocracy
