What is the cleaner and efficient way of implementing health check for Kafka in my spring boot application?
Asked Answered
T

1

11

I have a spring boot (2.1.6) application that both consumes and produces messages to a (organization wide) common kafka instance. I am trying to implement health checks for this kafka broker in my application using spring actuator and I am facing a bunch of issues related to performance and logging. There was a health indicator inbuilt in spring boot 2.0 but they took it out due to some apparent issues.

Here is the healthcheck class I implemented:

@Component
public class KafkaHealthCheck implements HealthIndicator {

  private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

  private KafkaAdmin kafkaAdmin;

  private Map<String, Object> kafkaConfig;

  @Value("${application.topic}")
  private String topicName;

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
    this.kafkaAdmin = kafkaAdmin;

  }

  @PostConstruct
  public void setUpAdminClient() {
    kafkaConfig = new HashMap<>();
    kafkaConfig.putAll(kafkaAdmin.getConfig());
    kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  }

  @Override
  public Health health() {
    Long start = System.currentTimeMillis();
    try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {

      DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
      adminClient.describeCluster(describeClusterOptions);

      adminClient.describeConsumerGroups(List.of("topic")).all()
          .get(2, TimeUnit.SECONDS);

      Map<String, TopicDescription> topicDescriptionMap = adminClient
          .describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);

      List<TopicPartitionInfo> partitions = topicDescriptionMap.get(topicName)
          .partitions();

      if (partitions == null || partitions.isEmpty()) {
        logger.warn(String
            .format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
        return Health.down()
            .withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
            .build();
      } else {
        if (partitions.stream().anyMatch(p -> p.leader() == null)) {
          logger.warn(
              String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
                  topicName));
          return Health.down().withDetail("Kafka healthcheck failed",
              "No partition leader found for topic: " + topicName).build();
        }
      }
    } catch (Exception e) {
      logger.warn("Kafka healthcheck failed", e);
      return Health.down()
          .withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
    }
    System.out.println(System.currentTimeMillis() - start);
    return Health.up().build();
  }
}

Now these are the questions I have or the issues I am facing with this implementation:

1 - The KafkaAdmin is injected in this class with all the configuration I have (I am using SSL) except the 'bootstrap.servers'. I figured out that org.springframework.boot.autoconfigure.kafka.KafkaProperties has localhost:9092 as a default which was somehow not being overridden by the application config while it works fine for consumer and producer. I have no idea why is it and hence I have to setup it up here manually.

2 - I added timeouts to DescribeClusterOptions and describeConsumerGroups but those timeouts seems to be totally ignored. If I manually take down the broker, the healthcheck takes about a couple of minutes to report error.

3 - Because of the bootstrap.servers error, when I actually deployed the application, it almost killed my log server with millions of log lines generated by org.apache.kafka.clients.NetworkClient saying Connection to node -1 could not be established. Broker may not be available.. How can I stop it from actually happening again ? Even in cases when the broker goes down during operations.

4 - Even the successful health check is generating a lot of log lines when I create AdminClient. It logs out all the config it read and a bunch of other statements. Any chance of minimizing it ?

5 - Overall, this is very slow. I was trying to calculate the time it takes to run only this healthcheck and it is about 1.5 seconds on average. Any chance of optimizing it ?

Tjirebon answered 11/12, 2019 at 13:54 Comment(4)
I would suggest separating the cluster and topic healthchecks so they can report separatelyOdds
You are doing too much of things in health checks. A simple listTopics is enough to understand if the cluster is up and running. Your application would anyways fail if there are no topics and partitions which you expect. You don't have to do that explicitly every time in a health check call.Mischiefmaker
Also, could you try creating only one instance of Admin client instead of creating it every time? many be to initiate the connection depending upon if you are using SSL and all would take time to connect to Kafka BrokerMischiefmaker
Move AdminClinent to class level and just use listTpoics() to get topics list.Deltoid
F
0

Below is a code example for implementing Kafka health check, and my code doesn't have any performance issues.

Define application properties in such a way below -

1 -

management.endpoints.web.exposure.include=health,info management.endpoint.health.show-details=always

2 -

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult.ListTopicsFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException as KafkaTimeoutException;

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    public KafkaHealthIndicator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public Health health() {
        try {
            ListTopicsResult topics = adminClient.listTopics();
            topics.listings().get(10, TimeUnit.SECONDS); // Adjust timeout as needed
            return Health.up().build();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Health.down(e).build();
        } catch (KafkaTimeoutException e) { return Health.down(new TimeoutException("Kafka is not reachable")).build();
        }
    }
}

3 -

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Configuration
public class KafkaConfiguration {

    @Bean
    public AdminClient kafkaAdminClient() {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Adjust as needed
      return AdminClient.create(properties);
    }
}

4 - Finally we can check kafka health status using url

**http://localhost:8080/actuator/health**
Ferric answered 16/8, 2024 at 6:59 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.