How to create a Topic in Kafka through Java
Asked Answered
C

6

46

I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through java. It is working fine if I create a topic in command prompt, and If I push message through java api. But I want to create a topic through java api. After a long search I found below code,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

I tried above code and it is showing that topic is created but I am not able to push message in the topic. Any thing wrong in my code? Or any other way to achieve the above?

Cymoid answered 20/11, 2014 at 10:12 Comment(0)
C
45

Edit - Zookeeper is not required in newer version of Kafka. Please see answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+



Original answer

I fixed it.. After a long research..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

From the above code, ZkClient will create a topic but this topic information will not have awareness for the kafka. So what we have to do is, we need to create object for ZkClient in following way,

First import the below statement,

import kafka.utils.ZKStringSerializer$;

and create object for ZkClient in the following way,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Edit 1: (for @ajkret comment)

The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
Cymoid answered 20/11, 2014 at 12:31 Comment(5)
Yes sending of messages to the Topic was possible , But dint get the messages by the consumer? any thought on it?Micturition
Are you trying to consume by Low Level Consumer or High Level Consumer? Can you please provide some piece of code.Cymoid
The interface completely changed after 0.9.0.1. createTopic does not accept a ZkClient anymore.Ulibarri
AdminUtils is deprecated with AdminZkClientOregon
zkUtils is now deprecated in Kafka 2.12-2.1.0.Polity
U
35

The process seems to be pretty much simplified in API 0.11.0+. Using that, it can be done as follows

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

Properties properties = new Properties();
properties.load(new FileReader(new File("kafka.properties")));

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor)

List<NewTopic> newTopics = new ArrayList<NewTopic>();
newTopics.add(newTopic);

adminClient.createTopics(newTopics);
adminClient.close();

The contents of kafka.properties file are as follows

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Note that the instance of the AdminClient must be closed in order to reflect the newly created topic.

Ultramicrometer answered 19/12, 2018 at 12:36 Comment(2)
Thanks for the clarification. Where is this documented?Hennie
don't think it necessarily needs to be closed, basically createTopics seems to be lazy method that returns a Future-type of object - to force execution you can try "get" the response out of itHawfinch
S
7

Just a pointer to anyone looking at this with a updated version of Kafka (At the time of writing this, I was using Kafka v0.10.0.0).

You have to change;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);

To the following;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);

It is also a good idea to close the connection once finished;

zkClient.close();
Sylvia answered 13/7, 2017 at 15:53 Comment(0)
M
5

For those trying to achieve this in kafka v0.10.2.1 and running into issues with serialization error 'java.io.StreamCorruptedException: invalid stream header: 3139322E' following is a sample working code with the needful imports.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;

public static void createTopic(String topicName, int numPartitions, int numReplication) {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
            //Ref: https://gist.github.com/jjkoshy/3842975
            zkClient.setZkSerializer(new ZkSerializer() {
                @Override
                public byte[] serialize(Object o) throws ZkMarshallingError {
                    return ZKStringSerializer.serialize(o);
                }

                @Override
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return ZKStringSerializer.deserialize(bytes);
                }
            });

            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
                    RackAwareMode.Enforced$.MODULE$);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
Mohammed answered 4/9, 2017 at 10:38 Comment(0)
M
5

AdminUtils API is getting deprecated. There is new API AdminZkClient which we can use to manage topics in Kafka server.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

You can refer this link for details: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

Macronucleus answered 23/6, 2018 at 10:41 Comment(1)
Documentation for the class AdminZkClient -> This is an internal class and no compatibility guarantees are provided, see org.apache.kafka.clients.admin.AdminClient for publicly supported APIs, the correct answer as of now is found in Neeleshkumar Srinivasan Mannur's answerCima
T
0
public static void create(String name) {
   AdminClient client = AdminClient.create(properties());
   NewTopic topic = new NewTopic(
      name,
      (int)conf().get("partition"),
      Short.parseShort(String.valueOf(conf().get("replication.factor"))));

   client.createTopics(Collections.singleton(topic));
   client.close();
}
Tisatisane answered 16/1, 2022 at 11:48 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.