How to create a Kafka Topic using Confluent.Kafka .Net Client
Asked Answered
J

3

17

It seems like most popular .net client for Kafka (https://github.com/confluentinc/confluent-kafka-dotnet) is missing methods to setup and create Topics. When calling Producer.ProduceAsync() the topic is created automatically but I can't find a way to setup partitions, retention policy and other settings.

I tried to find any examples online but all I found just use defaults.

Maybe there is another .net client that I can use instead?

Josephina answered 27/4, 2018 at 13:23 Comment(1)
"ProduceAsync" only creates a topic automatically if auto topic creation is enabled at the broker (which in most cases, it shouldn't be)Diatessaron
R
23

It is now available in latest release of Confluent.Kafka .Net client library.

See: https://github.com/confluentinc/confluent-kafka-dotnet/blob/b7b04fed82762c67c2841d7481eae59dee3e4e20/examples/AdminClient/Program.cs

        using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
        {
            try
            {
                await adminClient.CreateTopicsAsync(new TopicSpecification[] { 
                    new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
            }
            catch (CreateTopicsException e)
            {
                Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
            }
        }
Reliance answered 25/4, 2019 at 12:41 Comment(0)
H
2

Confluent yet not provide any APIs to create topic from dot net client, however there is a workaround for the same.

  1. Set auto.create.topics.enable = true in kafka configuration

  2. use var brokerMetadata = producer.GetMetadata(false, topicName); to query available topics in existing brokers, if specified topic is not available then kafka will create a topic with specified name.

    private static bool CreateTopicIfNotExist(Producer producer, string topicName)
    {
        bool isTopicExist = producer.GetMetadata().Topics.Any(t => t.Topic == topicName);
        if (!isTopicExist)
        {
            //Creates topic if it is not exist; Only in case of auto.create.topics.enable = true is set into kafka configuration
            var topicMetadata = producer.GetMetadata(false, topicName).Topics.FirstOrDefault();
            if (topicMetadata != null && (topicMetadata.Error.Code != ErrorCode.UnknownTopicOrPart || topicMetadata.Error.Code == ErrorCode.Local_UnknownTopic))
                isTopicExist = true;
        }
        return isTopicExist;
    }

Thus you can use this work around, I know this is dirty solution but it seems there is no other way as of now.

Holguin answered 12/7, 2018 at 9:44 Comment(2)
The problem that I am facing is still exists: the topic will be created with all default settings. Simple call to Producer.ProduceAsync() also creates a topic if not existsJosephina
Answer needs to be updated with the correct api, or clarify this will use default topic settingsDiatessaron
P
0

Confluent.Kafka.AdminClient is available in version 1.0.0-experimental-2 but doesn't allow creating topics etc.

Its built on librdkafka which doesn't have APIs for this yet.

So for now you have to configure this on the broker using e.g. bin\windows\kafka-topics.sh --create ...

Paramagnetic answered 27/4, 2018 at 16:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.