KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord

My KafkaProducer uses KafkaAvroSerializer to serialize objects to my topic. However, KafkaConsumer.poll() returns GenericRecord values instead of instances of my serialized class.

My KafkaProducer

KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
  Properties properties = new Properties();
  properties.load(props);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      io.confluent.kafka.serializers.KafkaAvroSerializer.class);
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      io.confluent.kafka.serializers.KafkaAvroSerializer.class);
  properties.put("schema.registry.url", "http://localhost:8081");

  MyBean bean = new MyBean();
  producer = new KafkaProducer<>(properties);
  producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
  producer.flush(); // ensure the record is actually sent before the snippet ends
}

My KafkaConsumer

Properties properties = new Properties();
KafkaConsumer<CharSequence, MyBean> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
  properties.load(props);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
  properties.put("schema.registry.url", "http://localhost:8081");
  consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList(topic));
try {
  while (true) {
    ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
    if (records.isEmpty()) {
      continue;
    }
    for (ConsumerRecord<CharSequence, MyBean> record : records) {
      MyBean bean = record.value(); // <-- throws ClassCastException: GenericRecord cannot be cast to MyBean
      System.out.println("consumer received: " + bean);
    }
  }
} finally {
  consumer.close();
}

The marked line, MyBean bean = record.value();, throws a ClassCastException because the GenericRecord returned by the deserializer cannot be cast to MyBean.

I'm using kafka-clients 0.9.0.1 and kafka-avro-serializer 3.0.0.

Mickiemickle asked 21/9, 2016 at 1:9 Comment(0)

KafkaAvroDeserializer supports SpecificData

It's not enabled by default. To enable it:

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
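
For context, a minimal consumer configuration with the specific reader enabled might look like the sketch below; the bootstrap server and group id are illustrative assumptions, and MyBean stands in for your generated SpecificRecord class:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");       // assumed group id
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.put("schema.registry.url", "http://localhost:8081");
// With specific.avro.reader=true the deserializer returns the generated
// SpecificRecord subclass, so a typed consumer no longer hits the ClassCastException:
KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);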

KafkaAvroDeserializer does not support ReflectData

Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData, so I had to extend it:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.kafka.common.errors.SerializationException;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

/**
 * Extends the Confluent deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private final Schema readerSchema;
  private final DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      // Confluent wire format: magic byte, 4-byte schema id, then the Avro binary payload.
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}

Define a custom deserializer class which deserializes to MyBean:

public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
  public MyBeanDeserializer() {
    super(MyBean.class);
  }
}

Configure KafkaConsumer to use the custom deserializer class:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
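
With the custom deserializer in place, the question's poll loop should return MyBean values directly. A short usage sketch, reusing the topic and properties from the question:

KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
for (ConsumerRecord<CharSequence, MyBean> record : records) {
  MyBean bean = record.value(); // now deserialized via ReflectDatumReader, no cast failure
  System.out.println("consumer received: " + bean);
}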
Kinesthesia answered 21/9, 2016 at 12:52 Comment(6)
Thanks! I thought the generated bean extends SpecificRecordBase and implements SpecificRecord, so what does it have to do with Avro ReflectData? I'm new to Avro, so I just want a better understanding. (Mickiemickle)
I tried your code and got the following exception: Caused by: org.apache.avro.AvroTypeException: Found string, expecting com.MyBean at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:223) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) at org.apache.avro.generic.GenericDatumReader.read(Gene (Mickiemickle)
I kept everything the same except switched to the new Deserializer that you've provided. (Mickiemickle)
I didn't know you were using SpecificData. I updated my answer. (Kinesthesia)
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true) worked for me. Btw, is there documentation somewhere? I have been doing trial and error to figure things out. (Mickiemickle)
ReflectData support got merged: github.com/confluentinc/schema-registry/pull/1111 (Inamorato)

Edit: ReflectData support has since been merged upstream (see the PR linked in the comments below).

To add to Chin Huang's answer, for minimal code and better performance, you should probably implement it this way:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

/**
 * Deserializes directly to a generated SpecificRecord class, using a reader
 * schema derived from that class.
 *
 * @param <T>
 *     value type
 */
public abstract class SpecificKafkaAvroDeserializer<T extends SpecificRecordBase> extends AbstractKafkaAvroDeserializer implements Deserializer<T> {

  private final Schema readerSchema;
  private final Class<T> type;
  private final DecoderFactory decoderFactory = DecoderFactory.get();

  protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
    this.type = type;
    // Compute the reader schema once instead of on every record.
    this.readerSchema = ReflectData.get().getSchema(type);
    this.configure(this.deserializerConfig(props));
  }

  public void configure(Map<String, ?> configs) {
    this.configure(new KafkaAvroDeserializerConfig(configs));
  }

  // The following complete the Kafka Deserializer contract so that a
  // concrete subclass is instantiable by the consumer.
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(configs);
  }

  @Override
  public T deserialize(String topic, byte[] bytes) {
    return deserialize(false, topic, false, bytes, readerSchema);
  }

  @Override
  public void close() {
  }

  @Override
  protected T deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      // Confluent wire format: magic byte, 4-byte schema id, then the Avro binary payload.
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      SpecificDatumReader<T> reader = new SpecificDatumReader<>(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
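
A concrete subclass then mirrors the MyBeanDeserializer pattern from the accepted answer. A sketch, assuming the question's MyBean and local registry URL; the subclass name and the singleton config map are illustrative:

public class MyBeanSpecificDeserializer extends SpecificKafkaAvroDeserializer<MyBean> {
  public MyBeanSpecificDeserializer() {
    // Collections.singletonMap supplies the one required config value.
    super(MyBean.class, Collections.singletonMap("schema.registry.url", "http://localhost:8081"));
  }
}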
Evslin answered 28/2, 2017 at 16:47 Comment(2)
I feel like V needs to be renamed to T (Inamorato)
ReflectData support got merged: github.com/confluentinc/schema-registry/pull/1111 (Inamorato)
