Error reading field 'topics': java.nio.BufferUnderflowException in Kafka
Asked Answered
L

2

5

9.0 client to consume messages from two brokers which are running on a remote system.My producer is working fine and is able to send messages to the broker but my consumer is not able to consume these messages.Consumer and producer are running on my local system and the two brokers are on aws. Whenever I try to run consumer. Following error appears on the broker logs.

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

My Consumer code is as follows>

package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer 
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1");
Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP1:9092,IP2:9093");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 props.put("heartbeat.interval.ms", "500");
 props.put("session.timeout.ms", "1000");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "10000");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 consumer.subscribe(Arrays.asList(a));
 while (true) {
        // Poll for ConsumerRecords for a certain amount of time
        ConsumerRecords<String, String> records = consumer.poll(1000);

        // Process the ConsumerRecords, if any, that came back
        for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key+":"+value);
                // Do something with message
        }
      }

    }
}

Can someone help?

Latrice answered 1/6, 2016 at 9:34 Comment(2)
Which version of Kafka Broker are you using?Lam
My guess is one of your broker version is 0.8.x. Check KAFKA-2496Lam
S
7

This issue occurs when the kafka cluster running on your machine is older version i.e 0.8.x.x where as the client being used to access data from the cluster is of higher version i.e 0.9.x.x.

There are two simple solutions based on requirements:

  1. Downgrade the client version.
  2. Upgrade the kafka cluster.
Sarasvati answered 26/7, 2016 at 5:59 Comment(0)
C
0

Please see the kafka clients which you are using to produce and kafka servers version This is purely kafka clients version incompatability

Ceballos answered 19/1, 2023 at 14:5 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.