EmbeddedKafka w/ ContainerTestUtils.waitForAssignment throws: Expected 1 but got 0 partitions
A

1

13

We have an integration test that uses EmbeddedKafka: it produces a message to one topic, our app processes that message, and the result is sent to a second topic, where we consume it and assert on the output. In CI this works about two-thirds of the time, but sometimes KafkaTestUtils.getSingleRecord throws java.lang.IllegalStateException: No records found for topic (see [1] below).

To try to resolve this, I added ContainerTestUtils.waitForAssignment for each listener container in the registry (see [2] below). After a few successful runs in CI, I saw a new exception: java.lang.IllegalStateException: Expected 1 but got 0 partitions. This now has me wondering whether this was actually the root cause of the original "no records found" exception.

Any ideas what could help with the random failures here? I would appreciate any suggestions on how to troubleshoot.

spring-kafka and spring-kafka-test v2.6.4.

Edit: Added newConsumer for reference.

Example of our setup:

@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
    topics = { "topic1","topic2" },
    partitions = 1,
    brokerProperties = {"listeners=PLAINTEXT://localhost:9099", "port=9099"})
public class IntegrationTest {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Test
  public void testExample() {
    try (Consumer<String, String> consumer = newConsumer()) {
      for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        // [2]
        ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
      }

      try (Producer<String, String> producer = newProducer()) {
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]

        producer.send(new ProducerRecord<>(
            "topic1",
            "test payload"));
        producer.flush();
      }

      String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
      assertEquals("expected result", result); // JUnit order: expected first, then actual
    }
  }

  private Consumer<String, String> newConsumer() {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
    ConsumerFactory<String, AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerProps,
        new StringDeserializer(),
        new CustomDeserializer<>());
    return consumerFactory.createConsumer();
  }
}
Aikens answered 18/3, 2021 at 20:12 Comment(6)
You need to show newConsumer(). For the first problem, have you set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"? Why are you waiting for a container to get an assignment when you plan to receive the record from your own Consumer, not the container's consumer? - Propertied
@GaryRussell Added newConsumer for reference. We didn't set "earliest" here because we're trying to mimic our app's behavior, but we might end up just doing that. As for waiting for the assignment, honestly I was just looking at other threads for a way to programmatically stall until things are "ready". - Aikens
That generally is the cause of these random test failures, because the record can be stored before the consumer starts, and by default the starting offset is set to “latest”. - Propertied
@GaryRussell I saw this prescribed in other posts. Is there any other way to reliably wait for a consumer to be ready before producing? - Aikens
Hmmm, that's weird. I just noticed that you have embeddedKafkaBroker.consumeFromAnEmbeddedTopic(); that will wait until the partitions are assigned (or throw an exception if it doesn't happen), avoiding the race condition, so the offset reset setting shouldn't matter. I am not sure what's going on now. But certainly, waiting for the container assignment is wrong. - Propertied
Dude, if you have dug into this in the year since asking, could you please help me find the root cause of this strange behaviour? Local tests are OK, but when I run them through Maven in TeamCity they fail just like yours. - Acuminate
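For reference, the "earliest" setting discussed in the comments above could be applied to the test consumer's properties roughly as follows. This is a minimal sketch, not code from the question: ConsumerPropsSketch and withEarliestReset are hypothetical names, and the only Kafka-specific detail assumed is the standard consumer config key auto.offset.reset (the string behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). As the later comment notes, consumeFromAnEmbeddedTopic already waits for assignment, so this may not be the root cause here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of spring-kafka): returns a copy of the given
// consumer properties with the offset reset policy forced to "earliest".
final class ConsumerPropsSketch {

    private ConsumerPropsSketch() {
    }

    static Map<String, Object> withEarliestReset(Map<String, Object> baseProps) {
        // Copy so the caller's map is left untouched.
        Map<String, Object> props = new HashMap<>(baseProps);
        // "auto.offset.reset" is the key behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
        // With "earliest", a consumer group that has no committed offset starts from
        // the beginning of the partition instead of only seeing new records.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

In the test above, newConsumer() could pass the map returned by KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker) through withEarliestReset before handing it to DefaultKafkaConsumerFactory.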
B
0

I had a similar issue, although mine was expecting 24 partitions instead of 0. In the end, it was because the bean for my NewTopic had a hardcoded number of partitions, like so:

  @Bean
  public NewTopic domainEventsTopic(
      Topics topics,
      @Value("${spring.kafka.admin.replication}") short replicationFactor) {
    // Partition count is hardcoded to 24 here
    return new NewTopic(topics.domainEvents(), 24, replicationFactor);
  }

which did not match my tests:

ContainerTestUtils.waitForAssignment(container, partitions); // where partitions is 1

The fix in my case was to add a spring.kafka.admin.partitions property to my application.yaml, create an application-test.yaml where spring.kafka.admin.partitions is 1 instead of 24, and update the bean like so:

  @Bean
  public NewTopic domainEventsTopic(
      Topics topics,
      @Value("${spring.kafka.admin.replication}") short replicationFactor,
      @Value("${spring.kafka.admin.partitions}") Integer partitions) {
    return new NewTopic(topics.domainEvents(), partitions, replicationFactor);
  }
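
To illustrate the mismatch described above: judging by the exception text ("Expected N but got M partitions"), the count passed to waitForAssignment is compared with the number of partitions actually assigned to the container, so it has to agree with how the topics were really created. The following is only a sketch under that assumption; ExpectedAssignmentSketch and expectedPartitions are hypothetical names, not part of spring-kafka, and the topic names in the usage note are made up.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: derives the partition count a test should wait for
// from the topics a listener subscribes to and the partition count each
// topic was actually created with.
final class ExpectedAssignmentSketch {

    private ExpectedAssignmentSketch() {
    }

    static int expectedPartitions(List<String> listenerTopics,
                                  Map<String, Integer> partitionsByTopic) {
        int total = 0;
        for (String topic : listenerTopics) {
            Integer count = partitionsByTopic.get(topic);
            if (count == null) {
                // Fail fast rather than silently waiting for the wrong number.
                throw new IllegalArgumentException("Unknown topic: " + topic);
            }
            total += count;
        }
        return total;
    }
}
```

With a topic created with 24 partitions, expectedPartitions(List.of("domain-events"), Map.of("domain-events", 24)) yields 24, so a test that waits for 1 would not match.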
Bloodworth answered 14/9, 2023 at 8:7 Comment(0)
