The first N calls to poll return nothing when I register a consumer with a new group id.
I want to test that when I call a service, a Kafka event is published. The issue is that whenever I change the groupId, the first N polls return nothing. I understand that Kafka first registers the consumer when polling but I find the number of polls (time) required to register the consumer to be too random.
The consumer configuration:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
Steps:
- Before every test I call
consumer.poll(Duration.ofSeconds(5))
just to make sure that the consumer is registered and the offset set. - I call the service and assert on the response. If I check Kafka using the UI, the event is published.
- I call
consumer.poll(Duration.ofSeconds(5))
and hopefully receive some records. This is the step that fails.
Is there a way to make sure that the second poll always returns the record? I tried to make the first poll last for 1 minute (and I already think that 5 seconds is too much to wait for every test) and it would still some times work and some times not.
Thanks.