Is it possible to consume kafka messages using key and partition?

I am using kafka_2.12 version 2.3.0 and publishing data into a Kafka topic with an explicit partition and key. I need a way to consume a particular message from the topic using the key and partition combination, so that I don't have to consume all the messages and iterate to find the correct one.

Right now I am only able to do this:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100))
// Groovy: filter the polled records down to the ones with the wanted key
def data = records.findAll { it.key() == key }
Vocative answered 6/11, 2019 at 6:57 Comment(0)

There are two ways to consume topics/partitions:

  1. KafkaConsumer.assign() (see the KafkaConsumer Javadoc)
  2. KafkaConsumer.subscribe() (see the KafkaConsumer Javadoc)

Neither of them lets you fetch messages by key.

If you don't plan to expand the topic's partitions, consider the assign() method: all messages produced with a given key go to the same partition (under the default partitioner), so you only need to read that one partition.
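
You can even work out which partition a given key lands on, assuming the producer used the default partitioner and a String key with StringSerializer (murmur2 hash of the key bytes, modulo the partition count). A minimal sketch under those assumptions:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Sketch: replicates the DefaultPartitioner's placement for a String key.
// Assumes StringSerializer (UTF-8 bytes) and no custom partitioner on the producer.
static int partitionForKey(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // murmur2 hash, forced positive, modulo the topic's partition count
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}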

How to use:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        if (record.key().equals(key)) {
            // found a record with the wanted key
        }
    }
}
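
Note that Kafka itself addresses records by partition and offset, not by key. If you also saved the offset at produce time (it comes back in the RecordMetadata from producer.send()), you can jump straight to the record with seek(). A minimal sketch; the topic, partition, and offset 42 are placeholders:

TopicPartition tp = new TopicPartition("some-topic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, 42L); // position the consumer at the known offset

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
    if (record.offset() == 42L) { // the record we were after
        System.out.printf("key = %s, value = %s%n", record.key(), record.value());
        break;
    }
}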
Vermont answered 6/11, 2019 at 7:15 Comment(2)
Is it possible to look for a particular partition in a topic?Vocative
@AbhishekGharai Yes, the assign() method manually assigns a list of partitions to this consumer.Vermont

You can't "get messages by key from Kafka".

One solution, if practical, would be to have as many partitions as keys and always route messages for a key to the same partition.
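
On the producer side, that routing can be made explicit by handing the partition number to ProducerRecord, which bypasses the partitioner entirely. A rough sketch; partitionFor(key) is a hypothetical application-level mapping from key to partition:

// partitionFor(key) is a hypothetical lookup, e.g. a fixed key -> partition table
int partition = partitionFor(key);
producer.send(new ProducerRecord<>("some-topic", partition, key, value));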

Message Key as Partition

kafkaConsumer.assign(topicPartitions);
kafkaConsumer.seekToBeginning(topicPartitions);

// Pull records from kafka, keep polling until we get nothing back
final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records;
do {
    // Grab records from kafka
    records = kafkaConsumer.poll(Duration.ofMillis(2000));
    logger.info("Found {} records in kafka", records.count());

    // Add to our array list
    records.forEach(allRecords::add);
} while (!records.isEmpty());
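
Once everything has been read, the lookup by key is a plain in-memory filter. A short sketch; keyBytes is assumed to be the serialized form of the key that was produced:

// Keep only the records whose key matches; keys are byte[], so compare by content
List<ConsumerRecord<byte[], byte[]>> matching = allRecords.stream()
        .filter(r -> Arrays.equals(r.key(), keyBytes))
        .collect(Collectors.toList());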

Access messages of a Topic using Topic Name only

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(<Topic Name>, <Topic Name>));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Dysfunction answered 6/11, 2019 at 7:7 Comment(2)
Is it possible to look for a particular partition in a topic?Vocative
programcreek.com/java-api-examples/…Dysfunction