org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

I manually start ZooKeeper, then the Kafka server, and finally the Kafka REST server, each with its own properties file. I then deploy my Spring Boot application on Tomcat.

The application fails to start, and the Tomcat log shows this error: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Error Log

25-Dec-2017 15:00:32.508 SEVERE [localhost-startStop-1] org.apache.catalina.core.ContainerBase.addChildInternal ContainerBase.addChild: start:
 org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/spring-kafka-webhook-service-0.0.1-SNAPSHOT]]
        at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:167)
        at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:752)
        at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:728)
        at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:734)
        at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:986)
        at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1857)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
        at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348)
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151)
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880)
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:144)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
        at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
        at org.springframework.boot.web.support.SpringBootServletInitializer.run(SpringBootServletInitializer.java:154)
        at org.springframework.boot.web.support.SpringBootServletInitializer.createRootApplicationContext(SpringBootServletInitializer.java:134)
        at org.springframework.boot.web.support.SpringBootServletInitializer.onStartup(SpringBootServletInitializer.java:87)
        at org.springframework.web.SpringServletContainerInitializer.onStartup(SpringServletContainerInitializer.java:169)
        at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5196)
        at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
        ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73)
        at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180)
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202)
        at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:126)
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202)
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287)
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175)
        ... 27 more
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/ClusterResourceListener
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at org.apache.catalina.loader.WebappClassLoaderBase.findClassInternal(WebappClassLoaderBase.java:2283)
        at org.apache.catalina.loader.WebappClassLoaderBase.findClass(WebappClassLoaderBase.java:811)
        at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1260)
        at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
        ... 39 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener
        at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1291)
        at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119)
        ... 51 more

Receiver class

public class InventoryEventReceiver {
    
    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);
    
    private CountDownLatch latch = new CountDownLatch(1);
    
    public CountDownLatch getLatch() {
        return latch;
    }
    
    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(
            @Payload InventoryEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) String offset
            ) {
        
        System.out.println("EVENT HAS BEEN RECEIVED by listenWithHeaders(InventoryEvent)");
        System.out.println(event.toString());
    
        
        log.info(System.currentTimeMillis() + "-- Received Event :\"" + event + "\" from partition:offset -- " + partition + ":" + offset +
                " for topic : " + topic);       
        
        String urlForInventoryListeners = "http://localhost:8080/" + topic + "/listeners";
        try {
            URL objectUrl = new URL(urlForInventoryListeners);
            HttpURLConnection con = (HttpURLConnection) objectUrl.openConnection();
            con.setRequestMethod("POST");
            con.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            con.setRequestProperty("topic", topic);
            con.setDoOutput(true);
            String eventJson = new Gson().toJson(event);
            // try-with-resources closes the stream even on failure and avoids the
            // NullPointerException a finally block would throw if getOutputStream() failed
            try (OutputStream os = con.getOutputStream()) {
                os.write(eventJson.getBytes("UTF-8"));
            }
            // Reading the response code completes the HTTP exchange
            log.info("Event sent to " + objectUrl + ", response code " + con.getResponseCode());
        } catch (Exception e) {
            log.error("Failed to forward event to " + urlForInventoryListeners, e);
        }
        
        latch.countDown();
    }
    
}

Receiver config class

@Configuration
@EnableKafka
public class InventoryReceiverConfig {
    
    @Autowired
    private KafkaConfig kafkaConfig;
    
    @Bean
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() { 
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
                new JsonDeserializer<>(InventoryEvent.class));
    }
    
    @Bean
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(3); 
        containerFactory.getContainerProperties().setPollTimeout(3000);
        return containerFactory;
    }
    
    @Bean
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
        return consumerProps;
    }
    
    @Bean
    public InventoryEventReceiver receiver() {
        return new InventoryEventReceiver();
    }
        
}

My server.properties, consumer.properties and kafka-rest.properties files are as follows:

server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
##################### Confluent Proactive Support ######################
# If set to true, and confluent-support-metrics package is installed
# then the feature to collect and report support metrics
# ("Metrics") is enabled.  If set to false, the feature is disabled.
#
confluent.support.metrics.enable=true
############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


# The customer ID under which support metrics will be collected and
# reported.
#
# When the customer ID is set to "anonymous" (the default), then only a
# reduced set of metrics is being collected and reported.
#
# Confluent customers
# -------------------
# If you are a Confluent customer, then you should replace the default
# value with your actual Confluent customer ID.  Doing so will ensure
# that additional support metrics will be collected and reported.
#
confluent.support.customer.id=anonymous

consumer.properties

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group,inventory_consumers

#consumer timeout
#consumer.timeout.ms=5000

kafka-rest.properties

id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181
#
# Configure interceptor classes for sending consumer and producer metrics to Confluent Control Center
# Make sure that monitoring-interceptors-<version>.jar is on the Java class path
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.psl.kafka.spring</groupId>
    <artifactId>spring-kafka-webhook-service</artifactId>
    <packaging>war</packaging>

    <name>spring-kafka-webhook-service</name>
    <description>Spring Kafka Webhook Service</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <org.springframework-version>5.0.0.RELEASE</org.springframework-version>
        <org.springframework.security-version>4.0.1.RELEASE</org.springframework.security-version>
        <org.aspectj-version>1.8.11</org.aspectj-version>
        <org.slf4j-version>1.7.12</org.slf4j-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.2</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>monitoring-interceptors</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


    <version>0.0.1-SNAPSHOT</version>
</project>

My receiver and sender classes carry no stereotype annotations such as @Component or @Service. Does that make any difference?

@Configuration 
public class InventorySenderConfig

@Configuration 
@EnableKafka 
public class InventoryReceiverConfig

@Component 
public class KafkaConfig

@Configuration 
public class ProducingChannelConfig

@Configuration 
public class ConsumingChannelConfig

@RestController 
public class KafkaWebhookController

@Service("webhookService") 
public class KafkaServiceImpl

@EnableIntegration 
@SpringBootApplication 
@ComponentScan("com.psl.kafka") 
public class SpringKafkaWebhookServiceApplication extends SpringBootServletInitializer

These are my class annotations. Do they look OK, or do I need to change something?

New build error after updating the Kafka version to 0.10.1.1

2017-12-26 13:11:44.490  INFO 13444 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2017-12-26 13:11:44.490  INFO 13444 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2017-12-26 13:12:44.499 ERROR 13444 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='inventory-events' and payload='Hello Spring Integration Kafka 0!' to topic inventory:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2017-12-26 13:12:44.501  WARN 13444 --- [           main] o.a.k.c.p.i.ProducerInterceptors         : Error executing interceptor onAcknowledgement callback

java.lang.IllegalStateException: clusterResource is not defined
    at io.confluent.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:174) ~[monitoring-interceptors-3.1.1.jar:na]
    at io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor.onAcknowledgement(MonitoringProducerInterceptor.java:59) ~[monitoring-interceptors-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSendError(ProducerInterceptors.java:116) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:489) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:436) [kafka-clients-0.10.1.1.jar:na]

Do I need to define the interceptor classes that I added as config in the ProducingChannelConfig, ConsumingChannelConfig and InventoryReceiverConfig classes?

Convent answered 25/12, 2017 at 15:31 Comment(2)
What are your Kafka jar versions? - Jointer
Added the pom.xml. - Convent
A
5

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener

The kafka-clients jar that contains this class is missing from your class path. What are you using for dependency management? Maven or Gradle should put this jar on the class path for you automatically.

Antecedence answered 25/12, 2017 at 15:50 Comment(10)
Using Maven. Added pom.xml. - Convent
You have conflicting versions on your class path somehow; the class ClusterResourceListener was added in the 0.10.1.x client. Perhaps you have a newer version in one of Tomcat's directories? - Antecedence
Don't do it like that (many comments); delete them and edit the question instead. It has nothing to do with your configuration; it's a class loader problem. Use the -verbose:class JVM argument to see which jars classes are being loaded from. Also, why are you using such old versions? The current version in the 1.1.x line of spring-kafka is 1.1.7. - Antecedence
OK, removed the comments and added them to the question. - Convent
Even after changing spring-kafka and spring-kafka-test to 1.1.7.RELEASE and spring-integration-kafka from 2.1.0.RELEASE to 2.1.2.RELEASE, I get an error in pom.xml: "Missing artifact org.springframework.boot:spring-boot-starter-test:jar:1.5.9.RELEASE (Click for 68 more)", though spring-boot-starter-test is clearly added in my pom.xml. - Convent
I reverted spring-integration-kafka to 2.1.0.RELEASE and changed spring-kafka and spring-kafka-test from 1.1.1.RELEASE to 1.1.7.RELEASE, and the Maven error is resolved, but as you said the dependency is still bringing kafka-clients-0.10.0.1.jar onto the class path. - Convent
The default Kafka jars brought in as dependencies are kafka_2.11-0.10.0.1.jar and kafka_2.11-0.10.0.1-test.jar. - Convent
Just to add: even after commenting out kafka-clients 0.10.0.1 in my pom.xml, the dependencies brought onto the class path are still kafka-clients-0.10.0.1.jar and kafka-clients-0.10.0.1-test.jar. - Convent
The Maven dependencies were not being added to the class path because of a Maven settings problem. Now the correct dependencies are brought in: kafka-clients-0.10.1.1 and kafka_2.11-0.10.1.1. I have set spring-kafka to 1.1.7.RELEASE and spring-integration-kafka to 2.1.0.RELEASE. But a new problem has come up, which I have added to the question. - Convent
The build error java.lang.IllegalStateException: clusterResource is not defined is resolved after removing the interceptor classes from the producer and consumer configs. It seems it was asking for additional interceptor class definitions. Thanks "@Gary Russell" for your valuable inputs, but since my problem was the upgrade of the Kafka jars, I am accepting "@user7294900"'s answer. - Convent
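
As an alternative to scanning verbose class-loading output, the jar a class was loaded from can be printed from inside the application. The following is a minimal sketch, not something posted in this thread: the class name ClassOriginCheck and the method originOf are hypothetical, and it assumes the helper is packaged in the same web application, so that it resolves classes through the same class loader as the Kafka listener.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of the original project.
class ClassOriginCheck {

    // Returns the jar or directory a class was loaded from, or a marker
    // string when the class is missing or is provided by the JDK itself.
    static String originOf(String className) {
        try {
            // 'false' avoids running static initializers of the inspected class
            Class<?> type = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "<bootstrap/JDK>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // The two classes involved in the stack trace from the question
        System.out.println(originOf("org.apache.kafka.clients.consumer.KafkaConsumer"));
        System.out.println(originOf("org.apache.kafka.common.ClusterResourceListener"));
    }
}
```

If the first line prints a kafka-clients-0.10.0.1.jar location and the second prints the not-on-classpath marker, the old client jar is the one being picked up.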
J
2

Your kafka-clients version is too old: the missing class, ClusterResourceListener, requires version 0.10.1.0 or later, and you have 0.10.0.1.

Upgrading your Kafka jars will fix this issue.

There will be one invocation of onUpdate(ClusterResource) after each metadata response. Note that the cluster id may be null when the Kafka broker version is below 0.10.1.0.
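
The version comparison behind this answer can be sketched in plain Java. This is only an illustration: KafkaClientVersionCheck, compare and isNewEnough are hypothetical names, not Kafka or Spring APIs, and the minimum of 0.10.1.0 is taken from the answer above rather than from any official compatibility table.

```java
// Hypothetical helper for illustration; not part of Kafka or Spring.
class KafkaClientVersionCheck {

    // Minimum kafka-clients version named in the answer above (an assumption here)
    static final String MINIMUM = "0.10.1.0";

    // Compares dotted numeric versions such as "0.10.0.1" segment by segment;
    // missing trailing segments are treated as 0.
    static int compare(String left, String right) {
        String[] a = left.split("\\.");
        String[] b = right.split("\\.");
        int length = Math.max(a.length, b.length);
        for (int i = 0; i < length; i++) {
            int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            int y = i < b.length ? Integer.parseInt(b[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    static boolean isNewEnough(String clientVersion) {
        return compare(clientVersion, MINIMUM) >= 0;
    }

    public static void main(String[] args) {
        System.out.println("0.10.0.1 new enough: " + isNewEnough("0.10.0.1")); // false
        System.out.println("0.10.1.1 new enough: " + isNewEnough("0.10.1.1")); // true
    }
}
```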

Jointer answered 26/12, 2017 at 5:53 Comment(6)
I upgraded spring-kafka and spring-kafka-test from 1.1.1.RELEASE to 1.1.7.RELEASE as mentioned in my comments to "@Gary Russell", but the dependency brought automatically into Referenced Libraries is 0.10.0.1. I can't force a change, as it gives errors in pom.xml due to several version incompatibilities. - Convent
The default Kafka jars brought in as dependencies are kafka_2.11-0.10.0.1.jar and kafka_2.11-0.10.0.1-test.jar. - Convent
Just to add: even after commenting out kafka-clients 0.10.0.1 in my pom.xml, the dependencies brought onto the class path are still kafka-clients-0.10.0.1.jar and kafka-clients-0.10.0.1-test.jar. - Convent
Please see my last comment to "@Gary Russell". I successfully updated the Kafka jars from 0.10.0.1 to 0.10.1.1, but a new problem appears while building; I have added it to the question. - Convent
I think you should open a new question, because otherwise this one will turn into a mess. - Jointer
The build error is resolved. It seems I don't need the interceptor definitions in my case. Thanks for your answer. My Kafka jar was not being added to the class path correctly, and fixing that resolved the issue. - Convent
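
For readers who want to keep the monitoring interceptor where it works, rather than delete it, one option is to register it only when the class can be loaded. The sketch below is an assumption-laden variant of consumerConfigs() from the question, not the asker's final code: OptionalInterceptorConfig, isLoadable and the monitoringEnabled flag are hypothetical, and the plain string keys stand in for the ConsumerConfig constants so that the example compiles without Kafka on the class path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical variant of consumerConfigs(); a sketch only.
class OptionalInterceptorConfig {

    static final String INTERCEPTOR =
            "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor";

    // True only if the class and everything it extends or implements can be loaded.
    // A missing ClusterResourceListener surfaces here as a LinkageError.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, OptionalInterceptorConfig.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    static Map<String, Object> consumerConfigs(boolean monitoringEnabled) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "inventory_consumers");
        // "interceptor.classes" is the key behind ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
        if (monitoringEnabled && isLoadable(INTERCEPTOR)) {
            props.put("interceptor.classes", INTERCEPTOR);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfigs(true));
    }
}
```

With this shape, a class path that lacks the interceptor jar, or a kafka-clients version it cannot link against, yields a consumer without monitoring instead of a failed application start.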
O
2

Check that your consumer is configured with StringDeserializer rather than StringSerializer:

properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Oversew answered 19/2, 2022 at 13:4 Comment(0)
M
0

I also encountered this error in a recent project. It can also be caused by missing Kafka server settings: the app looks for its configuration in the application.properties file (your app loads from application.properties if you don't specify which profile to use).

Solution: edit your run configuration (if using IntelliJ) and, in the program arguments, add the profile in which you defined your Kafka bootstrap servers. That way the app starts with the profile that contains the Kafka bootstrap servers and related details.

--spring.profiles.active=<profile_name>

Alternatively, if you don't want to go this route, create an application.properties file without any profile and put all your Kafka settings in it. It should work, since the app loads application.properties by default.
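
A fail-fast check can make this kind of misconfiguration visible before the consumer is built. The following is a rough sketch under assumptions: BootstrapServersCheck and its methods are hypothetical, the properties are parsed from a string as a stand-in for the loaded application.properties, and spring.kafka.bootstrap-servers is assumed to be the key the project uses for its broker list.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical startup check; class and method names are illustrative.
class BootstrapServersCheck {

    // Spring Boot's property for the Kafka broker list (assumed to be the one in use)
    static final String KEY = "spring.kafka.bootstrap-servers";

    // Returns the configured broker list or throws if it is absent or blank.
    static String requireBootstrapServers(Properties props) {
        String servers = props.getProperty(KEY);
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalStateException(KEY + " is not set in the loaded properties");
        }
        return servers.trim();
    }

    // Parses properties text; stands in for reading application.properties.
    static Properties parse(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("spring.kafka.bootstrap-servers=localhost:9092\n");
        System.out.println(requireBootstrapServers(props));
    }
}
```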

Moreau answered 14/6, 2020 at 19:11 Comment(0)
S
0

In my case, I had used KafkaAvroSerializer as the deserializer in my consumer instead of KafkaAvroDeserializer.
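
A cheap guard against this mix-up is to check the configured class name before handing it to the consumer. The sketch below relies purely on the naming convention, which is an assumption: DeserializerNameCheck and its methods are hypothetical, and a stricter check would test the class against Kafka's Deserializer interface instead.

```java
// Hypothetical sanity check based only on class naming; illustrative.
class DeserializerNameCheck {

    // Heuristic: Kafka deserializer implementations are conventionally named *Deserializer.
    static boolean looksLikeDeserializer(String className) {
        return className.endsWith("Deserializer");
    }

    static void requireDeserializer(String configKey, String className) {
        if (!looksLikeDeserializer(className)) {
            throw new IllegalArgumentException(
                    configKey + " is set to " + className + ", which does not look like a deserializer");
        }
    }

    public static void main(String[] args) {
        // Passes silently
        requireDeserializer("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        try {
            // The mistake described in this answer
            requireDeserializer("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```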

Superficies answered 19/10, 2020 at 7:14 Comment(0)
C
0

My problem was that the consumer expected the message type Activity (my own class), as in KafkaItemReader<Activity, Finance> etc., while my producers and factories produced String (ProducerFactory<String, String> etc.).

To solve it, I changed the producer's value type from String to Activity, and of course changed

String message = "checked in %s in %s starting at %s"
            .formatted(activity.getName(), activity.getLocation(), activity.getStartDate());
kafkaTemplate.send(checkInTopicName, message);

to

kafkaTemplate.send(checkInTopicName, activity);
Colp answered 22/9, 2023 at 12:20 Comment(0)
C
0

I was having this problem because I had this in my app.properties:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

Instead of this:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

In short, the value was not actually being deserialized, so the Kafka consumer could not be constructed.
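
If the goal is to keep ErrorHandlingDeserializer rather than replace it, it needs a delegate to do the real deserialization; as far as I know, the consumer fails to construct when no delegate is configured. A minimal sketch of the consumer properties, assuming JsonDeserializer is the intended delegate: ErrorHandlingConsumerProps is a hypothetical class name, and the string keys stand in for the Kafka and spring-kafka constants so that the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for consumer properties; a sketch only.
class ErrorHandlingConsumerProps {

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The wrapper catches deserialization failures...
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // ...and hands the actual work to this delegate (assumed to be JsonDeserializer here)
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a Spring Boot properties file, the delegate entry would go under the spring.kafka.consumer.properties prefix, which passes arbitrary keys through to the consumer.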

Classy answered 13/5, 2024 at 4:18 Comment(0)
