Kafka consumer error Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected
S

6

12

I have the consumer config as follows

package com.example.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-tenent1-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

However, when the application starts, the following message is logged over and over

Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected

I was able to send a message to a Kafka topic using the following, though

kafkaTemplate.send("test-topic", msg);

The consumer listener is as follows

@Service
public class Receiver {
    @KafkaListener(topics = "test-topic", groupId = "group-tenent1-id")
    public void listen(String message) {
        log.info("Received Message in group - group-id: " + message);
    }
}

The producer config is as follows

package com.example.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

But I do not see the log line showing that a message was received


Sialkot answered 21/7, 2022 at 14:33 Comment(1)
How do you know the producer worked? Did you actually check the broker directly? Did you actually check the SendResult response from the template method? Because port 2181 is ZooKeeper, not Kafka. - Oldtimer
O
1

You've not shown your producer configuration, but I assume it uses localhost:9092 if it did work.

Your consumer is not using this.

Ideally, you should externalize your config into the Spring properties file and use one location to set spring.kafka.bootstrap-servers=localhost:9092, which will then be used by both the consumer and producer clients within the app.
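
As a rough illustration of the "one location" idea, here is a minimal plain-Java sketch in which the consumer and producer maps are both built from the same bootstrap address. The class `SharedKafkaProps` and its methods are hypothetical names made up for this example; the string keys are the standard Kafka client property names. The `usesZooKeeperPort` check is only a heuristic that assumes ZooKeeper runs on its default port 2181.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka or Spring: builds consumer and
// producer properties from one shared bootstrap address.
final class SharedKafkaProps {

    // Common settings shared by every client in the app.
    public static Map<String, Object> baseProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = baseProps(bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = baseProps(bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Heuristic: true if any host:port entry uses 2181, the default
    // ZooKeeper port, which a Kafka client should not bootstrap against.
    public static boolean usesZooKeeperPort(String bootstrapServers) {
        for (String hostPort : bootstrapServers.split(",")) {
            String trimmed = hostPort.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon >= 0 && trimmed.substring(colon + 1).equals("2181")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String bootstrap = "localhost:9092"; // assumed broker address from the question
        System.out.println(consumerProps(bootstrap, "group-tenent1-id"));
        System.out.println(producerProps(bootstrap));
        System.out.println("Suspicious port: " + usesZooKeeperPort("localhost:2181"));
    }
}
```

Either map could be passed to `DefaultKafkaConsumerFactory` or `DefaultKafkaProducerFactory` as in the question, so the two clients cannot drift apart.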

Oldtimer answered 21/7, 2022 at 16:24 Comment(2)
Yes, it is on 9092. I added the producer config. - Sialkot
@OneCricketer, I can confirm from Offset Explorer that the message was sent. See the attached image. - Sialkot
S
4

In my case I was using AWS MSK bootstrap servers with IAM. I hadn't added the proper security config for security.protocol and sasl.mechanism. After changing them, my app worked without any issue.

Previously it was like below

  security.protocol: PLAINTEXT
  sasl.mechanism: PLAIN

And then I changed it to

  security.protocol: SASL_SSL
  sasl.mechanism: AWS_MSK_IAM
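
If the client is configured in Java rather than in a properties file, the same two settings might be applied as in the sketch below. The class `MskIamProps` is a hypothetical name. The `sasl.jaas.config` and `sasl.client.callback.handler.class` values are not mentioned in this answer; they are an assumption based on the aws-msk-iam-auth library, which would also need to be on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: security properties for an MSK cluster using IAM auth.
final class MskIamProps {

    public static Map<String, Object> mskIamSecurityProps() {
        Map<String, Object> props = new HashMap<>();
        // The two settings changed in this answer.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        // Assumption: class names from the aws-msk-iam-auth library,
        // not stated in the answer itself.
        props.put("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Merge these into the consumer or producer property map before
        // creating the client factory.
        mskIamSecurityProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```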
Selfeducated answered 25/8, 2022 at 6:12 Comment(0)
P
4

I was getting this error message for the consumer with Confluent Cloud Kafka but not with my local Kafka cluster. None of the above solutions applied. I suspected security settings, but my application.yml included them all, and correctly. Ultimately, the fix was to put the Confluent security settings directly in the Kafka consumer config, as shown here (your security settings may vary):

kafkaConfigProperties.put("ssl.endpoint.identification.algorithm", "https"); 
kafkaConfigProperties.put("sasl.mechanism", "PLAIN"); 
kafkaConfigProperties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + apiKey + "\" password=\"" + apiPassword + "\";"); 
kafkaConfigProperties.put("security.protocol", "SASL_SSL");
Punnet answered 16/6, 2023 at 21:31 Comment(0)
S
2

Removing all the custom Kafka config in Java and putting it in application.yml resolved the issue. I am now able to send and receive messages.

server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-tenant1-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Sialkot answered 21/7, 2022 at 18:26 Comment(1)
You can move the bootstrap servers from the individual clients to directly underneath spring.kafka. - Oldtimer
T
0

I had this problem with a certain configuration of listeners:

broker:
   image: confluentinc/confluent-local:7.4.1
   hostname: broker
   container_name: broker
   ports:
     - "29092:29092"
     - "9101:9101"
   environment:
     KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092'
     KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
     KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092,CONTROLLER://broker:29093'

The problem disappeared after changing the last line (only the last line!) to:

  KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_HOST://0.0.0.0:29092,CONTROLLER://broker:29093'

So basically I changed PLAINTEXT_HOST://localhost:29092 to PLAINTEXT_HOST://0.0.0.0:29092, although it's not clear to me why the original didn't work. Both configs seem OK.
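
One plausible explanation, offered as an assumption rather than something verified against this setup: inside the container, a listener bound to localhost only accepts connections on the container's own loopback interface, so traffic forwarded from the host through the published port never reaches it, while 0.0.0.0 binds all interfaces. A quick way to check from the host whether the port accepts connections at all is a plain TCP probe. The class `BrokerPortProbe` below is a hypothetical helper using only the JDK; note that a successful connect only shows the port is open, not that the advertised listeners are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper: checks whether a TCP connection to
// host:port can be opened within the given timeout.
final class BrokerPortProbe {

    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolved host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 29092 is the host-facing port from the compose file above.
        System.out.println("localhost:29092 reachable: "
                + canConnect("localhost", 29092, 2000));
    }
}
```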

Traceetracer answered 21/8, 2023 at 8:40 Comment(0)
M
0

In Spring Kafka 3.0.9 the auto-configured KafkaAdmin does not read the security information. The producer uses the admin client to add some metrics, and it will error out with the same message. You need to create your own KafkaAdmin bean.

Michi answered 5/2, 2024 at 20:19 Comment(0)
