Check If System Is Connected To Kafka

I need to write a smoke test in Java that validates whether the system is connected to Kafka.

Does anyone have any idea? I have found this post:

How to check whether Kafka Server is running?

But it's too complicated to do from Java code, and I don't think it's the direction I should take.

Thanks in advance.

Diagnose answered 7/12, 2016 at 11:17 Comment(3)
Have you found an answer to this question? Why do you think the answers below are not good enough? - Stocking
Unfortunately, I haven't had a chance to try any of the solutions, so I can't say that any of them aren't good enough. - Diagnose
Spring Micrometer metrics work like a charm: https://mcmap.net/q/905687/-spring-actuator-kafka-streams-add-kafka-stream-status-to-health-check-endpoint - Jumbala
S
16

I had the same question and I don't want to leave it without an answer. I read a lot about how to check the connection, and most of the answers I found were checking the connection with ZooKeeper (ZK), but I really wanted to check the connection directly with the Kafka server.

What I did was create a simple KafkaConsumer and list all the topics with listTopics(). If the connection succeeds, the call returns the topic metadata. Otherwise, it throws a TimeoutException.

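  import java.util.Properties
  import scala.collection.mutable
  import org.apache.kafka.clients.consumer.KafkaConsumer

  // Throws org.apache.kafka.common.errors.TimeoutException if no broker can be reached.
  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams("bootstrap.servers").toString)
    props.put("group.id", kafkaParams("group.id").toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    // listTopics() needs a round trip to a broker, so it doubles as a connectivity probe.
    try simpleConsumer.listTopics()
    finally simpleConsumer.close() // always release the consumer's sockets
  }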

Then you can wrap this method in a try-catch block to catch the exception.
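Since the question asks for Java, the try-catch wrapping might look like the minimal sketch below. `SmokeCheck` and `succeeds` are hypothetical names, not part of the Kafka client. The probe is passed in as a `Callable`, so the helper itself only needs the JDK.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not a Kafka API): turns "returns or throws" into a boolean.
final class SmokeCheck {
    private SmokeCheck() {}

    /** Runs the probe and reports whether it completed without throwing. */
    static boolean succeeds(Callable<?> probe) {
        try {
            probe.call();
            return true;
        } catch (Exception e) {
            // For a Kafka probe this would include the client's TimeoutException,
            // which is an unchecked exception.
            return false;
        }
    }
}
```

Assuming kafka-clients is on the classpath and `consumer` is a `KafkaConsumer` configured like the one in this answer, a smoke test could then assert `SmokeCheck.succeeds(() -> consumer.listTopics())`.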

Stocking answered 24/11, 2017 at 16:6 Comment(1)
You may also need to lower the default timeout values, e.g. props.put(REQUEST_TIMEOUT_MS_CONFIG, "5000"); props.put(SESSION_TIMEOUT_MS_CONFIG, "4000"); to avoid waiting too long. - Dorey
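The timeout suggestion in this comment could be sketched in Java roughly as follows. The literal keys `request.timeout.ms` and `session.timeout.ms` are the strings behind those `ConsumerConfig` constants. `ProbeConfig` and `forProbe` are hypothetical names, and 5000/4000 are simply the values from the comment.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings for a short-lived connectivity probe.
final class ProbeConfig {
    private ProbeConfig() {}

    static Properties forProbe(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Shortened timeouts (values taken from the comment) so an unreachable
        // broker is reported sooner. Depending on the client version, other
        // settings such as default.api.timeout.ms may also bound the wait.
        props.put("request.timeout.ms", "5000");
        props.put("session.timeout.ms", "4000");
        return props;
    }
}
```

With kafka-clients available, the result could be passed straight to the consumer, e.g. `new KafkaConsumer<String, String>(ProbeConfig.forProbe("localhost:9092", "smoke-test"))`, where the address and group name are placeholders.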
S
3

Edit: This was for a very old version of Kafka. Do not use this in 2023 :)


You can check if the server is running by using this:

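// Requires an old Kafka release that still ships kafka.utils.ZkUtils, plus the zkclient library.
ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
// Assumption: the original snippet never defined zkUtils; in those releases it can be obtained like this.
ZkUtils zkUtils = ZkUtils.apply(zkClient, false /* isZkSecurityEnabled */);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
    // No brokers available
} else {
    // There are brokers available
}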
Somewise answered 8/12, 2016 at 19:30 Comment(2)
This checks the ZooKeeper connection, not the Kafka server. - Stocking
Yeah, as @Stocking mentioned, this has a double pitfall: 1. Your application may be able to reach ZK but not Kafka. 2. Your ZK may not be able to reach Kafka due to network or security configuration. - Zoes
