I have a Spring Boot (2.1.6) application that both consumes messages from and produces messages to a shared, organization-wide Kafka instance. I am trying to implement a health check for this Kafka broker in my application using Spring Boot Actuator, and I am running into several issues related to performance and logging. Spring Boot 2.0 had a built-in Kafka health indicator, but it was taken out because of some apparent issues.
Here is the healthcheck class I implemented:
    @Component
    public class KafkaHealthCheck implements HealthIndicator {

        private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

        private KafkaAdmin kafkaAdmin;
        private Map<String, Object> kafkaConfig;

        @Value("${application.topic}")
        private String topicName;

        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
            this.kafkaAdmin = kafkaAdmin;
        }

        @PostConstruct
        public void setUpAdminClient() {
            kafkaConfig = new HashMap<>();
            kafkaConfig.putAll(kafkaAdmin.getConfig());
            kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        }

        @Override
        public Health health() {
            Long start = System.currentTimeMillis();
            try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {
                DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
                adminClient.describeCluster(describeClusterOptions);
                adminClient.describeConsumerGroups(List.of("topic")).all()
                        .get(2, TimeUnit.SECONDS);
                Map<String, TopicDescription> topicDescriptionMap = adminClient
                        .describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);
                List<TopicPartitionInfo> partitions = topicDescriptionMap.get(topicName)
                        .partitions();
                if (partitions == null || partitions.isEmpty()) {
                    logger.warn(String
                            .format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
                    return Health.down()
                            .withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
                            .build();
                } else {
                    if (partitions.stream().anyMatch(p -> p.leader() == null)) {
                        logger.warn(
                                String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
                                        topicName));
                        return Health.down().withDetail("Kafka healthcheck failed",
                                "No partition leader found for topic: " + topicName).build();
                    }
                }
            } catch (Exception e) {
                logger.warn("Kafka healthcheck failed", e);
                return Health.down()
                        .withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
            }
            System.out.println(System.currentTimeMillis() - start);
            return Health.up().build();
        }
    }
Now these are the questions I have or the issues I am facing with this implementation:
1 - The KafkaAdmin is injected into this class with all the configuration I have (I am using SSL) except 'bootstrap.servers'. I figured out that org.springframework.boot.autoconfigure.kafka.KafkaProperties has localhost:9092 as a default, which was somehow not being overridden by the application config, even though it works fine for the consumer and producer. I have no idea why that is, and hence I have to set it up here manually.
2 - I added timeouts to DescribeClusterOptions and describeConsumerGroups, but those timeouts seem to be totally ignored. If I manually take down the broker, the health check takes a couple of minutes to report an error.
3 - Because of the bootstrap.servers error, when I actually deployed the application, it almost killed my log server with millions of log lines generated by org.apache.kafka.clients.NetworkClient saying "Connection to node -1 could not be established. Broker may not be available." How can I stop this from happening again, including in cases where the broker goes down during operation?
4 - Even a successful health check generates a lot of log lines when I create the AdminClient. It logs all the config it read and a bunch of other statements. Is there any way to minimize this?
5 - Overall, this is very slow. I measured the time it takes to run only this health check, and it is about 1.5 seconds on average. Is there any way to optimize it?
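For point 2, note that in the class above the result of describeCluster is discarded, so its 2000 ms timeout is never observed by health(). Independently of how the Kafka client handles its own timeouts, one way to cap the total time of a check is to run it under a single outer deadline. The following is only a minimal sketch using java.util.concurrent; BoundedProbe and runWithDeadline are hypothetical names, and the probe stands in for whatever AdminClient calls the check makes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds a blocking probe with one overall deadline. */
final class BoundedProbe {

    // Daemon threads, so a stuck probe never prevents JVM shutdown.
    // A cached pool means a stuck probe does not block later probes,
    // at the cost of one extra thread per probe that is still running.
    private static final ExecutorService EXECUTOR =
            Executors.newCachedThreadPool(runnable -> {
                Thread thread = new Thread(runnable, "health-probe");
                thread.setDaemon(true);
                return thread;
            });

    private BoundedProbe() {
    }

    /** Returns the probe's result, or false if it fails or exceeds timeoutMs. */
    static boolean runWithDeadline(Callable<Boolean> probe, long timeoutMs) {
        Future<Boolean> future = EXECUTOR.submit(probe);
        try {
            return Boolean.TRUE.equals(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the probe thread; the caller moves on
            return false;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } catch (ExecutionException e) {
            return false; // the probe itself threw
        }
    }
}
```

In a health indicator, a false result would map to Health.down(). The caveat is that cancel(true) only interrupts the probe thread: a probe that ignores interrupts keeps running in the background even though the caller has already returned.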
listTopics is enough to tell whether the cluster is up and running. Your application would fail anyway if the topics and partitions you expect are missing, so you don't have to check that explicitly on every health check call. – Mischiefmaker
Why not reuse the AdminClient instead of creating it every time? Initiating the connection may take time, especially if you are using SSL, before it connects to the Kafka broker. – Mischiefmaker
Move the AdminClient to class level and just use listTopics() to get the topics list. – Deltoid
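The "create once, reuse" idea from the comments can be sketched generically as follows. This is an illustrative outline rather than a tested Kafka integration: ClientHolder is a hypothetical name, and the factory stands in for something like AdminClient.create(kafkaConfig).

```java
import java.util.function.Supplier;

/** Hypothetical holder that builds an expensive client once and reuses it. */
final class ClientHolder<T extends AutoCloseable> implements AutoCloseable {

    private final Supplier<T> factory;
    private T client; // null until first use, and again after reset()

    ClientHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared client, creating it on first use. */
    synchronized T get() {
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    /** Closes the current client so that the next get() builds a fresh one. */
    synchronized void reset() {
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                // Best effort: the client is being discarded anyway.
            }
            client = null;
        }
    }

    @Override
    public synchronized void close() {
        reset();
    }
}
```

A health check could call get() on each invocation and reset() after a failure, so that connection setup (including the SSL handshake) is paid once instead of on every call. Whether a long-lived admin client generates acceptable log volume while the broker is down is a separate question that this sketch does not address.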