Is there a way to delete all the data from a topic or delete the topic before every run?
R

17

115

Is there a way to delete all the data from a topic or delete the topic before every run?

Can I modify the KafkaConfig.scala file to change the logRetentionHours property? Is there a way for messages to be deleted as soon as the consumer reads them?

I am using producers to fetch the data from somewhere and sending the data to a particular topic where a consumer consumes, can I delete all the data from that topic on every run? I want only new data every time in the topic. Is there a way to reinitialize the topic somehow?

Roybn answered 18/7, 2013 at 18:8 Comment(2)
D
72

Don't think it is supported yet. Take a look at this JIRA issue "Add delete topic support".

To delete manually:

  1. Shutdown the cluster
  2. Clean the Kafka log dir (specified by the log.dir attribute in the Kafka config file) as well as the ZooKeeper data
  3. Restart the cluster

For any given topic what you can do is

  1. Stop kafka
  2. Clean the Kafka log specific to the partition. Kafka stores its log files in the format "logDir/topic-partition", so for a topic named "MyTopic" the log for partition id 0 will be stored in /tmp/kafka-logs/MyTopic-0, where /tmp/kafka-logs is specified by the log.dir attribute
  3. Restart kafka

This is NOT a good or recommended approach, but it should work. In the Kafka broker config file, the log.retention.hours.per.topic attribute defines the number of hours to keep a log file before deleting it for a specific topic.

Also, is there a way for messages to be deleted as soon as the consumer reads them?

From the Kafka Documentation :

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.

For finding the start offset to read in Kafka 0.8 Simple Consumer example they say

Kafka includes two constants to help, kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages.

You can also find the example code there for managing the offset at your consumer end.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
Detent answered 23/8, 2013 at 22:14 Comment(6)
I believe the correct link to the JIRA issue is issues.apache.org/jira/browse/KAFKA-330 - Dodecanese
The topic will still show up here because it's listed in zookeeper. You'll have to recursively delete everything under brokers/topics/<topic_to_delete> as well as the logs to get rid of it. - Pteropod
According to the issue link, you can delete a topic after version 0.8.1. You can view detailed help with kafka-run-class.sh kafka.admin.DeleteTopicCommand. - Mazarin
Update: as of Kafka 0.8.2 the command has changed to: kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181 - Reposition
I think this topic deletion functionality has now been added. Probably the next stable release will have it. - Hideaway
Almost all the solutions say to delete the zookeeper data along with the kafka-log deletion. Where is that data located? - Gasper
F
85

As I mentioned here Purge Kafka Queue:

Tested in Kafka 0.8.2, for the quick-start example: First, Add one line to server.properties file under config folder:

delete.topic.enable=true

then, you can run this command:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Fuchsin answered 14/6, 2015 at 20:6 Comment(1)
Btw, you don't need to restart the Kafka server after adding the option, in case anyone is wondering. - Sic
E
18

Tested with kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Note : if you are deleting topic folder/s inside kafka-logs but not from zookeeper-data folder, then you will see topics are still there.

Escort answered 15/9, 2016 at 11:59 Comment(0)
P
10

Below are scripts for emptying and deleting a Kafka topic assuming localhost as the zookeeper server and Kafka_Home is set to the install directory:

The script below will empty a topic by setting its retention time to 1 second and then removing the configuration:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
"$Kafka_Home"/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --add-config retention.ms=1000
sleep 5
"$Kafka_Home"/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "$topicName" --delete-config retention.ms

To fully delete topics you must stop any applicable Kafka broker(s) and remove the topic's directory(s) from the Kafka log dir (default: /tmp/kafka-logs), and then run this script to remove the topic from ZooKeeper. To verify it has been deleted from ZooKeeper, check that the output of ls /brokers/topics no longer includes the topic:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
"$Kafka_Home"/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
Polystyrene answered 6/12, 2016 at 16:32 Comment(2)
This will only work if the retention check happens within those 5 seconds of sleeping. Please make sure you sleep until the check has definitely passed, as specified here: grep "log.retention.check.interval" $Kafka_Home/config/server.properties - Thirtytwomo
I wanted to edit the answer as there is a small mistake in the first command, but one-character edits are not allowed. It is not --add config; rather, it is --add-config - Crash
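
The timing concern raised in the comments can be sketched as a small helper that reads the check interval from the broker's properties file and prints how long to sleep. This is only a sketch: retention_check_wait_seconds is a hypothetical name, the 5-second margin is an arbitrary choice, and the fallback of 300000 ms assumes the broker uses Kafka's documented default for log.retention.check.interval.ms when the key is not set.

```shell
#!/bin/bash
# Print how many seconds to sleep so that at least one retention check has run.
# Hypothetical helper; $1 is the path to the broker's server.properties.
retention_check_wait_seconds() {
    local props=$1
    local interval_ms
    # Take the last uncommented definition of the key, if there is one.
    interval_ms=$(sed -n 's/^[[:space:]]*log\.retention\.check\.interval\.ms[[:space:]]*=[[:space:]]*//p' "$props" | tail -n 1)
    # Assumption: fall back to 300000 ms (the documented default) when the key is absent.
    : "${interval_ms:=300000}"
    # Round up to whole seconds and add an arbitrary 5-second margin.
    echo $(( (interval_ms + 999) / 1000 + 5 ))
}
```

It could replace the fixed delay in the emptying script, for example: sleep "$(retention_check_wait_seconds "$Kafka_Home/config/server.properties")".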
B
9

As a dirty workaround, you can adjust per-topic runtime retention settings, e.g. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 (retention.bytes=0 might also work)

After a short while kafka should free the space. Not sure if this has any implications compared to re-creating the topic.

P.S. It is better to restore the original retention settings once Kafka has finished cleaning.

You can also use retention.ms to persist historical data

Bors answered 6/8, 2014 at 17:10 Comment(0)
T
7

We tried pretty much what the other answers are describing with moderate level of success. What really worked for us (Apache Kafka 0.8.1) is the class command

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost:2181

Tympanitis answered 20/11, 2014 at 19:7 Comment(3)
Tried this in 0.8.1. The command returns "deletion succeeded!" however it doesn't delete the partitions inside the log folders. - Clothespress
Tried on 0.8.2.1 (homebrew) and it's giving this error. Error: Could not find or load main class kafka.admin.DeleteTopicCommand - Avera
As of new kafka (0.8.2), it is sh kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_for_delete] --zookeeper localhost:2181 . Make sure delete.topic.enable is true. - Paramorphism
C
5

For brew users

If you're using brew like me and wasted a lot of time searching for the infamous kafka-logs folder, fear no more. (and please do let me know if that works for you and multiple different versions of Homebrew, Kafka etc :) )

You're probably going to find it under:

Location:

/usr/local/var/lib/kafka-logs


How to actually find that path

(this is also helpful for basically every app you install through brew)

1) brew services list

kafka started matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist

2) Open and read that plist you found above

3) Find the line defining the server.properties location and open that file, in my case:

  • /usr/local/etc/kafka/server.properties

4) Look for the log.dirs line:

log.dirs=/usr/local/var/lib/kafka-logs

5) Go to that location and delete the logs for the topics you wish

6) Restart Kafka with brew services restart kafka
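
Steps 3 and 4 can be sketched as a small shell helper that prints the log.dirs value from a given properties file. This is only an illustration: kafka_log_dirs is a hypothetical name, and the path you pass in is whatever your own plist points to.

```shell
#!/bin/bash
# Print the value of log.dirs from a broker properties file.
# Hypothetical helper; $1 is the path to server.properties.
kafka_log_dirs() {
    # Commented-out lines are skipped because the key must start the line.
    # If the key is defined more than once, the last definition is printed.
    # Note: the value may be a comma-separated list of directories.
    sed -n 's/^[[:space:]]*log\.dirs[[:space:]]*=[[:space:]]*//p' "$1" | tail -n 1
}
```

For the setup above, kafka_log_dirs /usr/local/etc/kafka/server.properties would print the directory from step 4.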

Cismontane answered 27/9, 2017 at 0:39 Comment(0)
G
3

All data about topics and their partitions is stored in /tmp/kafka-logs/ by default, in directories named topic-partitionNumber, so if you want to delete a topic named newTopic, you can:

  • stop kafka
  • delete the files rm -rf /tmp/kafka-logs/newTopic-*
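
One caveat with the wildcard: newTopic-* also matches directories of other topics whose names start with "newTopic-" (for example newTopic-old-0). A minimal sketch of a stricter listing, assuming the topic-partitionNumber naming described above; topic_partition_dirs is a hypothetical helper name, not a Kafka tool:

```shell
#!/bin/bash
# Print only the directories named exactly <topic>-<digits> under a Kafka log dir.
# Hypothetical helper; $1 is the log dir, $2 is the topic name.
topic_partition_dirs() {
    local log_dir=$1
    local topic=$2
    local dir name suffix
    for dir in "$log_dir/$topic"-*; do
        [ -d "$dir" ] || continue
        name=${dir##*/}             # directory name without the leading path
        suffix=${name#"$topic"-}    # whatever follows "<topic>-"
        case $suffix in
            ''|*[!0-9]*) ;;                 # not purely numeric: a different topic
            *) printf '%s\n' "$dir" ;;      # a partition directory of this topic
        esac
    done
}
```

With Kafka stopped, you can review the output of topic_partition_dirs /tmp/kafka-logs newTopic first and then remove exactly those directories.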
Girvin answered 26/1, 2015 at 6:27 Comment(0)
P
3

As of Kafka 2.3.0, there is an alternative way to soft-delete a topic's data (the older approaches are deprecated).

Set retention.ms to 1 second (1000 ms), then after about a minute set it back to the default of 7 days (168 hours, 604,800,000 ms).

Soft deletion (retention.ms=1000), using kafka-configs.sh:

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Resetting to the default of 7 days (168 hours, retention.ms=604800000):

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
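
The millisecond value is just days converted to milliseconds. A minimal sketch of the arithmetic, where days_to_ms is a hypothetical helper name:

```shell
#!/bin/bash
# Convert a whole number of days to milliseconds for use as a retention.ms value.
days_to_ms() {
    # days * 24 hours * 60 minutes * 60 seconds * 1000 milliseconds
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 7    # prints 604800000
```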
Pinko answered 8/9, 2019 at 17:22 Comment(0)
H
3

Simplest way without restarting servers(I am using this with AWS MSK seamlessly):

cd kafka_2.12-2.6.2/bin

Topic Deletion:

  1. Please replace $topic_name:
./kafka-topics.sh \
    --bootstrap-server $kafka_bootstrap_servers \
    --command-config client.properties \
    --delete \
    --topic $topic_name

Here is the client.properties file:

kafka_2.12-2.6.2/bin/client.properties

ssl.truststore.location=/usr/lib/jvm/java-11-openjdk-amd64/lib/security/cacerts
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
max.request.size=104857600

Topic Data Deletion:

  1. Option A:
./kafka-delete-records.sh \
    --bootstrap-server $kafka_bootstrap_servers \
    --command-config client.properties \
    --offset-json-file ./delete-records.json

This is the cleanest way to delete the data immediately rather than waiting for Kafka to do it as a background job. The only extra, one-time effort is listing all the partitions of the topic in the delete JSON file.

Here is the content of delete-records.json:

{
  "partitions": [
    {
      "topic": "$topic_name",
      "partition": 0,
      "offset": -1
    },
    {
      "topic": "$topic_name",
      "partition": 1,
      "offset": -1
    },
    {
      "topic": "$topic_name",
      "partition": 2,
      "offset": -1
    }
  ],
  "version": 1
}
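
Writing one entry per partition by hand gets tedious for topics with many partitions. A minimal sketch that prints the same JSON layout for partitions 0 to count-1, assuming the partitions are numbered contiguously from 0; make_delete_records_json is a hypothetical helper name, and the offset of -1 is simply copied from the file above:

```shell
#!/bin/bash
# Print a delete-records JSON document for partitions 0..count-1 of one topic.
# Hypothetical helper; $1 is the topic name, $2 is the number of partitions.
make_delete_records_json() {
    local topic=$1
    local count=$2
    local p=0
    printf '{\n  "partitions": [\n'
    while [ "$p" -lt "$count" ]; do
        # Every entry except the first is preceded by a comma and a newline.
        if [ "$p" -gt 0 ]; then
            printf ',\n'
        fi
        printf '    {"topic": "%s", "partition": %d, "offset": -1}' "$topic" "$p"
        p=$((p + 1))
    done
    printf '\n  ],\n  "version": 1\n}\n'
}
```

For a three-partition topic, make_delete_records_json "$topic_name" 3 > delete-records.json would produce a file equivalent to the one shown above.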
  2. Option B:

Step1:

./kafka-configs.sh \
    --bootstrap-server $kafka_bootstrap_servers \
    --command-config client.properties \
    --alter \
    --entity-type topics \
    --add-config retention.ms=1 \
    --entity-name $topic_name

Now wait a couple of minutes for Kafka to delete the data from the topic, then revert to the default 7-day data retention.

Step2:

./kafka-configs.sh \
    --bootstrap-server $kafka_bootstrap_servers \
    --command-config client.properties \
    --alter \
    --entity-type topics \
    --add-config retention.ms=604800000 \
    --entity-name $topic_name
Homomorphism answered 24/7, 2022 at 14:41 Comment(0)
I
1
  1. Stop ZooKeeper and Kafka
  2. In server.properties, change log.retention.hours value. You can comment log.retention.hours and add log.retention.ms=1000. It would keep the record on Kafka Topic for only one second.
  3. Start zookeeper and kafka.
  4. Check on consumer console. When I opened the console for the first time, record was there. But when I opened the console again, the record was removed.
  5. Later on, you can set the value of log.retention.hours to your desired figure.
Invariant answered 20/4, 2017 at 10:57 Comment(0)
O
1

I use the utility below to cleanup after my integration test run.

It uses the latest AdminZkClient api. The older api has been deprecated.

import java.util.Properties
import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

There is also a delete-topic option, but it only marks the topic for deletion; the topic is actually removed some time later. Since that delay can be unpredictably long, I prefer the retention.ms approach.

Overblown answered 1/10, 2019 at 10:36 Comment(0)
K
1

do:

cd /path/to/kafkaInstallation/kafka-server
bin/kafka-topics.sh  --bootstrap-server  localhost:9092 --delete --topic name_of_kafka_topic

then you can recreate it using:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic name_of_kafka_topic
Kenway answered 21/6, 2022 at 10:23 Comment(0)
T
0

When manually deleting a topic from a Kafka cluster, you might want to check out https://github.com/darrenfu/bigdata/issues/6. A vital step that most solutions miss is deleting /config/topics/<topic_name> in ZooKeeper.

Teston answered 16/6, 2017 at 10:50 Comment(0)
C
0

I use this script:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
Cockleboat answered 7/6, 2019 at 13:6 Comment(0)
P
0

There are two solutions to clean up topics data

  1. Change the zookeeper dataDir path "dataDir=/dataPath" to some other value, delete kafka logs folder and restart zookeeper and kafka server

  2. Run zkCleanup.sh from zookeeper server

Poorly answered 4/10, 2021 at 5:49 Comment(0)
G
0

You can use the following command to remove all messages from the specified topic by deleting the topic itself.

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic <topic_name>

Note that this deletes the topic together with its configuration rather than preserving it, so recreate the topic afterwards if you still need it. Topic deletion must be enabled on the brokers (delete.topic.enable=true).

Glarum answered 20/1 at 15:14 Comment(0)
