If you use Confluent's schema registry, I believe the preferred solution is to use the Avro serde provided by Confluent. This way, we just call deserialize(), and the resolution of the correct version of the Avro schema to use is done automatically behind the scenes; no byte manipulation is required.
It boils down to something like this (the example code is in Scala; a Java solution would be very similar):
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.GenericRecord
import scala.collection.JavaConverters._
...
// Both deserializers look up the writer's schema in the registry via the schema id
// embedded in each message; the boolean flag tells the serde whether it handles keys.
val keyDeserializer = new KafkaAvroDeserializer()
keyDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
  true)
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
  false)
...
override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                         topic: String, partition: Int, offset: Long): KafkaKV = {
  val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
  val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
  KafkaKV(key, value)
}
...
This method requires that the message producer is also integrated with the schema registry and publishes its schemas there. This can be done in much the same way as above, using Confluent's KafkaAvroSerializer; a sketch follows.
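For illustration, here is a minimal sketch of that producer side. The topic name, broker and registry addresses, and the record schemas are assumptions made up for the example, not taken from the setup above: KafkaAvroSerializer registers the writer's schema with the registry and embeds its id in each message, which is what lets the deserializer above resolve the schema automatically.

import java.util.Properties
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Producer configured to serialize both keys and values as Avro via the registry
// (broker and registry addresses are placeholders).
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
val producer = new KafkaProducer[GenericRecord, GenericRecord](props)

// Hypothetical key/value schemas, only here to keep the example self-contained.
val keySchema = new Schema.Parser().parse(
  """{"type":"record","name":"Key","fields":[{"name":"id","type":"string"}]}""")
val valueSchema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")
val keyRecord: GenericRecord = new GenericData.Record(keySchema)
keyRecord.put("id", "42")
val valueRecord: GenericRecord = new GenericData.Record(valueSchema)
valueRecord.put("name", "alice")

// On first send, the serializer registers the schemas under the default subjects
// "some-topic-key" / "some-topic-value" and prefixes each message with the schema id.
producer.send(new ProducerRecord[GenericRecord, GenericRecord]("some-topic", keyRecord, valueRecord))
producer.flush()
producer.close()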
I posted a detailed explanation here: How to integrate Flink with Confluent's schema registry