How to get All Topics in apache kafka?
Asked Answered
M

4

2
    @RequestMapping(value = "/getTopics",method = RequestMethod.GET)
    @ResponseBody
    public Response getAllTopics() {
        ZkClient zkClient = new ZkClient(ZookeeperProps.zookeeperURL, ZookeeperProps.connectionTimeoutMs,
                ZookeeperProps.sessionTimeoutMs, ZKStringSerializer$.MODULE$);
        Seq<String> topics = ZkUtils.getAllTopics(zkClient);
        scala.collection.Iterator<String> topicIterator = topics.iterator();
        String allTopics = "";
        while(topicIterator.hasNext()) {
            allTopics+=topicIterator.next();
            allTopics+="\n";
        }

        Response response = new Response();
        response.setResponseMessage(allTopics);
        return response;

    }

I am novice in apache kafka. Now a days trying to understand kafka with zookeeper. I want to fetch the topics associated with zookeeper. so I am trying following things
a:) first i made the zookeeper client as shown below :

ZkClient(ZookeeperProps.zookeeperURL, ZookeeperProps.connectionTimeoutMs, ZookeeperProps.sessionTimeoutMs, ZKStringSerializer$.MODULE$);
Seq<String> topics = ZkUtils.getAllTopics(zkClient);

but topics is blank set while executing with Java code.I am not getting what is problem here. My Zookeeper Props is as follow : String zkConnect = "127.0.0.1:2181"; And zookeeper is running perfectly fine.
Please help guys.

Mercurate answered 24/4, 2015 at 12:42 Comment(0)
J
8

It's pretty simple. (My example is written in Java, but it would be almost the same in Scala.)

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public class KafkaTopicListFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> topics = zk.getChildren("/brokers/topics", false);
        for (String topic : topics) {
            System.out.println(topic);
        }
    }
}

The result when I have three topics: test, test2, and test 3

test
test2
test3

The picture below is what I drew for my own blog posting. It would be helpful when you understand the structure of ZooKeeper tree that Kafka uses. (It looks pretty small here. Open the image in a new tab and zoom in please.)

ZooKeeper tree

Johnathanjohnathon answered 26/4, 2015 at 16:33 Comment(1)
With kafka 0.7x null is not acceptable value for watcher parameter in new ZooKeeper(...), need to have dummy parameter.Sluiter
L
4

You can use kafka AdminClient . Below code snippet may help you:

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient adminClient = AdminClient.create(properties);

ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);

System.out.println(adminClient.listTopics(listTopicsOptions).names().get());
Lanita answered 1/7, 2018 at 18:50 Comment(0)
P
0

I would prefer to use kafka-topics.sh which is a built in shell script of Kafka to get topics.

Prance answered 24/4, 2015 at 13:39 Comment(2)
Hi Thanks for reply . I just want to do the same with API.Mercurate
That shell script internally calls a Java (or Scala) classBabbler
G
0

Kafka Client library has AdminClient API: which supports managing and inspecting topics, brokers, configurations, ACL’s.

You can find code samples for

  • Creating new topic
  • Delete topic
  • Describe topic: gives Leader, Partitions, ISR and Replicas
  • List topics
  • Fetch controller broker/node details
  • All brokers/nodes details from the cluster

https://medium.com/nerd-for-tech/how-client-application-interact-with-kafka-cluster-made-easy-with-java-apis-58f29229d992

Giffer answered 28/2, 2021 at 15:24 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.