Kafka consumer to dynamically detect newly added topics

4

9

I'm using KafkaConsumer to consume messages from topics on a Kafka server.

  • It works fine for topics created before the consumer code starts.

The problem is that it does not work for topics created dynamically (that is, after the consumer has started), even though the API documentation says dynamic topic creation is supported. Here is the link for reference:

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Kafka version used: 0.9.0.1

Here is the Java code:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

NOTE: My topic names match the regular expression, and if I restart the consumer it starts reading the messages pushed to the new topics.

Any help is really appreciated.

Telium answered 22/3, 2016 at 11:56 Comment(0)
19

An answer to this was given on the Apache Kafka mailing list. I am copying it below:

The consumer supports a configuration option "metadata.max.age.ms" which basically controls how often topic metadata is fetched. By default, this is set fairly high (5 minutes), which means it will take up to 5 minutes to discover new topics matching your regular expression. You can set this lower to discover topics quicker.

So in your props you can set:

    props.put("metadata.max.age.ms", 5000);

This makes the consumer refresh its topic metadata every 5 seconds, so a new topic matching the pattern should be discovered within roughly 5 seconds of its creation.
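To show where the setting fits, here is a minimal sketch that builds consumer properties like the ones in the question with a shorter metadata refresh interval added. The class name `DynamicTopicConsumerProps` and the `build` helper are hypothetical, and the sketch uses only `java.util.Properties` so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

final class DynamicTopicConsumerProps {

    // Hypothetical helper: consumer settings from the question plus
    // a configurable metadata refresh interval.
    static Properties build(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // How often topic metadata is refreshed; topics matching a subscribed
        // pattern are only discovered on a refresh.
        props.put("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "test", 5000L);
        System.out.println(props.getProperty("metadata.max.age.ms")); // prints 5000
    }
}
```

The resulting `Properties` object would then be passed to `new KafkaConsumer<>(props)` before calling `subscribe` with the pattern, as in the question's code.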

Balls answered 25/8, 2016 at 17:7 Comment(3)
It also depends on how you set the "auto.offset.reset" consumer property. If it is "latest", the consumer picks up only the latest (not previously consumed) messages from the topics known at consumer start, but not from the dynamic topics. If you set it to "earliest" and also call consumer.seekToBeginning(consumer.assignment()); once before poll, it will recognize dynamic/new topics, but it will also get all records from the beginning every time. - Osteophyte
Can we force a metadata fetch request somehow, e.g. consumer.fetchMeta() or something? - Severen
Got this to work in Spring Boot with application.properties: spring.kafka.consumer.properties[metadata.max.age.ms] = 5000 - Florist
4

You can hook into ZooKeeper. Check out the sample code. In essence, you create a watcher on the ZooKeeper node /brokers/topics. When a new child appears there, a new topic has been added, and your watcher is triggered.

Note that the difference between this and the other answer is that this one is a trigger whereas the other is polling: this one will be as close to real time as possible, while the other will be, at best, within whatever your polling interval is.
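A children watch only signals that the child list of the node changed, not which child was added, so the handler still has to re-read the list and compare it with what it already knows. The following is a minimal sketch of that comparison step only. The `NewTopicDetector` class is hypothetical and the ZooKeeper client calls are deliberately left out. Deleted topics are not handled here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class NewTopicDetector {

    // Topic names seen so far.
    private final Set<String> known = new HashSet<>();

    // Returns the names in 'current' that were not seen before and remembers them.
    // 'current' would be the children of /brokers/topics re-read after the watch fires.
    Set<String> update(Collection<String> current) {
        Set<String> added = new TreeSet<>();
        for (String topic : current) {
            if (known.add(topic)) {
                added.add(topic);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        NewTopicDetector detector = new NewTopicDetector();
        detector.update(Arrays.asList("siddu1", "siddu2"));
        // Second snapshot contains one extra topic.
        System.out.println(detector.update(Arrays.asList("siddu1", "siddu2", "siddu3"))); // prints [siddu3]
    }
}
```

The same comparison would work on snapshots of consumer.listTopics().keySet() if you prefer to poll from the consumer instead of watching ZooKeeper.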

Coming answered 22/3, 2016 at 12:31 Comment(4)
Thanks for your reply and help. Basically I wanted to use the KafkaConsumer API to achieve this, and I solved it myself. - Telium
@madlad see my answer below. - Balls
The 'sample code' link is invalid. Also, the question was about consuming messages, not just knowing about a new topic. A new topic will be available in consumer.listTopics().keySet() anyway. - Osteophyte
Link fixed. Also added a line about the difference between the two approaches. - Coming
2

Here is the solution that worked for me using the KafkaConsumer API. The Java code is below.

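    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // These methods belong inside a class; BOOTSTRAP_SERVERS is a constant defined there.
    private static Consumer<String, String> createConsumer(String topic) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                "KafkaExampleConsumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys are deserialized as strings, so the consumer is typed <String, String>.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Create the consumer using props.
        final Consumer<String, String> consumer =
                new KafkaConsumer<>(props);
        // Subscribe to the single topic passed in by name.
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public static void runConsumer(String topic) throws InterruptedException {
        final Consumer<String, String> consumer = createConsumer(topic);
        try {
            // Single poll: records that arrive after this call returns are not read.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            // Commit synchronously so the offsets are stored before the consumer closes.
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }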

Using this, we can consume messages from dynamically created topics by calling runConsumer with the new topic's name once that name is known.

Rentfree answered 20/6, 2019 at 13:10 Comment(0)
0

Use the subscribe method of the KafkaConsumer class that takes a pattern as its argument; the pattern selects the topics to get data from. Its Javadoc reads:

    /**
     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
     * The pattern matching will be done periodically against all topics existing at the time of check.
     * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
     * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
     *
     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
     * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
     * is a change to the topics matching the provided pattern and when consumer group membership changes.
     * Group rebalances only take place during an active call to {@link #poll(Duration)}.
     *
     * @param pattern Pattern to subscribe to
     * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
     *                 subscribed topics
     * @throws IllegalArgumentException If pattern or listener is null
     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
     *                               configured at-least one partition assignment strategy
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
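
Before relying on a pattern subscription, it can help to check which topic names a given pattern would select. The sketch below does that with plain `java.util.regex`, assuming the consumer applies the pattern to the whole topic name (as `Matcher.matches()` does); that assumption is not stated in the Javadoc above. The class `TopicPatternCheck` and the sample topic names are hypothetical; the pattern is the one from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class TopicPatternCheck {

    // Returns the topic names that the pattern matches in full.
    static List<String> matching(Pattern pattern, List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("siddu(\\d)*");
        List<String> topics = Arrays.asList("siddu", "siddu42", "siddu-x", "mysiddu1");
        // "siddu-x" and "mysiddu1" fail because the whole name must match.
        System.out.println(matching(pattern, topics)); // prints [siddu, siddu42]
    }
}
```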
Madelene answered 13/7, 2022 at 19:59 Comment(0)
