Is there a way to purge the topic in Kafka?
Asked Answered
S

27

259

I pushed a message that was too big into a kafka message topic on my local machine, now I'm getting an error:

kafka.common.InvalidMessageSizeException: invalid message size

Increasing the fetch.size is not ideal here, because I don't actually want to accept messages that big.

Schizophyceous answered 29/4, 2013 at 17:10 Comment(0)
R
454

Temporarily update the retention time on the topic to one second:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

And in newer Kafka releases, you can also do it with kafka-configs --entity-type topics

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

then wait for the purge to take effect (duration depends on size of the topic). Once purged, restore the previous retention.ms value.

Retene answered 16/4, 2015 at 10:43 Comment(15)
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?Openminded
I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.msGastrolith
Or depending on version: --delete-config retention.msGastrolith
@GregDubicki, you can see the current log retention on the .properties filesMcguire
True, unless you changed it during runtime, @dazito.Openminded
In order to reset it back to default, I tried the --deleteConfig switch, but it wasn't recognised on my version (0.8.2.2). I used --delete-config instead.Steffi
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>Anthropologist
Note that you also need to disable compaction if that is enabled on the topic.Leucippus
I had to restart the server and zookeeper for this to take effect.Vite
In 2.8.0 version --zookeeper is also deprecated. Better to use bootstrap server instead. kafka-configs.sh --bootstrap-server <bstserver>:9091 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000Function
Remember that the option delete.topic.enable=true needs to be previously in the config/server.properties file of Kafka service.Fluor
that purge can take days or weeks depending on how large the topic is and how much is actively going into itMisapprehension
@AndrewNorman is correct. This answer will not delete the entire topic. It will only delete SOME records, and it doesn't even guarantee that it deletes records older than 1 second. Why? Kafka records are stored in segments, and the active log segment is never subject to deletion, even if it has records that exceed the retention.ms.Lareine
If you want to purge, you need to set: cleanup.policy=deleteLeadwort
galeaspablo is right. Kafka by default uses compact strategy hence it will not really delete the message.Densify
N
113

To purge the queue you can delete the topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

then re-create it:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
Neysa answered 24/3, 2015 at 12:54 Comment(3)
Remember to add line delete.topic.enable=true in file config/server.properties, as the warning printed by the mentioned command says Note: This will have no impact if delete.topic.enable is not set to true.Sesame
This is not instantaneous always. Sometimes it will just mark for deletion and actual deletion will happen later.Escritoire
If anyone interested in this method, please consider to use accepted answer. However this method can also be used. But, keep in mind that you'll also lose partitions that are assigned to each broker. So, when you re-create a topic you may expect some overhead depending on the configuration of your cluster. Another downside is, if you have active consumers and auto.create.topics.enable is set to true, you might end up with misconfigured topic.Phthisic
D
60

While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Configurations set via this method can be displayed with the command

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Ditzel answered 21/4, 2016 at 17:56 Comment(2)
It's also worth adding: kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopicDiplostemonous
Just note: This takes some time to have effect (even for topic with 1 message) and the order of deletion is not guaranteed.Shopwindow
M
52

Here are the steps to follow to delete a topic named MyTopic:

  1. Describe the topic, and take note of the broker ids
  2. Stop the Apache Kafka daemon for each broker ID listed.
  3. Connect to each broker (from step 1), and delete the topic data folder, e.g. rm -rf /tmp/kafka-logs/MyTopic-0. Repeat for other partitions, and all replicas
  4. Delete the topic metadata: zkCli.sh then rmr /brokers/MyTopic
  5. Start the Apache Kafka daemon for each stopped machine

If you miss you step 3, then Apache Kafka will continue to report the topic as present (for example when if you run kafka-list-topic.sh).

Tested with Apache Kafka 0.8.0.

Metacarpal answered 19/2, 2014 at 13:32 Comment(8)
in 0.8.1 ./zookeeper-shell.sh localhost:2181 and ./kafka-topics.sh --list --zookeeper localhost:2181Lashawnda
Can use zookeeper-client instead of zkCli.sh (tried on Cloudera CDH5)Anabelle
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.Psychological
This was the only way at the time it was written.Metacarpal
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>Softener
Does not work with kafka 0.9. The topic keeps coming back.Sacerdotal
This could be an issue from 0.9, as offsets are managed in another topic, consumers that are working off previous offsets - might see errors - haven't tried it though.Renfred
Needed rmr /brokers/topics/MyTopicMonkeypot
Y
47

