Apache Kafka and Avro: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer

Whenever I try to read a message from the Kafka queue, I get the following exception:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Kafka Producer Code:

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            System.out.println("Offset: "+ recordMetaData.offset());
            System.out.println("Topic: "+ recordMetaData.topic());
            System.out.println("Partition: "+ recordMetaData.partition());
            System.out.println("Timestamp: "+ recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );

        asyncSend(record1);

        Thread.sleep(1000);
    }
}

Kafka Consumer Code:

public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }

    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}

I am following these examples:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
Dewyeyed answered 13/2, 2017 at 9:38 Comment(0)

This is the final code that works, after discussing with @harmeen:

static {
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
    // Make the decoder return the generated Customer class instead of GenericData$Record.
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public static void infiniteConsumer() throws IOException {
    VerifiableProperties properties = new VerifiableProperties(kafkaProps);
    // Keys are plain strings; only values are Avro-encoded.
    StringDecoder keyDecoder = new StringDecoder(properties);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("BrandNewTopics", 1);

    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
    Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
    ConsumerIterator it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        String key = (String) messageAndMetadata.key();
        GenericRecord record = (GenericRecord) messageAndMetadata.message();
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
        System.out.println("Key: " + key);
        System.out.println("Value: " + customer);
    }
}
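
(Note: with SPECIFIC_AVRO_READER_CONFIG set to true, messageAndMetadata.message() should already come back as a Customer instance, so the deepCopy call acts as a defensive bridge in case a generic record still slips through.)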

Things that changed:

  • Adding the SPECIFIC_AVRO_READER_CONFIG property, set to true, so the decoder returns the generated Customer class rather than GenericData$Record.
  • Using smallest (the old consumer API's equivalent of earliest) to start from the beginning of the topic.
  • Using StringSerializer for keys on the producer and StringDecoder on the consumer.
  • Changing both the producer and the consumer to reflect the previous change (a producer-side sketch follows this list).
  • Adjusting the namespace of the Customer class that represents the Avro record.
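
For completeness, a minimal sketch of the matching producer configuration. The original answer does not show the producer side; this carries over the question's producer and only swaps the key serializer, so treat it as an illustration rather than the exact code:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        // Keys are plain strings now, so use StringSerializer instead of KafkaAvroSerializer.
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values remain Avro-encoded through the Schema Registry.
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }
}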
Donelson answered 13/2, 2017 at 9:52 Comment(8)
If that doesn't fix it, consider these steps: 1. Check that your schema registry contains a schema for the corresponding subject. 2. Use kafka-avro-console-consumer to consume your event; that would narrow the scope of the problem to either your producer or your consumer. - Donelson
Hey @Javier, I am using ./kafka-avro-console-consumer --bootstrap-server localhost:2181 --topic CustomerSpecificCountry --from-beginning --property schema.registry.url=http://localhost:8081 to run the consumer, but nothing is received by it. Is something wrong with my command? - Dewyeyed
bootstrap-server should point to one or more of your Kafka brokers, not your ZooKeeper node. - Donelson
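
(For example, pointing the same command at the broker rather than ZooKeeper; with the question's single-broker setup, that would be the default port 9092:)

./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic CustomerSpecificCountry --from-beginning --property schema.registry.url=http://localhost:8081
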
I got it. Now I am getting this exception: ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:584) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566) at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:65). How can I resolve this? - Dewyeyed
That's not very helpful. Are you passing more than one Kafka broker? I'd consider passing just one; I can never remember the format for passing multiple, whether they're separated by commas or semicolons or what. You only need one, because it just needs to reach one broker to get the metadata about the cluster. - Donelson
No, I have only one Kafka broker. How can we register a schema with the Schema Registry? - Dewyeyed
Let us continue this discussion in chat. - Donelson
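
(For reference: the KafkaAvroSerializer registers schemas automatically on first produce by default, and a schema can also be registered manually through the Schema Registry REST API. A sketch, assuming Customer has the int id and string name fields implied by new Customer(1002, "Jimmy") in the question:)

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\":\"record\",\"name\":\"Customer\",\"namespace\":\"com.harmeetsingh13.java\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
    http://localhost:8081/subjects/CustomerSpecificCountry-value/versions
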
Thanks, this works for me: T data = (T) SpecificData.get().deepCopy(record.value().getSchema(), record.value()); where record is a ConsumerRecord<String, GenericData.Record>. - Impression
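
(Generalizing that comment into a small helper; the toSpecific name and the AvroBridge class are illustrative, not from any library:)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;

public class AvroBridge {
    // Bridge a GenericRecord to the generated SpecificRecord class for the given schema.
    public static <T extends SpecificRecord> T toSpecific(Schema schema, GenericRecord record) {
        @SuppressWarnings("unchecked")
        T copy = (T) SpecificData.get().deepCopy(schema, record);
        return copy;
    }
}

// Usage, where record is a ConsumerRecord<String, GenericData.Record>:
// Customer customer = AvroBridge.toSpecific(Customer.SCHEMA$, record.value());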

I found the issue. The root cause is that the Avro deserializer expects the generated class to sit in the same package on the consumer side as on the producer side, because it resolves the class by the schema's full name, namespace included. For example, if your producer used a.b.c.AvroSerializer, then on the consumer side a.b.c.AvroDeserializer should see the same package structure. Just keeping the package structure identical resolved the issue for me.
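
(To see why the package matters: the specific reader resolves the target class from the schema's full name, so the generated class must sit in exactly that package on the consumer's classpath. A small illustration using the Customer class from this question:)

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;

Schema schema = Customer.SCHEMA$;  // full name: com.harmeetsingh13.java.Customer
// Looks up the class named by the schema's full name on the classpath;
// if it is missing (e.g. wrong package), the reader falls back to GenericData$Record,
// which is exactly what produces the ClassCastException in the question.
Class<?> readerClass = SpecificData.get().getClass(schema);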

Frasco answered 6/7, 2022 at 4:24 Comment(1)
This does not really answer the question. If you have a different question, you can ask it by clicking Ask Question. To get notified when this question gets new answers, you can follow this question. Once you have enough reputation, you can also add a bounty to draw more attention to this question. - From Review - Alerion

I was facing the same issue; it was resolved by putting these two configurations in the consumer factory config. The first stops the client from auto-registering schemas with the registry, and the second makes the deserializer return the generated SpecificRecord class instead of GenericData$Record:

config.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
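
A minimal sketch of where these lines might sit, assuming a Spring Kafka consumer factory (the surrounding setup is my assumption; the original answer only shows the two lines above):

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public ConsumerFactory<String, Customer> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    config.put("schema.registry.url", "http://localhost:8081");
    // Don't let the client try to register schemas itself.
    config.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
    // Deserialize into the generated Customer class instead of GenericData$Record.
    config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    return new DefaultKafkaConsumerFactory<>(config);
}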
Antimissile answered 6/1, 2023 at 18:19 Comment(0)
