How to check if Kafka Consumer is ready

8

12

I have the Kafka offset reset policy (auto.offset.reset) set to latest, and my consumer is missing the first few messages. If I sleep for 20 seconds before I start sending messages to the input topic, everything works as desired. I am not sure whether the problem is the consumer taking a long time for partition rebalancing. Is there a way to know whether the consumer is ready before starting to poll?

Breath answered 3/1, 2018 at 6:3 Comment(0)
3
  • You can use consumer.assignment(). It returns the set of partitions currently assigned to the consumer, so you can verify that all of the partitions available for that topic have been assigned.

  • If you are using the spring-kafka project, you can include the spring-kafka-test dependency and use the method below to wait for topic assignment, but you need access to the listener container: ContainerTestUtils.waitForAssignment(Object container, int partitions);
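A minimal sketch of the first option, assuming a test in which one thread owns the consumer. `AssignmentWaiter`, `waitForAssignment`, and its parameters are hypothetical names, not part of kafka-clients or spring-kafka; the two callbacks stand in for `consumer.poll(...)` and `consumer.assignment().size()`. The loop keeps polling because a subscribed consumer joins its group and receives partitions from inside `poll()`.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical test helper; not part of any Kafka library. */
final class AssignmentWaiter {
    private AssignmentWaiter() {}

    /**
     * Runs pollOnce until assignedCount reports at least expectedPartitions,
     * or until the timeout elapses. Returns true if the assignment was seen in time.
     */
    static boolean waitForAssignment(Runnable pollOnce,
                                     IntSupplier assignedCount,
                                     int expectedPartitions,
                                     Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (assignedCount.getAsInt() >= expectedPartitions) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: partitions were not assigned in time
            }
            pollOnce.run(); // with a real consumer this call blocks briefly and drives the group join
        }
    }
}
```

With a real consumer the call might look like `AssignmentWaiter.waitForAssignment(() -> consumer.poll(Duration.ofMillis(100)), () -> consumer.assignment().size(), partitionCount, Duration.ofSeconds(30))`. Records returned by those polls are discarded in this sketch, so a test that could receive data that early would need to keep them.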

Diazomethane answered 9/1, 2018 at 23:7 Comment(2)
I am using the @KafkaListener annotation at method level to define the consumer. How do I get the container in a test? I tried autowiring MessageListenerContainer and it complains that there is no qualifying bean. - Puberulent
@Puberulent Inject KafkaListenerEndpointRegistry registry into your test and call registry.getListenerContainer(listenerId) to get the corresponding MessageListenerContainer, which can be used with waitForAssignment(). - Godfry
2

You can do the following.

I have a test that reads data from a Kafka topic. KafkaConsumer is not thread-safe, so you can't call it from several threads, but you can pass in an AtomicReference<Set<TopicPartition>> parameter ("assignment"), update it on the consumer thread, and read it from another thread.

For example, a snippet of working test code from my project:

private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // The consumer thread publishes its current assignment here; this thread only reads it.
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    // Wait up to maxWaitingTime for at least one partition to be assigned.
    while (System.currentTimeMillis() - startTime < maxWaitingTime) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(Collections.emptySet());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (!assignments.isEmpty()) {
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for callers
            e.printStackTrace();
            needStop.set(true);
            break;
        }
    }
    System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
}

private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {

    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    System.out.println("Subscribed to topic: " + readFromKafka);
    consumer.subscribe(Collections.singletonList(readFromKafka));

    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        assignment.set(consumer.assignment());
        // poll(long) is deprecated; the Duration overload is available since Kafka 2.0
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1_000));
        events.addAll(CommonUtils4Tst.readEvents(records));

        if (readTimeout == -1) {
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }

    needStop.set(true);

    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}

P.S.
needStop - a global flag used to stop all running threads, on failure or on success
events - the list of objects that I want to check
readTimeout - how long to wait while reading data; if readTimeout == -1, stop as soon as anything has been read

Lafond answered 8/8, 2018 at 15:17 Comment(0)
1

Thanks to Alexey (I have also upvoted), I seem to have resolved my issue by following essentially the same idea.

Just want to share my experience. In our case we use Kafka in a request/response style, somewhat like RPC: a request is sent on one topic and we then wait for the response on another topic. We ran into a similar issue, i.e. missing the first response.

I tried calling KafkaConsumer.assignment(); repeatedly (with Thread.sleep(100);), but that did not help, because the consumer only joins its group and receives partitions from inside poll(). Adding a KafkaConsumer.poll(50); seems to have primed the consumer (group), and we now receive the first response too. I tested a few times and it works consistently.

BTW, testing required stopping the application and deleting the Kafka topics and, for good measure, restarting Kafka too.

PS: Just calling poll(50); without the assignment(); checking logic that Alexey mentioned may not guarantee that the consumer (group) is ready.

Dispend answered 11/8, 2018 at 16:1 Comment(0)
1

You can modify an AlwaysSeekToEndListener (listens only to new messages) to include a callback:

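import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
    private final Consumer<K, V> consumer;
    private final Runnable callback; // optional; may be null

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this(consumer, null);
    }

    public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        // seekToEnd is evaluated lazily; asking for the position resolves the end offset now,
        // so records produced right after the callback are not skipped.
        partitions.forEach(consumer::position);
        if (callback != null) {
            callback.run();
        }
    }
}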

and subscribe with a latch callback:

CountDownLatch initLatch = new CountDownLatch(1);

consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));

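// onPartitionsAssigned is invoked from inside consumer.poll(), so the poll loop must be running on another thread
initLatch.await(); // blocks until consumer is ready and listening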

then proceed to start your producer.
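A rough sketch of how the waiting side could be arranged so that it cannot block forever, assuming the poll loop runs on its own thread. `ReadyGate`, `PollLoop`, and `startAndAwait` are hypothetical names used only for illustration; the `PollLoop` stands in for code that subscribes with the listener above and then keeps calling `consumer.poll(...)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a poll loop on its own thread and waits for its "assigned" signal. */
final class ReadyGate {
    /** Stand-in for the consumer's poll loop; it must call onAssigned once partitions are assigned. */
    interface PollLoop {
        void run(Runnable onAssigned);
    }

    private ReadyGate() {}

    /** Returns true if the loop signalled readiness within the timeout, false otherwise. */
    static boolean startAndAwait(PollLoop loop, long timeout, TimeUnit unit) {
        CountDownLatch ready = new CountDownLatch(1);
        Thread thread = new Thread(() -> loop.run(ready::countDown), "consumer-poll-loop");
        thread.setDaemon(true); // do not keep the JVM alive if the test gives up
        thread.start();
        try {
            return ready.await(timeout, unit); // bounded wait instead of await()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a real test the loop passed in would subscribe with `new AlwaysSeekToEndListener<>(consumer, onAssigned)` and then poll until told to stop; the producer is started only when `startAndAwait` returns true.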

Cremator answered 30/6, 2021 at 7:21 Comment(0)
0

If your policy is set to latest (which only takes effect when there are no previously committed offsets) and you have no previously committed offsets, then you should not worry about 'missing' messages, because you are telling Kafka not to care about messages that were sent before your consumers were ready.

If you care about those 'previous' messages, you should set the policy to earliest.

In any case, whatever the policy, the behaviour you are seeing is transient: once committed offsets are saved in Kafka, the consumers will pick up where they left off on every restart.
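For reference, the reset policy is an ordinary consumer property. The sketch below only builds the configuration; `ConsumerPropsSketch` and `consumerProps` are hypothetical names, the argument values are placeholders, and deserializers and other settings are left out.

```java
import java.util.Properties;

/** Hypothetical helper that assembles a few consumer settings. */
final class ConsumerPropsSketch {
    private ConsumerPropsSketch() {}

    static Properties consumerProps(String bootstrapServers, String groupId, String resetPolicy) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only consulted when the group has no committed offset for a partition:
        // "latest" starts at the end of the log, "earliest" at the beginning.
        props.setProperty("auto.offset.reset", resetPolicy);
        return props;
    }
}
```

Once the group has committed offsets, changing `auto.offset.reset` has no effect on where that group resumes.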

Hegel answered 3/1, 2018 at 10:6 Comment(2)
My requirement is that I need to send records, read them, and do some processing. Messages that have been read once should not be reprocessed, and I do not need to read all the messages by setting the policy to earliest, as that does not make much sense in my case. - Breath
I'm afraid there's a mismatch between your expectations and Kafka semantics. With auto.offset.reset set to latest you should not expect to get all messages sent to a topic. Consumers and producers work completely asynchronously from each other. If you want everything that has been sent to a topic, set the reset policy to earliest. In any case, once the consumer group is established and has committed offsets, the reset policy becomes irrelevant. - Hegel
0

I needed to know whether a Kafka consumer was ready before running some tests, so I tried consumer.assignment(). It only returns the set of assigned partitions, though, and that does not tell you whether those partitions have an offset set for the group; when I used the consumer afterwards, the offsets were not set correctly.

The solution was to use committed(), which returns the last committed offsets for the partitions you pass in.

So you can do something like: consumer.committed(consumer.assignment())

If there are no partitions assigned yet, it returns:

{}

If there are partitions assigned, but no offset yet:

{name.of.topic-0=null, name.of.topic-1=null}

But if there are partitions and offsets:

{name.of.topic-0=OffsetAndMetadata{offset=5197881, leaderEpoch=null, metadata=''}, name.of.topic-1=OffsetAndMetadata{offset=5198832, leaderEpoch=null, metadata=''}}

With this information you can use something like:

consumer.committed(consumer.assignment()).isEmpty();             // true while no partitions are assigned
consumer.committed(consumer.assignment()).containsValue(null);   // true while some partition has no committed offset

When both checks return false, you can treat the Kafka consumer as ready.
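The two checks can be folded into one predicate. This is only a sketch of the criterion described in this answer; `CommittedOffsetsCheck` and `hasCommittedOffsets` are hypothetical names, and the map parameter stands in for the result of `consumer.committed(consumer.assignment())`. Note that under this criterion a group that has never committed anything keeps reporting null values, so the check suits groups that already have committed offsets.

```java
import java.util.Map;

/** Hypothetical helper; the map is whatever consumer.committed(...) returned. */
final class CommittedOffsetsCheck {
    private CommittedOffsetsCheck() {}

    /** True when at least one partition is present and every partition has a committed offset. */
    static boolean hasCommittedOffsets(Map<?, ?> committed) {
        // An empty map means no partitions are assigned yet;
        // a null value means that partition has no committed offset.
        return !committed.isEmpty() && !committed.containsValue(null);
    }
}
```

A test could then poll until `CommittedOffsetsCheck.hasCommittedOffsets(consumer.committed(consumer.assignment()))` becomes true, ideally with a timeout.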

Syriac answered 4/3, 2022 at 20:33 Comment(0)
0

I faced a similar problem while testing with EmbeddedKafka.

Disclaimer: my approach may not look like the "Kafka way", yet it gets the job done at the cost of some trade-offs. And of course it shouldn't be used anywhere but in tests.

In general the test consists of the following steps:

  1. Create a consumer
  2. Post some message to the topic
  3. Expect that only that specific message was consumed

So I was looking for auto.offset.reset=latest semantics with a guarantee that the assigned topic is ready to be polled. In the end I decided to use a special message to mark that the consumer is ready:

public class ConsumerHelper {
    
    public static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker, Set<String> topics) {
        var consumer = buildConsumer(broker);
        if (!CollectionUtils.isEmpty(topics)) {
            var producer = buildUtilProducer(...);
            var key = "util-message-key" + UUID.randomUUID(); //key must be unique for every method call
            topics.forEach(
                    topic -> producer.send(new ProducerRecord<>(topic, key, new Object()))
            );
            var uncheckedTopics = new HashSet<>(topics);
            consumer.subscribe(topics);
            do {
                // the 100 ms poll timeout is an assumption; the original snippet omitted the value
                consumer.poll(Duration.ofMillis(100)).forEach(record -> {
                    if (key.equals(record.key())) {
                        uncheckedTopics.remove(record.topic());
                    }
                });
                consumer.commitSync();
            } while (!uncheckedTopics.isEmpty() /* you may add some timeout check logic here if needed */);
        }
        return consumer;

    }


    /**
     * consumer builder method, e.g. with KafkaTestUtils
     *
     * @implSpec consumer group id must be unique, {@code auto.offset.reset} must be set to {@code earliest}
     */
    private static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker) {
        var randomGroupId = "group-id-" + UUID.randomUUID(); //consumer group id must be unique
        var consumerProps = KafkaTestUtils.consumerProps(randomGroupId, "true", broker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //this is important
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //some extra consumer props if needed
        //...
        //
        return new KafkaConsumer<>(consumerProps);

    }

    /**
     * util producer builder method, e.g. with KafkaTestUtils
     */
    private static KafkaProducer<String, Object> buildUtilProducer() {
        //...
    }

}

After this, the KafkaConsumer returned by the public method is ready to consume new messages immediately: the marker messages and everything before them have already been read and committed.

Obvious restriction: tests should not be run concurrently.

Byssinosis answered 12/5, 2023 at 20:27 Comment(0)
0

I wanted to provide an answer using the Confluent Kafka Python library, influenced by this GitHub issue. Starting with the basic consumer example on GitHub: the subscribe method is an async process that connects to topic partitions in the background. If you publish to Kafka during this time, those messages are not guaranteed to be consumed.

from confluent_kafka import Consumer

c = Consumer({
    ...
    'auto.offset.reset': 'latest'
})
# async process
c.subscribe(['mytopic'])
# if you publish right now, it's not guaranteed it will be consumed!
while True:
    msg = c.poll(1.0)

As others have mentioned, we need to determine when the consumer has been assigned its topic partitions. One way to do this is to use the subscribe on_assign callback documented here. Once this callback has been triggered, we know the consumer has been assigned topic partitions. One quirk is that you need to keep polling this whole time and wait for the polls to finish, even though the consumer has not yet been assigned, because polling is what triggers the on_assign callback in the first place.

from confluent_kafka import Consumer

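assigned = False
running = True  # assumption: the original snippet left this flag undefined

def on_assign(consumer, partitions):
    # without "global", the assignment below would only create a local variable
    global assigned
    assigned = True

c = Consumer({
    ...
    'auto.offset.reset': 'latest'
})
c.subscribe(['mytopic'], on_assign=on_assign)
# keep polling: on_assign is only triggered from inside poll()
while not assigned:
    msg = c.poll(1.0)
# at this point we know we are assigned!
# can start normal poll / publish to Kafka
while running:
    msg = c.poll(1.0)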
Aeolis answered 2/8, 2024 at 17:10 Comment(0)
