Kafka: ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class PublicActivityRecord

I get this exception in the consumer when trying to cast record.value() to a Java object:

ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')

The producer sends the Java object, a user-defined type named PublicActivityRecord, like this:

KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());

[...]

this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
this.producer.flush();

At this point I can see in debug mode that the value of the ProducerRecord is indeed of type PublicActivityRecord.
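
For reference, createKafkaProperties() is not shown in the post; here is a minimal sketch of what it presumably returns, given that the producer registers an Avro schema (the broker address and registry URL are placeholders):

private Properties createKafkaProperties() {
    Properties props = new Properties();
    // placeholders: the real broker and schema-registry addresses will differ
    props.put("bootstrap.servers", "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // the Avro serializer is what registers the schema seen in the registry log below
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    return props;
}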

On the schema-registry server, I can see in the log the POST request from the producer registering the schema:

Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)

On the consumer side:

protected KafkaConsumer<String, PublicActivityRecord> consumer;

[...]

consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {

    [...]

    PublicActivityRecord activityRecord = record.value();

Here the ClassCastException occurs.

In debug mode, I can see that record.value() is indeed of type GenericData$Record, and it cannot be cast to PublicActivityRecord.
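
A quick check (not part of the original post) inside the forEach loop confirms the runtime type:

Object value = record.value();
// prints org.apache.avro.generic.GenericData$Record instead of PublicActivityRecord
System.out.println(value.getClass().getName());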

The key and value deserializers match the producer's serializers:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

And in the schema-registry log, I can see the consumer's GET request:

"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)

So I have checked that:

  1. the producer sends a message with my own type PublicActivityRecord
  2. the message is received by the Kafka broker
  3. the producer POSTs the schema to the schema registry
  4. the message is picked up by the consumer
  5. the schema is fetched (GET) by the consumer from the schema registry
  6. the value of the message is of the unexpected type GenericData$Record

This leads me to conclude that the problem is in my consumer.

So the question is: why does the consumer get a GenericData$Record instead of the expected PublicActivityRecord?

Any clue would be much appreciated !

Dominickdominie answered 30/1, 2022 at 21:38 Comment(0)

By default, only generic records are returned. You'll need to set

value.deserializer.specific.avro.reader=true 

Or, use the constant in your consumer configs:

consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
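
Putting it together, a minimal sketch of the fixed consumer configuration (broker address, group id, and registry URL are placeholders):

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
consumerProperties.put("schema.registry.url", "http://localhost:8081");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
// without this, KafkaAvroDeserializer returns GenericData$Record
consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<String, PublicActivityRecord> consumer = new KafkaConsumer<>(consumerProperties);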

Alpenglow answered 30/1, 2022 at 23:34 Comment(2)
thanks for the hint, it helped me significantly. You have to set the mentioned config in your Spring Boot application as follows:
spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific:
          avro:
            reader: true
– Straus
@Dany Consumer deserializer properties shouldn't be broken apart; specific.avro.reader: true should be on one line. – Alpenglow

We got the same error when deserializing Kafka records with Avro in Apache Flink. The cause was that we had renamed the namespaces of the POJOs generated by the Avro tools, so the namespace in the schema and the package of the POJO differed.
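
To illustrate (the schema and names here are hypothetical): Avro resolves the specific class from the schema's full name (namespace + record name), and when that lookup fails it typically falls back to a generic record. You can check the resolution directly:

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;

public class NamespaceCheck {
    public static void main(String[] args) {
        // hypothetical schema; its full name is com.example.events.PublicActivityRecord
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"PublicActivityRecord\","
                + "\"namespace\":\"com.example.events\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        // prints the generated class if it sits in exactly that package;
        // prints null if not, in which case Avro falls back to GenericData$Record
        System.out.println(SpecificData.get().getClass(schema));
    }
}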

Ulcer answered 11/7, 2023 at 11:48 Comment(0)

I added this configuration and it's working for me:

.consumerProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)

Example:

@Bean
public ReceiverOptions<String, OrderRequestModel> receiverOptions() {
    // group and topic are fields defined elsewhere in this class
    Map<String, Object> propertiesMap = new HashMap<>();
    propertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    propertiesMap.put("schema.registry.url", "http://localhost:8081");
    propertiesMap.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    propertiesMap.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1");
    propertiesMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    propertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    return ReceiverOptions.<String, OrderRequestModel>create(propertiesMap)
            // spring-kafka JsonDeserializer settings; not used by KafkaAvroDeserializer
            .consumerProperty(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderRequestModel.class)
            .consumerProperty(JsonDeserializer.USE_TYPE_INFO_HEADERS, false)
            // the Avro fix: deserialize into the generated specific record class
            .consumerProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)
            .subscription(List.of(topic));
}
Vocable answered 22/8 at 6:52 Comment(0)
