Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).

Structured Streaming looks really cool, so I wanted to try migrating the code, but I can't figure out how to use it.

In regular streaming I used KafkaUtils to createDirectStream, and one of the parameters I passed was the value deserializer.
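
Roughly, the old setup looked like the sketch below (MyAvroDeserializer and MyAvroObject are placeholder names for my custom value deserializer and Avro-generated class):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[MyAvroDeserializer], // a custom Deserializer[MyAvroObject]
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest"
)

// ssc is an existing org.apache.spark.streaming.StreamingContext
val stream = KafkaUtils.createDirectStream[String, MyAvroObject](
  ssc,
  PreferConsistent,
  Subscribe[String, MyAvroObject](Seq("RED-test-tal4"), kafkaParams)
)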

In Structured Streaming, the docs say that I should deserialize using DataFrame operations, but I can't figure out exactly what that means.

I looked at examples such as this one, but my Avro object in Kafka is quite complex and cannot simply be cast like the String in the example.

So far I tried this kind of code (which I saw here in a different question):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema)))
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(...."

How can I deserialize the value?

Vltava answered 20/11, 2016 at 15:40 Comment(2)
Has anyone found a working solution? None of the answers below are working for me!Tigges
This library supports structured streams with Avro as a payload and may help: ABRiS (Avro Bridge for Spark). It is still under development but supports your use case. DISCLOSURE: I work for ABSA and I am the main developer behind this library.School

As noted above, as of Spark 2.1.0 there is Avro support for the batch reader but not for SparkSession.readStream(). Here is how I got it to work in Scala, based on the other responses. I've simplified the schema for brevity.

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"},
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]) {

        val KafkaBroker =  args(0);
        val InTopic = args(1);
        val OutTopic = args(2);

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })

        // Write the decoded stream somewhere, e.g. to the console sink, and start the query
        data.writeStream
                .outputMode("append")
                .format("console")
                .start()
                .awaitTermination()
    }
}
Stereograph answered 10/5, 2017 at 8:55 Comment(3)
It didn't work for me; I get a "Caused by: java.io.EOFException" error.Ptyalism
This solution doesn't work for schema-registry-enabled Kafka. It reported "Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -13"Mendymene
Make sure the Avro schema used here matches word for word the schema from Kafka.Experimentalize

I'm not yet super familiar with how Spark's serialization works in combination with the new/experimental Structured Streaming, but the approach below does work -- though I am not sure if it's the best way (IMHO the approach has a somewhat awkward look and feel).

I'll try to answer your question with the example of a custom data type (here: a Foo case class) instead of Avro specifically, but I hope it'll help you anyway. The idea is to use Kryo serialization to serialize/deserialize your custom type; see Tuning: Data Serialization in the Spark documentation.

Note: Spark supports serialization of case classes out of the box via built-in (implicit) encoders that you can import via import spark.implicits._. But let's ignore this functionality for the sake of this example.

Imagine you have defined the following Foo case class as your custom type (TL;DR hint: to prevent running into weird Spark serialization complaints/errors you should put the code into a separate Foo.scala file):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

Now you have the following Structured Streaming code to read data from Kafka, where the input topic contains Kafka messages whose message value is a binary-encoded String, and your goal is to create Foo instances based on these message values (i.e. similar to how you'd deserialize binary data into instances of an Avro class):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

Now we are deserializing the values into instances of our custom Foo type, for which we first need to define an implicit Encoder[Foo]:

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value"))))

Going back to your Avro question, what you'd need to do is to:

  1. Create a proper Encoder for your needs.
  2. Replace Foo(new String(row.getAs[Array[Byte]]("value"))) with the code that deserializes your binary-encoded Avro data into Avro POJOs, i.e. code that takes the binary-encoded Avro data out of the message value (row.getAs[Array[Byte]]("value")) and returns, say, an Avro GenericRecord or whatever SpecificCustomAvroObject you have defined elsewhere (see the sketch below).
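
For step 2, a minimal sketch using Avro's GenericDatumReader might look like the following. The schema string and field name are placeholders, the reader is rebuilt per record only for brevity, and note that data written with Confluent's Avro serializer carries a 5-byte prefix (magic byte + schema id) that you would need to handle separately:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

val foos: Dataset[Foo] = messages.map { row =>
  // Placeholder writer schema -- substitute the real schema of your topic's messages
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"FooRecord","fields":[{"name":"s","type":"string"}]}""")
  val reader = new GenericDatumReader[GenericRecord](schema)
  val bytes = row.getAs[Array[Byte]]("value")
  val record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  Foo(record.get("s").toString) // build your own type from the generic record
}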

If someone else knows of a more concise/better/... way to answer Tal's question, I'm all ears. :-)

Gumma answered 21/11, 2016 at 10:23 Comment(4)
I think Tal's use case is that he does not have a binary-encoded String on his topic, he has binary-encoded Avro. Would using bijection-avro work in this case?Varus
Yep @Varus, that's right. I had to backlog this project a little so I haven't had a chance to try anything new. Hopefully I'll look into this subject soon; when I do I'll check out bijection-avro.Vltava
@TalJoffe please let me know what you come up with. I'm trying to read Avro that was put there by a KStream and bijection-avro didn't work out for me.Varus
@Varus oh OK, as I said it might take me a while, but once I figure it out I'll write it here.Vltava

So actually someone in my firm solved this for me, so I'll post it here for future readers.

Basically, what I was missing on top of what miguno suggested is the decoding part:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String): Iterator[<YourObject>] = {
  val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
  iter.map(message => {
    val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
    val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
    val field2 = record.get("field2Name").toString
          ...
    // create an object with the fields extracted from the generic record
  })
}

Now you can read messages from Kafka and decode them like so:

val ds = spark
  .readStream
  .format(config.getString(ConfigUtil.inputFormat))
  .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
  .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
  .load()
  .as[KafkaMessage]

val decodedDs = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

*KafkaMessage is simply a case class that contains the generic fields you get when reading from Kafka (key, value, topic, partition, offset, timestamp).
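
For example, something along these lines (a sketch; the field types are chosen to match the Kafka source's schema):

case class KafkaMessage(
  key: Array[Byte],
  value: Array[Byte],
  topic: String,
  partition: Int,
  offset: Long,
  timestamp: java.sql.Timestamp)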

AvroTo<YourObject>Decoder is some class that will decode your object given a schema registry URL.

For example, using Confluent's KafkaAvroDeserializer and schema registry:

import scala.collection.JavaConverters._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer

val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) // isKey = true

// Avro encoded values
val valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) // isKey = false

From these, call .deserialize(topicName, bytes).asInstanceOf[GenericRecord] to get an Avro object.
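
Wired into the mapPartitions approach above, a rough sketch could look like this (field names are placeholders; the deserializer is built inside each partition because KafkaAvroDeserializer is not serializable, and spark.implicits._ must be in scope for the result encoder):

import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer

val decodedDs = ds.mapPartitions { iter =>
  // Build the deserializer per partition; it cannot be shipped from the driver
  val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)
  val deserializer = new KafkaAvroDeserializer(client)
  deserializer.configure(Map[String, AnyRef]("schema.registry.url" -> schemaRegistryUrl).asJava, false)

  iter.map { message =>
    val record = deserializer.deserialize(message.topic, message.value).asInstanceOf[GenericRecord]
    record.get("field1Name").toString // extract whatever fields your object needs
  }
}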

Hope this helps someone

Vltava answered 14/12, 2016 at 15:53 Comment(1)
So do you mean we need to provide a related case class besides the Avro-generated class? Could you show us your import statements? Where does the "Decoder" class in this statement come from? val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)Mendymene

Use the following steps:

  • Define a Kafka message.
  • Define a consumer utility, which returns a Dataset of YourAvroObject.
  • Define your processing logic (see the usage sketch at the end of this answer).

Kafka Message:

case class KafkaMessage(key: Array[Byte], value: Array[Byte],
                        topic: String, partition: Int, offset: Long, timestamp: Timestamp)

Kafka Consumer:

import java.util.Collections

import com.typesafe.config.{Config, ConfigFactory}
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

import scala.reflect.runtime.universe._


object KafkaAvroConsumer {

  private val conf: Config = ConfigFactory.load().getConfig("kafka.consumer")
  val valueDeserializer = new KafkaAvroDeserializer()
  valueDeserializer.configure(Collections.singletonMap("schema.registry.url",
    conf.getString("schema.registry.url")), false)

  def transform[T <: GenericRecord : TypeTag](msg: KafkaMessage, schemaStr: String) = {
    val schema = new Schema.Parser().parse(schemaStr)
    Utils.convert[T](schema)(valueDeserializer.deserialize(msg.topic, msg.value))
  }

  def createDataStream[T <: GenericRecord with Product with Serializable : TypeTag]
  (schemaStr: String)
  (subscribeType: String, topics: String, appName: String, startingOffsets: String = "latest") = {

    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of KafkaMessage from kafka
    val ds = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
      .option(subscribeType, topics)
      .option("startingOffsets", "earliest")
      .load()
      .as[KafkaMessage]
      .map(msg => KafkaAvroConsumer.transform[T](msg, schemaStr)) // Transform each message into an Avro object

    ds
  }

}

Update

Utils:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificData

import scala.reflect.runtime.universe._

object Utils {

  def convert[T <: GenericRecord: TypeTag](targetSchema: Schema)(record: AnyRef): T = {
    SpecificData.get.deepCopy(targetSchema, record).asInstanceOf[T]
  }

}
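
A hypothetical usage of the utility above might look like this (all names are placeholders; MyAvroRecord stands in for an Avro record type satisfying the GenericRecord with Product with Serializable bound, e.g. a Scala case class generated from the schema):

object MyStreamingApp {
  def main(args: Array[String]): Unit = {
    // Placeholder: the Avro schema JSON for MyAvroRecord
    val schemaStr = MyAvroRecord.SCHEMA$.toString

    val ds = KafkaAvroConsumer.createDataStream[MyAvroRecord](schemaStr)(
      subscribeType = "subscribe",
      topics = "my-input-topic",
      appName = "my-app")

    ds.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}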
Thorazine answered 8/6, 2017 at 9:8 Comment(9)
Can you please give a full example or share the code on GitHub?Ptyalism
How is the Utils.convert method defined?Mendymene
What is a TypeTag? What if you don't want to hard-code a reader schema string and want to use the Schema ID within the message?Bushwa
@CaselChen, I have added the utility function; I hope it helps you. Sorry, but it's old code and I didn't keep the repository.Thorazine
@cricket_007, please read about the schema registry & schema evolution. The schema is not hard-coded. docs.oracle.com/database/nosql-11.2.2.0/GettingStartedGuide/…Thorazine
I know about Schema Evolution. That wasn't my comment. I also know about the schema registry. My question was about your parameter schemaStr... This should not be necessary, as the KafkaAvroDeserializer class is able to detect the Magic Byte + Schema ID that Confluent's Avro encoding is using, then does a lookup against the registry for that schema string. In other words, I think it should be possible to not "hard code" a reader schema string, and let it be dynamically extracted from the registryBushwa
@cricket_007, you are right. The schemaStr is used for the transformation.Thorazine
I guess my question is why you need it, or where in this example you'd define it? I can understand if you used the client to get the latest schema that's backwards compatible, but then you already have a schema object. No need to parse a stringBushwa
Let's assume you have a component which produces an object with schema s1, and some of the consumers need a different view of that data (e.g. some may use s1, and others may use s2, s3 & s4). You may use schema evolution as a mechanism for that. Of course, it's not always a good idea.Thorazine