Tested in Kafka 0.8.2, for the quick-start example: First, Add one line to server.properties file under config folder:

delete.topic.enable=true

then, you can run this command:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Then recreate it, for clients to continue operations against an empty topic

Yogi answered 14/6, 2015 at 20:2 Comment(1)
How long does it take to delete a topic in this method? Is this the quickest approach (relative to the retention ms being set to a lower value)? I was looking for a time optimized solution.Elsworth
C
19

Following command can be used to delete all the existing messages in kafka topic:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

The structure of the delete.json file should be following:

{ "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } ], "version": 1 }

where offset :-1 will delete all the records (This command has been tested with kafka 2.0.1

Colman answered 25/7, 2020 at 11:9 Comment(7)
Isn't this the best answer? It does exactly what is asked. Recreating the topic would cause side effects due to repartitioning. Changing retention also doesn't guarantee everything gets deleted.Urn
it is best comment just it requires to create a file. The accepted answer is wrong.Verner
What happens if you have more than one partition?Hoang
Try using "-1" for "partitions". It seems to work (edit maybe it doesn't I just got a weird error message a while later)Hoang
To delete from all partitions, I haven't found an especially good solution, but it turns out you can just list the partitions in your JSON file. eg: "partition":0,"partition":1,"partition":2,...Hoang
Ah - no I'm wrong again. It just uses the last value you specifiedHoang
@FreelanceConsultant, If you have more than one partition you add a json obj for that partion in the partions list. Eg. : { "partitions": [ { "topic": "foo", "partition": 1, "offset": -1 } , { "topic": "foo", "partition": 2, "offset": -1 }, { "topic": "foo-next", "partition": 1, "offset": -1 }], "version": 1 }Colman
S
14

From kafka 1.1

Purge a topic

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

wait at least 1 minute, to be secure that kafka purge the topic remove the configuration, and then go to default value

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
Structural answered 9/10, 2018 at 11:13 Comment(1)
I think you have an extra arrow. On mine, I was able to run bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100Privilege
G
9

kafka don't have direct method for purge/clean-up topic (Queues), but can do this via deleting that topic and recreate it.

first of make sure sever.properties file has and if not add delete.topic.enable=true

then, Delete topic bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

then create it again.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Gitt answered 9/10, 2017 at 10:55 Comment(0)
E
7

Following @steven appleyard answer I executed the following commands on Kafka 2.2.0 and they worked for me.

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
Emery answered 8/8, 2019 at 7:33 Comment(1)
This seems to duplicate other answersIngold
P
5

UPDATE: This answer is relevant for Kafka 0.6. For Kafka 0.8 and later see answer by @Patrick.

Yes, stop kafka and manually delete all files from corresponding subdirectory (it's easy to find it in kafka data directory). After kafka restart the topic will be empty.

Plerre answered 1/5, 2013 at 6:56 Comment(4)
This requires bringing down the Broker, and is at best a hack. Steven Appleyard's answer is really the absolute best.Psychological
@MaasSql I agree. :) This answer is two years old, about version 0.6. "alter topic" and "delete topic" functionality have been implemented later.Plerre
Steven Appleyard's answer is just as hacky as this one.Occupant
Having an application handle deleting its own data in a supported way is far less hacky than turning said application off and deleting what you think are all of its data files then turning it back on.Brotherton
L
5

Sometimes, if you've a saturated cluster (too many partitions, or using encrypted topic data, or using SSL, or the controller is on a bad node, or the connection is flaky, it'll take a long time to purge said topic.

I follow these steps, particularly if you're using TLS.

1: Run with kafka tools :

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Run:

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Set topic retention back to the original setting, once topic is empty.

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Hope this helps someone, as it isn't easily advertised.

Lymphoma answered 15/2, 2018 at 15:30 Comment(0)
B
5

A lot of great answers over here but among them, I didn't find one about docker. I spent some time to figure out that using the broker container is wrong for this case (obviously!!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

and I should have used zookeeper:2181 instead of --zookeeper localhost:2181 as per my compose file

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

the correct command would be

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

Hope it will save someone's time.

Also, be aware that the messages won't be deleted immediately and it will happen when the segment of the log will be closed.

Blend answered 8/1, 2020 at 6:43 Comment(3)
You can exec into the broker just fine. The problem is localhost:2181... E.g. You are misunderstanding the Docker networking features. In addition, not all Zookeeper containers have kafka-topics, so it is best not to use it that way. Latest Kafka installations allow for --bootstrap-servers to alter a topic instead of --zookeeperIngold
Still, exec into the Zookeeper container seems wrong. you can use --zookeeper zookeeper:2181` from the Kafka container is my point. Or even grep out the Zookeeper line from the server.properties fileIngold
@cricket_007 hey, thanks for this really, i corrected the answer, let me know if something is still wrong over thereBlend
M
4

Thomas' advice is great but unfortunately zkCli in old versions of Zookeeper (for example 3.3.6) do not seem to support rmr. For example compare the command line implementation in modern Zookeeper with version 3.3.

If you are faced with an old version of Zookeeper one solution is to use a client library such as zc.zk for Python. For people not familiar with Python you need to install it using pip or easy_install. Then start a Python shell (python) and you can do:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

or even

zk.delete_recursive('brokers')

if you want to remove all the topics from Kafka.

Mira answered 3/6, 2014 at 15:49 Comment(1)
This leaves data behind on the brokers. You'll need to combine this solution with something like paramiko to SSH to every broker and cleanup actual topic dataIngold
D
4

The simplest approach is to set the date of the individual log files to be older than the retention period. Then the broker should clean them up and remove them for you within a few seconds. This offers several advantages:

  1. No need to bring down brokers, it's a runtime operation.
  2. Avoids the possibility of invalid offset exceptions (more on that below).

In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.

This could be mitigated by also clearing the consumer offsets in zookeeper for that topic. But if you don't need a virgin topic and just want to remove the existing contents, then simply 'touch'-ing a few topic logs is far easier and more reliable, than stopping brokers, deleting topic logs, and clearing certain zookeeper nodes.

Dipterous answered 6/6, 2014 at 20:9 Comment(1)
how to "set the date of the individual log files to be older than the retention period"? thanksEnquire
C
4

Besides updating retention.ms and retention.bytes, I noticed topic cleanup policy should be "delete" (default), if "compact", it is going to hold on to messages longer, i.e., if it is "compact", you have to specify delete.retention.ms also.

$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Also had to monitor earliest/latest offsets should be same to confirm this successfully happened, can also check the du -h /tmp/kafka-logs/test-topic-3-100-*

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

The other problem is, you have to get current config first so you remember to revert after deletion is successful: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

Cincinnatus answered 14/6, 2016 at 0:2 Comment(0)
P
4

If you want to do this programmatically within a Java Application you can use the AdminClient's API deleteRecords. Using the AdminClient allows you to delete records on a partition and offset level.

According to the JavaDocs this operation is supported by brokers with version 0.11.0.0 or higher.

Here is a simple example:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}
Plonk answered 28/4, 2021 at 6:53 Comment(0)
C
3

you have to enable this on config

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

purge the topic

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

create the topic

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

read the topic

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning
Constituent answered 11/10, 2021 at 17:16 Comment(1)
Not exactly purging but works about the same!Thinker
M
3

The workaround of temporarily reducing the retention time for a topic, suggested by user644265 in this answer still works but recent versions of kafka-configs will warn that the --zookeeper option has been deprecated:

Warning: --zookeeper is deprecated and will be removed in a future version of Kafka

Use --bootstrap-server instead; for example

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

and

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
Merryman answered 16/11, 2021 at 6:45 Comment(0)
N
2

To clean up all the messages from a particular topic using your application group (GroupName should be same as application kafka group name).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

Nahamas answered 25/3, 2015 at 17:37 Comment(2)
There is a problem with this approach (tested in 0.8.1.1). If an application subscribes to two (or more) topics: topic1 and topic2 and the console consumer cleans up topic1, unfortunately it also deletes the unrelated consumer offset for topic2, which causes replay of all messages from topic2.Spangler
This won't purge/cleanup a topic. Plus, this will take far too long compared to kafka-consumer-groups --reset-offsetsIngold
T
2

Another, rather manual, approach for purging a topic is:

in the brokers:

  1. stop kafka broker
    sudo service kafka stop
  2. delete all partition log files (should be done on all brokers)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

in zookeeper:

  1. run zookeeper command line interface
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. use zkCli to remove the topic metadata
    rmr /brokers/topic/<some_topic_name>

in the brokers again:

  1. restart broker service
    sudo service kafka start
Tissue answered 2/10, 2018 at 15:18 Comment(2)
You need to stop and remove files from each broker with a replica, which means you could have client downtime when doing thisIngold
you're correct, this one just let you actually see where some things are stored and managed by Kafka. but this brute-force approach is definitely not for a production running system.Tissue
B
2
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

This should give retention.ms configured. Then you can use above alter command to change to 1second (and later revert back to default).

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000
Berm answered 18/11, 2018 at 6:55 Comment(0)
M
2

From Java, using the new AdminZkClient instead of the deprecated AdminUtils:

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }
Moloch answered 3/6, 2019 at 14:57 Comment(1)
You don't need Zookeeper. Use AdminClient or KafkaAdminClientIngold
Q
1

I have read almost all of the answers, we are using Kafka Kraft 3.4.0. so I can maybe add one answer for Kraft. It is not really different how to do this on Kraft, you will need a machine which can use the bootstrap servers of kafka with kafka binaries on it do:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000

The problem here is that log retention as time is not really the only thing kafka looks when deleting logs from the filesystem. You also need to consider the log segment bytes. Kafka rolls up segments when the log sizes on disk reach to segment.bytes for the partition in hand, if you have an open partition offset still haven't rolled up it is not going to be deleted even if you set the retention.ms to 1 milisecond.

If you are looking for a way to clear a topic with lets say messages are 2000 bytes each;

set segment.bytes:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config segment.bytes=<smaller than 1 message's total bytes>

set retention.ms:

bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000

And keep in mind it is NOT going to clear magically in 1 second, the delete retention period should be triggered in a second BUT the rollups of open ended segments will take more than that (close to 5 mins). So keep an eye on the log sizes on the brokers and reset these configs when you see log sizes are 0 for the topic:

/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config segment.bytes
/bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name your-topic --delete-config retention.ms

Quod answered 30/5, 2023 at 15:53 Comment(0)
E
0

if you are using confluentinc/cp-kafka containers here is the command to delete the topic.

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

Success response:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Eventuate answered 8/9, 2021 at 19:35 Comment(0)
P
0

I'm using Kafka 2.13 tools. Now --zookeeper is unrecognized option for kafka-topics.sh . To delete a topic:

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

Just take into account that to create the same topic again you may need to way a while if you had a lot of data in the deleted topic. When you try to create the same topic, you may get the error:

ERROR org.apache.kafka.common.errors.TopicExistsException: Topic '[topic name]' is marked for deletion.

Predella answered 15/6, 2022 at 14:40 Comment(0)
U
-1

Just in case someone is looking for an updated answer (in 2022), I found the following will work for Kafka version 3.3.1. This will change the configuration for "your-topic" so that messages are retained for 1000ms. After messages are purged, then you can set back to a different value.

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000
Upas answered 25/11, 2022 at 21:20 Comment(1)
This same command was already providedIngold
M
-2

have you considered having your app simply use a new renamed topic? (i.e. a topic that is named like the original topic but with a "1" appended at the end).

That would also give your app a fresh clean topic.

Misapprehension answered 3/2, 2021 at 17:56 Comment(4)
But this leaves cruft behind for the Kafka admins to deal with and all other clients using the same topic then need to be updatedIngold
yes, producers and consumers would need to connect to the new topic. Normally the topic data will expire (based on your retention settings) and be cleaned out so I don't see that Kafka admins would need to do any work hereMisapprehension
1) It requires a code change for all clients. In enterprise setting, with several clients, that's not really feasible. 2) The cluster has a topic limit (albeit in the order of several thousand). Empty, abandoned topics should definitely be removed periodically. 3) creating a new topic doesn't really answer the questionIngold
@OneCricketer yes, it doesn't work in an scenario of a public kafka with many clients consuming, but it's definitely a useful strategy used in many enterprise settings where the kafka is private and the consumers of the topic are under the management of a single operations group. The OP was about a kafka topic living on a local machine and my solution definitely would work for that use case.Misapprehension

© 2022 - 2025 — McMap. All rights reserved.