Kafka consumer error Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected
I have the consumer config as follows

package com.example.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-tenent1-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // Wire the listener containers to the consumer factory defined above.
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

However, when the application starts, the following message keeps being logged:

Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected

I was able to send a message to a Kafka topic using the following, though:

kafkaTemplate.send("test-topic", msg);

The consumer listener is as follows

public class Receiver {
    @KafkaListener(topics = "test-topic", groupId = "group-tenent1-id")
    public void listen(String message) {
        log.info("Received Message in group - group-id: " + message);
    }
}

The producer config is as follows

package com.example.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

But I never see the log line showing that a message was received.


How do you know the producer worked? Did you check the broker directly? Did you check the SendResult response from the template method? I ask because port 2181 is ZooKeeper, not Kafka.

You've not shown your producer configuration, but I assume it uses localhost:9092 if it did work.

Your consumer is not using this.

Ideally, you should externalize your config into the Spring properties file and use one location to set spring.kafka.bootstrap-servers=localhost:9092, which will then be used by both the consumer and producer clients within the app.
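
As a rough illustration of the "one location" idea, the sketch below reads a single bootstrap address and feeds it to both client configurations, so the consumer can no longer drift to a different port than the producer. It uses only the JDK and plain Kafka property names (`bootstrap.servers`, `group.id`); the class name `SharedBootstrapSketch` and the inlined properties text are assumptions made for the example. In a real Spring Boot app the value would come from the actual properties file and Boot would build the clients itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SharedBootstrapSketch {
    // Stand-in for application.properties (assumption: inlined here so the
    // sketch runs without a Spring context or a file on disk).
    static final String APP_PROPERTIES = "spring.kafka.bootstrap-servers=localhost:9092\n";

    // Single place where the broker address is read.
    static String bootstrapServers() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(APP_PROPERTIES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("spring.kafka.bootstrap-servers");
    }

    // Consumer settings: the address comes from the shared value, not a literal.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers());
        props.put("group.id", "group-tenent1-id");
        return props;
    }

    // Producer settings: same shared address.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers());
        return props;
    }

    public static void main(String[] args) {
        System.out.println("consumer -> " + consumerProps().get("bootstrap.servers"));
        System.out.println("producer -> " + producerProps().get("bootstrap.servers"));
    }
}
```

With this shape, a typo such as 2181 could only be made once, and it would break the producer as well, which makes it much easier to notice.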

Yes, it is on 9092. I added the producer config.
I can confirm from Offset Explorer that the message was sent. See the attached image.

In my case I was using AWS MSK bootstrap servers with IAM. I had not added the proper security config for security.protocol and sasl.mechanism; after changing them, my app worked without any issue.

Previously it was like below

  security.protocol: PLAINTEXT
  sasl.mechanism: PLAIN

And then I changed it to

  security.protocol: SASL_SSL
  sasl.mechanism: AWS_MSK_IAM
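
For reference, here is a minimal sketch of what the full set of IAM client properties usually looks like when built in Java. Only `security.protocol` and `sasl.mechanism` come from the answer above; the `sasl.jaas.config` and `sasl.client.callback.handler.class` values are the class names used by the aws-msk-iam-auth library, which is assumed to be on the classpath. The class name `MskIamPropsSketch` and the broker address in `main` are hypothetical. Note also that with `security.protocol` set to PLAINTEXT the `sasl.mechanism` setting is ignored, so the earlier configuration was not attempting SASL at all.

```java
import java.util.Properties;

class MskIamPropsSketch {
    // Builds client properties for an MSK cluster that uses IAM access control.
    static Properties mskIamProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // The two settings from the answer above.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        // Assumption: these classes are provided by the aws-msk-iam-auth dependency.
        props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address, for illustration only.
        Properties props = mskIamProps("b-1.example.kafka.us-east-1.amazonaws.com:9098");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```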

I was getting this error message for the consumer with Confluent Cloud Kafka but not with my local Kafka cluster. None of the above solutions applied. I suspected the security settings, but my application.yml included them all, and correctly. Ultimately, the fix was to put the Confluent security settings in the Kafka consumer config, as shown here (your security settings may vary):

kafkaConfigProperties.put("ssl.endpoint.identification.algorithm", "https"); 
kafkaConfigProperties.put("sasl.mechanism", "PLAIN"); 
kafkaConfigProperties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + apiKey + "\" password=\"" + apiPassword + "\";"); 
kafkaConfigProperties.put("security.protocol", "SASL_SSL");

Removing all the custom Kafka config in Java and putting it in application.yml resolved the issue. I am able to send and receive messages now.

server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-tenant1-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

You can move the bootstrap servers from the individual clients to directly underneath spring.kafka.

I had this problem with a certain configuration of listeners:

   image: confluentinc/confluent-local:7.4.1
   hostname: broker
   container_name: broker
   ports:
     - "29092:29092"
     - "9101:9101"
   environment:
     KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092,CONTROLLER://broker:29093'

The problem disappeared after changing the last line (only the last line!) to:


So basically I changed PLAINTEXT_HOST://localhost:29092 to PLAINTEXT_HOST://, although it's not clear to me why the original didn't work. Both configs seem OK.
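
One plausible explanation, offered as an assumption rather than something confirmed in this thread: KAFKA_LISTENERS controls the address the broker binds to inside the container. A `localhost` host there means the container's own loopback interface, while traffic forwarded through a published Docker port arrives on the container's external interface, so nothing is listening where the connection lands and the client is disconnected. An empty host or `0.0.0.0` binds to all interfaces. The JDK-only sketch below shows the difference in the resolved bind address; `ListenerBindSketch` is a hypothetical helper, not Kafka's code, and `PLAINTEXT_HOST://:29092` is an assumed form of the replacement value, which the answer does not show in full.

```java
import java.net.InetSocketAddress;

class ListenerBindSketch {
    // Parses "NAME://host:port" and returns the socket address a Java server
    // would bind to. An empty host maps to the wildcard (all interfaces).
    static InetSocketAddress bindAddress(String listener) {
        String hostPort = listener.substring(listener.indexOf("://") + 3);
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return host.isEmpty()
                ? new InetSocketAddress(port)        // wildcard address
                : new InetSocketAddress(host, port); // specific interface
    }

    public static void main(String[] args) {
        InetSocketAddress loopbackOnly = bindAddress("PLAINTEXT_HOST://localhost:29092");
        InetSocketAddress allInterfaces = bindAddress("PLAINTEXT_HOST://:29092");
        // Reachable only from inside the container itself.
        System.out.println("localhost listener is loopback: "
                + loopbackOnly.getAddress().isLoopbackAddress());
        // Reachable through the published port as well.
        System.out.println("empty-host listener is wildcard: "
                + allInterfaces.getAddress().isAnyLocalAddress());
    }
}
```

If this is the cause, the address advertised to clients would still be set separately from the bind address, so the bind side can stay generic.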

In Spring Kafka 3.0.9 the auto-configured KafkaAdmin does not read the security information. The producer uses the admin client to add some metrics, and it will error out with the same message. You need to create your own bean for KafkaAdmin.
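
A minimal sketch of the configuration such a bean could carry, assuming SASL_SSL with the PLAIN mechanism as in the Confluent answer above. It only builds the map, using standard Kafka property names and the JDK, so it runs without Spring; the class name `AdminConfigSketch`, the method name and the placeholder values in `main` are hypothetical. In the application the map would be handed to the KafkaAdmin constructor inside a `@Bean` method, as noted in the comment.

```java
import java.util.HashMap;
import java.util.Map;

class AdminConfigSketch {
    // Builds the admin client settings, including the security properties
    // that the answer says are otherwise missing.
    static Map<String, Object> adminConfigs(String bootstrapServers,
                                            String securityProtocol,
                                            String saslMechanism,
                                            String jaasConfig) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("security.protocol", securityProtocol);
        configs.put("sasl.mechanism", saslMechanism);
        configs.put("sasl.jaas.config", jaasConfig);
        return configs;
    }

    // In a Spring configuration class this would be used roughly as:
    //
    //   @Bean
    //   public KafkaAdmin kafkaAdmin() {
    //       return new KafkaAdmin(adminConfigs(...));
    //   }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Map<String, Object> configs = adminConfigs(
                "localhost:9092",
                "SASL_SSL",
                "PLAIN",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"<api-key>\" password=\"<api-secret>\";");
        configs.keySet().forEach(System.out::println);
    }
}
```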

