I'm using KafkaConsumer to consume messages from topics on a Kafka server.
It works fine for topics that were created before the consumer code started.
The problem is that it does not work for topics created dynamically (that is, after the consumer code has started), even though the API documentation says dynamic topic creation is supported. Here is the link for reference:
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Kafka version used: 0.9.0.1
Here is the Java code:
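    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    // Offsets are committed manually below, so auto commit is disabled.
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Subscribe by pattern; HandleRebalance is my ConsumerRebalanceListener (not shown).
    Pattern r = Pattern.compile("siddu(\\d)*");
    consumer.subscribe(r, new HandleRebalance());
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
                }
                // Commit the offset of the next record to read from this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }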
NOTE: My topic names match the regular expression. If I restart the consumer, it starts reading the messages pushed to the new topics.
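To rule out the pattern itself, here is a minimal standalone sketch that checks topic names against the same expression using only java.util.regex, with no Kafka involved. The class name TopicPatternCheck, the helper matchesTopic, and the topic names are made up for illustration, and it assumes the consumer requires the whole topic name to match the pattern.

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {
    // Same expression as in the consumer code above.
    static final Pattern TOPIC_PATTERN = Pattern.compile("siddu(\\d)*");

    // Assumption: a topic is selected only when its whole name matches.
    static boolean matchesTopic(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names, not taken from a real cluster.
        String[] topics = {"siddu", "siddu1", "siddu42", "siddu_x", "other"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + matchesTopic(topic));
        }
    }
}
```

Under that assumption, names such as siddu1 or siddu42 are accepted and unrelated names are rejected, so the pattern does not look like the cause of the problem.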
Any help is really appreciated.