Trouble with deserializing Avro data in Scala

I am building an Apache Flink application in Scala which reads streaming data from a Kafka bus and then performs summarizing operations on it. The data from Kafka is in Avro format and requires a custom deserialization class. I found this Scala class, AvroDeserializationSchema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):

package org.myorg.quickstart
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._
import java.io.IOException

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
  // Created lazily on the task manager: DatumReader and BinaryDecoder are
  // not serializable, so they cannot be built when the schema is shipped.
  private var reader: DatumReader[T] = null
  private var decoder: BinaryDecoder = null

  override def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      // Reuse the previous decoder instance to avoid per-record allocation.
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized(): Unit = {
    if (reader == null) {
      // Use the specific reader for Avro-generated classes, reflection otherwise.
      reader =
        if (classOf[SpecificRecordBase].isAssignableFrom(avroType))
          new SpecificDatumReader[T](avroType)
        else
          new ReflectDatumReader[T](avroType)
    }
  }
}

In my streaming class I use it as follows:

val stream = env
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

where DeviceData is a Scala case class defined in the same project:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double)

I get the following error when compiling the StreamingKafkaClient.scala class:

Error:(24, 102) object java.lang.Class is not a value
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

I also tried:

val stream = env
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

With that I get a different error:

Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
 cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

I am completely new to Scala (this is my first Scala program), so I know I am missing something fundamental here. As I try to learn Scala, could someone please point out what I am doing wrong? My intent is basically to read Avro-encoded data from Kafka into Flink and perform some operations on the streaming data. I could not find any examples of the usage of the AvroDeserializationSchema class; it seems to me this is something that should be natively built into Flink packages.

Bean answered 1/7, 2018 at 17:38 Comment(4)
By chance, does your Avro data come from the Confluent Schema Registry? Because this won't work exactly as written anyway. See https://mcmap.net/q/2036676/-apache-flink-read-avro-byte-from-kafka Plus, if you know Java and not Scala, it's best to use Java 8Lithia
Thanks, but I want to use Scala only.Bean
Did you see that link? It uses Scala. And you didn't answer my first questionLithia
I will review the link again. No, I am not using the Confluent Schema Registry.Bean

In order to get a class object in Scala, you want classOf[DeviceData], not Class[DeviceData]:

new AvroDeserializationSchema[DeviceData](classOf[DeviceData])

"I could not find any examples of the usage of AvroDeserializationSchema class"

I found one (in Java).

Also, it looks like the Flink 1.6 release will add this class natively rather than you having to copy it from elsewhere; see FLINK-9337 & FLINK-9338.
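
For illustration, here is a rough sketch of what the built-in class could look like in use, assuming Flink 1.6+ with the flink-avro dependency on the classpath and reusing the question's properties. Since DeviceData is a plain case class rather than an Avro-generated SpecificRecord, the generic variant is shown:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Avro schema matching the DeviceData case class from the question.
val deviceDataSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "DeviceData", "fields": [
    |  {"name": "deviceId", "type": "string"},
    |  {"name": "sw_version", "type": "string"},
    |  {"name": "timestamp", "type": "string"},
    |  {"name": "reading", "type": "double"}
    |]}""".stripMargin)

// forGeneric yields GenericRecord values; forSpecific would require an
// Avro-generated SpecificRecord class, not a plain case class.
val consumer = new FlinkKafkaConsumer011[GenericRecord](
  "test", AvroDeserializationSchema.forGeneric(deviceDataSchema), properties)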

As mentioned in the comments, if you would like to use the Confluent Avro Schema Registry rather than giving a class type, see this answer, or refer to the code in the above GitHub link.
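
For reference, a rough sketch of the registry-based variant, assuming the flink-avro-confluent-registry module is on the classpath; registryUrl is a placeholder and deviceDataSchemaJson stands in for your actual Avro schema string:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Placeholder URL; point this at your actual Schema Registry instance.
val registryUrl = "http://localhost:8081"

// Reader schema to decode into (the same DeviceData schema as in the
// sketch above); deviceDataSchemaJson is a placeholder for that string.
val readerSchema: Schema = new Schema.Parser().parse(deviceDataSchemaJson)

// The registry supplies each record's writer schema, so records can be
// decoded even as the schema evolves.
val consumer = new FlinkKafkaConsumer011[GenericRecord](
  "test",
  ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl),
  properties)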

Additionally, if you are running Kafka 0.11+ (or Confluent 3.3+), then you should ideally be using FlinkKafkaConsumer011, parameterized with the class you are deserializing to:

new FlinkKafkaConsumer011[DeviceData]
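
Putting both fixes together, a sketch of the full call (the consumer's type parameter must match what the schema produces, and classOf supplies the class object):

val stream = env
  .addSource(new FlinkKafkaConsumer011[DeviceData](
    "test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]),
    properties))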
Lithia answered 2/7, 2018 at 1:53 Comment(4)
Edited my original question to include the compiler error I got with "classOf[DeviceData]". Just saw the Java example (github.com/okkam-it/flink-examples/blob/master/src/main/java/…), which is EXACTLY what I am trying to do in Scala.Bean
You're still using [String] in your consumer type, not the type you're deserializingLithia
That was it, how did I miss that! Fixed that and it worked perfectly, thank you! I do have a next problem though: as the streaming data is received, the program crashes immediately because it seems the deserialization routine requires an "init" method: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()Bean
Accepting this answer! To summarize, I used FlinkKafkaConsumer011 (although I don't think that was needed for this problem) and the code that worked was: val stream = env.addSource(new FlinkKafkaConsumer011[DeviceData]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties)) I will create a new issue for the init-method error.Bean
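
Regarding the init-method error in the last comment: Avro's ReflectDatumReader instantiates the target type via a no-argument constructor, which Scala case classes lack by default. A minimal sketch of one common workaround, an auxiliary zero-arg constructor with arbitrary placeholder defaults:

/** Case class to hold the Device data, with a no-arg auxiliary
  * constructor so Avro's reflection-based reader can instantiate it. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double) {
  // Placeholder defaults; Avro sets the real field values after creation.
  def this() = this("", "", "", 0.0)
}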
