Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
Asked Answered
F

3

7

Unable to send avro format message to Kafka topic from spark streaming application. Very less information is available online about avro spark streaming example code. "to_avro" method doesn't require avro schema then how it will encode to avro format?

Can someone please help to resolve below exception?

Dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

Below is the code to push to kafka topic

dataset.toDF.select(to_avro(struct(dataset.toDF.columns.map(column):_*))).alias("value").distinct.write.format("avro")
      .option(KafkaConstants.BOOTSTRAP_SERVER, priBootStrapServers)
      .option(ApplicationConstants.TOPIC_KEY, publishPriTopic)
      .save()

Getting below exception.

Caused by: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:614)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
    at com.walmart.replenishment.edf.dao.EdfOwBuzzerDao$.saveToCassandra(EdfOwBuzzerDao.scala:47)
    at com.walmart.replenishment.edf.process.BuzzerService$.updateScrItemPriStatus(BuzzerService.scala:119)
    at com.walmart.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:36)
    at com.walmart.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:28)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
    at org.apache.spark.sql.avro.AvroFileFormat.(AvroFileFormat.scala:44)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ```


Fryer answered 26/12, 2019 at 10:37 Comment(3)
You are using spark-avro version 2.4.4 while spark version is 2.4.0. Did you tried to use the same version?Ammadis
@Ammadis Yes, tried with spark-avro version 2.4.0 as well, still issue is same.Fryer
Anyone can help?Fryer
O
9

Look at this this ticket. The problem seems to exist in 2.4.4 and 2.4.5. I am still using version 2.4.4. Switching to package org.apache.spark:spark-avro_2.11:2.4.4 fixed the issue for me.

Olibanum answered 31/3, 2020 at 16:22 Comment(3)
I had the same issue, I am using Spark2.4.5, This solution works for me as well by switching package to org.apache.spark:spark-avro_2.11:2.4.5Preposterous
Thanks for confirming!Olibanum
I confirm on a different package as well: I was using com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.1 in Spark 3 and when changed to Spark 2.4 I was receiving a similar error message. I downgraded to com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.18.1 and it worked for me!Indocile
H
4

The scala version of spark-avro_2.12 should be Consistent with spark-core version.

You can use spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.4 ..., Or spark-submit --jars "spark-avro_2.11-2.4.4.jar".

In a word, when you use databricks avro, you also should use apache avro jars.

reference to "https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying"

Hamadryad answered 7/1, 2020 at 16:37 Comment(0)
D
1

I had to use 2.12:2.4.5 (org.apache.spark:spark-avro_2.12:2.4.5) for my dataproc cluster with 1.5 image (spark version: 2.4).

no other version (2.11:2.4.5 / 2.11:2.4.4) worked.

Drear answered 1/7, 2020 at 7:5 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.