I am using following code to extract Kafka broker list from zookeeper:
private static String getBrokerList() {
try {
ZooKeeper zookeeper = new ZooKeeper(zookeeperConnect, 15000, null);
List<String> ids = zookeeper.getChildren(ZkUtils.BrokerIdsPath(), false);
List<String> brokerList = new ArrayList<>();
for (String id : ids) {
String brokerInfo = new String(zookeeper.getData(ZkUtils.BrokerIdsPath() + '/' + id, false, null), Charset.forName("UTF-8"));
JsonObject jsonElement = new JsonParser().parse(brokerInfo).getAsJsonObject();
String host = jsonElement.get("host").getAsString();
brokerList.add(host + ':' + jsonElement.get("port").toString());
}
return Joiner.on(",").join(brokerList);
} catch (KeeperException | InterruptedException e) {
Throwables.propagate(e);
}
return "";
}
Above code is working fine when one thread executing the code at a time. However, when several threads are executing the above code it fails with the following exception occasionally:
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/ids
at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1532)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1560)
What am I doing wrong here? My zookeeper version is 3.4.6-1569965.