Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)



I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).

Structured Streaming looks really cool, so I wanted to try to migrate the code, but I can't figure out how to use it.

In regular Spark Streaming I used KafkaUtils to create a DStream, and one of the parameters I passed was the value deserializer.

In Structured Streaming, the docs say that I should deserialize using DataFrame functions, but I can't figure out exactly what that means.

I looked at examples such as this example, but my Avro object in Kafka is quite complex and cannot simply be cast like the String in the example.

So far I've tried this kind of code (which I saw here in a different question):

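import spark.implicits._

  val ds1 = spark.readStream.format("kafka")
    // (Kafka options omitted)
    .load()

  // Fails during analysis: value is BinaryType and cannot be cast to a StructType
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema)))
  val query = ds2.writeStream // (sink options and start() omitted)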

And I get "data type mismatch: cannot cast BinaryType to StructType(StructField(....".

How can I deserialize the value?

Vltava answered 20/11, 2016 at 15:40 Comment(2)
Did someone find a working solution? None of the below are working for me! – Tigges
This library supports structured streams with Avro as a payload and may help: ABRiS (Avro Bridge for Spark). It is still under development but supports your use case. DISCLOSURE: I work for ABSA and I am the main developer behind this library. – School

As noted above, as of Spark 2.1.0 there is support for Avro with the batch reader but not with SparkSession.readStream(). Here is how I got it to work in Scala, based on the other responses. I've simplified the schema for brevity.

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"}
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]): Unit = {

        val KafkaBroker = args(0)
        val InTopic = args(1)
        val OutTopic = args(2)

        // Get Spark session
        val session = SparkSession
                .builder()
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]]) // raw Avro bytes of each Kafka message
                .map(d => {
                    // Decode the binary Avro payload with the schema defined above
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })

        // Sink: console, for illustration (OutTopic is not used in this sketch)
        val query = data.writeStream
                .format("console")
                .start()
        query.awaitTermination()
    }
}
Stereograph answered 10/5, 2017 at 8:55 Comment(3)
It didn't work for me. Caused by: error – Ptyalism
This solution doesn't work for schema-registry-enabled Kafka. It reported "Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -13" – Mendymene
Make sure the Avro schema used here matches the schema from Kafka word for word. – Experimentalize
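The "Length is negative" error in the comment above is what Avro's binary decoder reports when a length prefix decodes to a negative number, which typically means the bytes don't line up with the schema being applied. With a schema-registry setup, a likely cause is the framing that Confluent's KafkaAvroSerializer adds: each value starts with a magic byte (0) and a 4-byte big-endian schema ID before the Avro data, and a plain GenericDatumReader does not skip that header. A minimal sketch of stripping it, assuming Confluent's framing (ConfluentFraming is a hypothetical helper, not part of any library):

```scala
import java.nio.ByteBuffer

// Sketch of the Confluent Schema Registry framing (assumed producer: KafkaAvroSerializer):
// byte 0 = magic byte (0), bytes 1-4 = schema ID (big-endian int), remaining bytes = Avro binary data.
object ConfluentFraming {
  val MagicByte: Byte = 0
  val HeaderLength = 5

  /** Splits a framed Kafka value into (schemaId, Avro payload without the header). */
  def split(value: Array[Byte]): (Int, Array[Byte]) = {
    require(value.length >= HeaderLength, s"value too short for Confluent framing: ${value.length} bytes")
    val buffer = ByteBuffer.wrap(value) // ByteBuffer reads big-endian by default
    val magic = buffer.get()
    require(magic == MagicByte, s"unknown magic byte: $magic")
    val schemaId = buffer.getInt()
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload) // copy everything after the 5-byte header
    (schemaId, payload)
  }
}
```

In the map of the answer above, you would then hand only the payload to the reader, e.g. reader.read(null, avroDecoderFactory.binaryDecoder(ConfluentFraming.split(d)._2, null)). This still assumes the writer schema behind that ID equals messageSchema; Confluent's KafkaAvroDeserializer instead uses the ID to fetch the writer schema from the registry.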

I'm not yet super familiar with how Spark's serialization works in combination with the new/experimental Structured Streaming, but the approach below does work, though I am not sure if it's the best way (IMHO the approach has a somewhat awkward look 'n feel).

I'll try to answer your question using the example of a custom data type (here: a Foo case class) instead of Avro specifically, but I hope it'll help you anyway. The idea is to use Kryo serialization to serialize/deserialize your custom type; see Tuning: Data serialization in the Spark documentation.

Note: Spark supports serialization of case classes out of the box via built-in (implicit) encoders that you can import via import spark.implicits._. But let's ignore this functionality for the sake of this example.

Imagine you have defined the following Foo case class as your custom type (TL;DR hint: to prevent running into weird Spark serialization complaints/errors you should put the code into a separate Foo.scala file):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

Now you have the following Structured Streaming code to read data from Kafka, where the input topic contains Kafka messages whose message value is a binary-encoded String, and your goal is to create Foo instances based on these message values (i.e. similar to how you'd deserialize binary data into instances of an Avro class):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

Now we are deserializing the values into instances of our custom Foo type, for which we first need to define an implicit Encoder[Foo]:

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value"))))
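The function passed to map is a plain Array[Byte] => Foo conversion, so it can be factored out and unit-tested without Spark. A minimal sketch (FooDecoding is a hypothetical helper name; Foo is repeated so the snippet stands alone): unlike new String(bytes), which falls back to the JVM's default charset, it decodes explicitly as UTF-8, assuming the producer wrote UTF-8 (the default for Kafka's StringSerializer).

```scala
import java.nio.charset.StandardCharsets

case class Foo(s: String)

object FooDecoding {
  // Decodes a Kafka message value into Foo, assuming the bytes are UTF-8 text.
  def decode(value: Array[Byte]): Foo = Foo(new String(value, StandardCharsets.UTF_8))
}
```

In the streaming job this would be used as messages.map(row => FooDecoding.decode(row.getAs[Array[Byte]]("value"))).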

Going back to your Avro question, what you'd need to do is to:

  1. Create a proper Encoder for your needs.
  2. Replace Foo(new String(row.getAs[Array[Byte]]("value"))) with the code to deserialize your binary-encoded Avro data into Avro POJOs, i.e. code that takes your binary-encoded Avro data out of the message value (row.getAs[Array[Byte]]("value")) and returns, say, an Avro GenericRecord or whatever SpecificCustomAvroObject you have defined elsewhere.
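To make step 2 concrete: per the Avro specification, a record is encoded as its fields back to back in schema order, an int as a zig-zag variable-length integer, and a string as a zig-zag varint length followed by that many UTF-8 bytes. The hand-written decoder below, for a hypothetical two-field record (deviceId: int, deviceName: string, the simplified schema from the earlier answer), only illustrates what GenericDatumReader does for you; in real code use Avro's own reader, which also handles schema resolution and every other type.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical target type for a record schema with fields (deviceId: int, deviceName: string).
case class Device(deviceId: Int, deviceName: String)

object TinyAvroDecoder {
  // Reads an Avro "int" (zig-zag encoded varint) starting at `start`; returns (value, nextPosition).
  def readInt(bytes: Array[Byte], start: Int): (Int, Int) = {
    var pos = start
    var shift = 0
    var raw = 0
    var more = true
    while (more) {
      require(shift < 35, "varint too long for an Avro int")
      require(pos < bytes.length, "truncated varint")
      val b = bytes(pos) & 0xff
      raw |= (b & 0x7f) << shift // low 7 bits carry data, high bit means "more bytes follow"
      shift += 7
      pos += 1
      more = (b & 0x80) != 0
    }
    ((raw >>> 1) ^ -(raw & 1), pos) // undo zig-zag: 0 -> 0, 1 -> -1, 2 -> 1, ...
  }

  // Reads an Avro "string": zig-zag varint length followed by that many UTF-8 bytes.
  def readString(bytes: Array[Byte], start: Int): (String, Int) = {
    val (len, pos) = readInt(bytes, start)
    require(len >= 0, s"malformed data, length is negative: $len")
    require(pos + len <= bytes.length, "string runs past end of input")
    (new String(bytes, pos, len, StandardCharsets.UTF_8), pos + len)
  }

  // Record fields carry no names or tags, so the schema alone determines how to read them.
  def decodeDevice(bytes: Array[Byte]): Device = {
    val (id, afterId) = readInt(bytes, 0)
    val (name, _) = readString(bytes, afterId)
    Device(id, name)
  }
}
```

For example, the five bytes 84, 6, 97, 98, 99 decode to Device(42, "abc"): 84 is zig-zag for 42, and 6 is zig-zag for the string length 3. This is also why a cast cannot work: nothing in the bytes says where one field ends without the schema.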

If someone else knows of a more concise/better/... way to answer Tal's question, I'm all ears. :-)


Gumma answered 21/11, 2016 at 10:23 Comment(4)
I think Tal's use case is that he does not have a binary-encoded String on his topic; he has binary-encoded Avro. Would using bijection-avro work in this case? – Varus
Yep @Varus, that's right. I had to backlog this project a little, so I haven't had a chance to try anything new. Hopefully I'll look into this subject soon; when I do, I'll look into bijection-avro. – Vltava
@TalJoffe please let me know what you come up with. I'm trying to read Avro that was put there by a KStream, and bijection-avro didn't work out for me. – Varus
@Varus OK, as I said it might take me a while, but once I figure it out I'll write it here. – Vltava

So actually someone at my firm solved this for me, so I'll post it here for future readers.

Basically, what I missed on top of what miguno suggested is the decode part:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<YourObject>] = {
  // Build the decoder once per partition, then reuse it for every message
  val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
  iter.map(message => {
    val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
    val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
    val field2 = record.get("field2Name").toString // Avro strings arrive as Utf8 (a CharSequence)
    //create an object with the fields extracted from genericRecord
  })
}

Now you can read messages from Kafka and decode them like so:

val ds = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
  .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
  .load()
  .as[KafkaMessage]

val decodedDs  = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

*KafkaMessage is simply a case class that contains the generic fields you get when reading from Kafka (key, value, topic, partition, offset, timestamp).
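The reason for mapPartitions rather than map in the snippet above is that the (possibly expensive) decoder is created once per partition instead of once per message. A Spark-free sketch of the same pattern, with a hypothetical ExpensiveDecoder standing in for AvroTo<YourObject>Decoder (it just decodes UTF-8 so the example runs without Avro or a registry):

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a decoder that is costly to build (e.g. one holding a registry client).
class ExpensiveDecoder(val registryUrl: String) {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
}

object PartitionDecoding {
  var decodersCreated = 0 // counts constructions, only to show the per-partition behaviour

  // Same shape as the function given to Dataset.mapPartitions: Iterator in, Iterator out.
  def decodePartition(iter: Iterator[Array[Byte]], registryUrl: String): Iterator[String] = {
    decodersCreated += 1
    val decoder = new ExpensiveDecoder(registryUrl) // built once for the whole partition
    iter.map(decoder.fromBytes)                      // applied lazily to each message
  }
}
```

Because the returned iterator is lazy, messages are still streamed one at a time; only the decoder construction is hoisted out of the per-message path.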

AvroTo<YourObject>Decoder is some class that'll decode your object given a schema registry URL.

For example, using Confluent's KafkaAvroDeserializer and Schema Registry:

import scala.collection.JavaConverters._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer

val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) //isKey = true

// Avro encoded values
val valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) //isKey = false

From these, call .deserialize(topicName, bytes).asInstanceOf[GenericRecord] to get an Avro object.

Hope this helps someone

Vltava answered 14/12, 2016 at 15:53 Comment(1)
So do you mean we need to provide a related case class besides the Avro-generated class? Could you show us your import statements? Where do we get the "Decoder" class in this statement? val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl) – Mendymene

Use the following steps:

  • Define a Kafka message.
  • Define a consumer utility, which returns a Dataset of YourAvroObject.
  • Define your logical code.

Kafka Message:

import java.sql.Timestamp

case class KafkaMessage(key: String, value: Array[Byte],
                        topic: String, partition: String, offset: Long, timestamp: Timestamp)

Kafka Consumer:

import java.util.Collections

import com.typesafe.config.{Config, ConfigFactory}
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

import scala.reflect.runtime.universe._

object KafkaAvroConsumer {

  private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer")
  val valueDeserializer = new KafkaAvroDeserializer()
  valueDeserializer.configure(Collections.singletonMap("schema.registry.url",
    conf.getString("schema.registry.url")), false) // isKey = false

  def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = {
    val schema = new Schema.Parser().parse(schemaStr)
    Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value))
  }

  def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag]
  (schemaStr: String)
  (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = {

    val spark = SparkSession
      .builder()
      .appName(appName)
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of KafkaMessage from kafka
    val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option(subscribeType, topics)
      .option("startingOffsets", startingOffsets)
      .load()
      .as[KafkaMessage]
      .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform it into an Avro object.

    ds
  }
}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificData

import scala.reflect.runtime.universe._

object Utils {

  // Deep-copies the deserialized record using targetSchema and casts the copy to T
  def convert[T <: GenericRecord: TypeTag](targetSchema: Schema)(record: AnyRef): T = {
      SpecificData.get.deepCopy(targetSchema, record).asInstanceOf[T]
  }
}
Thorazine answered 8/6, 2017 at 9:8 Comment(9)
Can you please give a full example or share the code on GitHub? – Ptyalism
How is the Utils.convert method defined? – Mendymene
What is a TypeTag? What if you don't want to hard-code a reader schema string and want to use the schema ID within the message? – Bushwa
@CaselChen, I have added the utility function; I hope it helps. Sorry, but it's old code and I didn't keep the repository. – Thorazine
@cricket_007, please read about the schema registry and schema evolution. The schema is not hard-coded.… – Thorazine
I know about schema evolution. That wasn't my comment. I also know about the schema registry. My question was about your parameter schemaStr... This should not be necessary, as the KafkaAvroDeserializer class is able to detect the magic byte + schema ID that Confluent's Avro encoding uses, then does a lookup against the registry for that schema string. In other words, I think it should be possible to not "hard-code" a reader schema string and let it be dynamically extracted from the registry. – Bushwa
@cricket_007, you are right. The schemaStr is used for transformation. – Thorazine
I guess my question is why you need it, or where in this example you'd define it? I can understand if you used the client to get the latest schema that's backwards compatible, but then you already have a schema object. No need to parse a string. – Bushwa
Let's assume you have a component that produces an object with schema s1, and some of the consumers need a different view on that data (e.g. some may use s1, and others may use s2, s3 and s4). You may use schema evolution as a mechanism for that. Of course, it's not always a good idea. – Thorazine
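Bushwa's point can be illustrated without Kafka: because every Confluent-framed message carries the ID of its writer schema, a consumer needs a cache from schema ID to schema rather than a hard-coded schema string. A minimal sketch of that caching idea, assuming a hypothetical fetch function in place of the registry HTTP call (this is not the Confluent client's API); schemas are kept as JSON strings so the example has no dependencies.

```scala
import scala.collection.mutable

// Hypothetical sketch: caches writer schemas by the schema ID embedded in each message,
// so the (remote) fetch runs at most once per ID. `fetch` stands in for a registry lookup.
class SchemaCache(fetch: Int => String) {
  private val cache = mutable.Map.empty[Int, String]

  // synchronized so concurrent tasks in one executor JVM can share a single instance
  def schemaFor(id: Int): String = synchronized {
    cache.getOrElseUpdate(id, fetch(id))
  }
}
```

In real code each schema string would be parsed once (for example with Avro's Schema.Parser) and cached as a Schema, which is roughly the job CachedSchemaRegistryClient does for KafkaAvroDeserializer.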
