Kafka consumer.poll returns no records

The first N calls to poll return nothing when I register a consumer with a new group id.

I want to test that calling a service publishes a Kafka event. The issue is that whenever I change the groupId, the first N polls return nothing. I understand that Kafka registers the consumer when it first polls, but the number of polls (i.e. the time) needed to register it seems too random.

The consumer configuration:

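import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "latest" is also the default
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));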

Steps:

  1. Before every test I call consumer.poll(Duration.ofSeconds(5)) just to make sure that the consumer is registered and its offset is set.
  2. I call the service and assert on the response. Checking Kafka in the UI shows that the event is published.
  3. I call consumer.poll(Duration.ofSeconds(5)) again and hopefully receive some records. This is the step that fails.

Is there a way to make sure that the second poll always returns the record? I tried making the first poll last 1 minute (and I already think 5 seconds is too long to wait in every test), and it still sometimes works and sometimes doesn't.

Thanks.

Harmsworth asked 2/5, 2019 at 9:56
You should indicate which version of Kafka you are using. - Agan
Right. I'm using version 2.1.1. - Harmsworth
By default you are reading from the latest offset, so if no one produces AFTER your consumer has registered, no data will be fetched. Also keep in mind that when a new consumer with a different groupId joins, it triggers a rebalance unless the partitions are already properly configured, which means extra waiting time or even errors reaching the consumer. - Newfeld
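The comment above describes a fallback rule that is easy to model. The sketch below is a simplified, hypothetical model in plain Java (not Kafka client code; OffsetResetModel and startingOffset are made-up names): a committed offset always wins, and auto.offset.reset is consulted only when the group has none. It shows why an event published before a brand-new group has its position is skipped under the default "latest".

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of where a consumer group starts reading one
// partition. This is not Kafka client code; it only mirrors the rule that a
// committed offset wins and auto.offset.reset is used only when none exists.
public class OffsetResetModel {

    public static long startingOffset(OptionalLong committed, String autoOffsetReset,
                                      long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // The group already committed: resume from that offset.
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // everything still retained in the partition
            case "latest":
                return logEndOffset;   // only records produced after this point
            default:
                // Kafka fails with "none" (NoOffsetForPartitionException); the model just throws.
                throw new IllegalStateException("no committed offset, auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Partition holds offsets 0..9, so the log end offset is 10.
        // The test event was published at offset 9 before the new group got its position.
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 10));   // 10: event skipped
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 10)); // 0: event read
        System.out.println(startingOffset(OptionalLong.of(9), "latest", 0, 10));     // 9: event read
    }
}
```

The third case is why committing an offset for the group makes later polls independent of auto.offset.reset.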

The reason it's not working with your "new groupId" is that you are in "latest" mode.

The default value of auto.offset.reset is "latest". You need to either switch to "earliest" mode, poll once with your "new groupId", or commit an offset for this "new groupId" on this topic.

You need to register the "groupId" with the topic, not the consumer.
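A minimal sketch of the answer's first option, assuming the test can tolerate reading older events. It uses the plain configuration keys that ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG and AUTO_OFFSET_RESET_CONFIG stand for, so it only needs java.util.Properties. testConsumerProps and the "localhost:9092" address are hypothetical, and the deserializer and schema registry settings from the question would still have to be added.

```java
import java.util.Properties;

public class TestConsumerProps {

    // Hypothetical helper for the answer's "earliest" option. The keys are the
    // string values of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG
    // and AUTO_OFFSET_RESET_CONFIG, so only java.util is needed here.
    public static Properties testConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only consulted while the group has no committed offset for a partition:
        // start from the beginning instead of the end ("latest", the default).
        props.put("auto.offset.reset", "earliest");
        // Deserializers and schema.registry.url from the question still have to be added.
        return props;
    }

    public static void main(String[] args) {
        Properties props = testConsumerProps("localhost:9092", "my-test-group");
        System.out.println(props.getProperty("auto.offset.reset")); // earliest
    }
}
```

With "earliest", a group that has no committed offset starts at the beginning of each partition, so the assertion should look for the specific event the service published rather than taking the first record returned.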

Agan answered 2/5, 2019 at 10:08
I do an initial poll (step 1) before every test and I don't change the group id after that. I'm not interested in what the first poll returns; it's there to register the group. I'll give it a try with "earliest". - Harmsworth
In step 3 you said "using a new groupId", but if you don't change your groupId, then it's because you do not commit the offset between polls, and the first poll doesn't "count" for Kafka. - Agan
Thanks. I disabled auto.commit, added consumer.commitSync() after the first poll, and it seems to work. What I still don't like is that if I set the first poll's duration to 1 second (it works with 5 seconds), the second poll returns nothing. What guarantee do I have that 5 seconds will always be enough for the first poll, and that tests won't randomly fail depending on the environment? - Harmsworth
You're welcome. You can't be "sure" with a timeout; you need another approach, like "keep polling until you get at least one record". There is a library I used to do that, but I don't remember its name right now. - Agan
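A sketch of the "keep polling until you get at least one record" approach, written against a Supplier so it can be tried without a broker. pollUntilNonEmpty is a hypothetical helper, not part of the Kafka client. With a real consumer the supplier would be something like () -> consumer.poll(Duration.ofMillis(200)), since ConsumerRecords<K, V> is an Iterable<ConsumerRecord<K, V>>.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class PollUntil {

    // Hypothetical helper: calls pollOnce repeatedly until it yields at least one
    // item or the timeout elapses. Returns the first non-empty batch, or an empty
    // list if the deadline passes first.
    public static <T> List<T> pollUntilNonEmpty(Supplier<? extends Iterable<T>> pollOnce, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            List<T> batch = new ArrayList<>();
            pollOnce.get().forEach(batch::add);
            if (!batch.isEmpty()) {
                return batch;
            }
        } while (System.nanoTime() - deadline < 0); // overflow-safe deadline check
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Fake "poll" that returns nothing twice, then one record.
        int[] calls = {0};
        Supplier<List<String>> fakePoll = () ->
            ++calls[0] < 3 ? Collections.<String>emptyList() : Collections.singletonList("event-1");
        System.out.println(pollUntilNonEmpty(fakePoll, Duration.ofSeconds(5))); // [event-1]
        System.out.println(calls[0]); // 3
    }
}
```

The timeout becomes an upper bound rather than a fixed wait: the test returns as soon as the record arrives, and a slow environment simply gets more attempts instead of failing after a hard-coded 5 seconds.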
I think you'll find everything you need here: blog.mimacom.com/testing-apache-kafka-with-spring-boot - Agan
@Gremi64, can you please check this out? #78382428 - Affectionate
