We have an integration test that uses EmbeddedKafka: it produces a message to a topic, our app processes that message, and the result is sent to a second topic, where we consume it and assert on the output. In CI this works maybe 2/3 of the time, but sometimes KafkaTestUtils.getSingleRecord throws java.lang.IllegalStateException: No records found for topic (see [1] below).
To try to resolve this, I added ContainerTestUtils.waitForAssignment for each listener container in the registry (see [2] below). After a few successful runs in CI, I saw a new exception: java.lang.IllegalStateException: Expected 1 but got 0 partitions. This now has me wondering whether this was actually the root cause of the original "no records found" exception.
Any ideas what could help with the random failures here? I would appreciate any suggestions on how to troubleshoot.
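For context on the second exception, a wait of this kind can be modelled as polling an assigned-partition count until it reaches the expected value or the attempts run out. The sketch below is a simplified stand-in in plain Java, not spring-kafka-test's implementation; waitForCount and its parameters are hypothetical names, and the message format is copied from the exception quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class AssignmentWaitSketch {

    /**
     * Polls assignedCount until it reaches expected or maxAttempts polls have been made.
     * Hypothetical helper for illustration; not part of spring-kafka-test.
     */
    static void waitForCount(IntSupplier assignedCount, int expected, int maxAttempts, long sleepMillis) {
        int count = assignedCount.getAsInt();
        for (int attempt = 1; attempt < maxAttempts && count < expected; attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for assignment", e);
            }
            count = assignedCount.getAsInt();
        }
        if (count != expected) {
            throw new IllegalStateException(
                    String.format("Expected %d but got %d partitions", expected, count));
        }
    }

    public static void main(String[] args) {
        // Simulated container whose single partition shows up on the third poll.
        AtomicInteger polls = new AtomicInteger();
        waitForCount(() -> polls.incrementAndGet() >= 3 ? 1 : 0, 1, 10, 5L);
        System.out.println("assigned after " + polls.get() + " polls");

        // Simulated container that never receives a partition within the allowed attempts.
        try {
            waitForCount(() -> 0, 1, 3, 5L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Expected 1 but got 0 partitions
        }
    }
}
```

Under this model, the "got 0 partitions" message only says that a container had no assignment when the wait gave up. On its own it does not show why the test's own consumer found no records.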
We are using spring-kafka and spring-kafka-test v2.6.4.
Edit: Added newConsumer for reference.
Example of our setup:
@SpringBootTest
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(
        topics = { "topic1", "topic2" },
        partitions = 1,
        brokerProperties = { "listeners=PLAINTEXT://localhost:9099", "port=9099" })
public class IntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Test
    public void testExample() {
        try (Consumer<String, String> consumer = newConsumer()) {
            for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
                // [2]
                ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
            }
            try (Producer<String, String> producer = newProducer()) {
                embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic2"); // [1]
                producer.send(new ProducerRecord<>(
                        "topic1",
                        "test payload"));
                producer.flush();
            }
            String result = KafkaTestUtils.getSingleRecord(consumer, "topic2").value();
            assertEquals(result, "expected result");
        }
    }

    private Consumer<String, String> newConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupId", "false", embeddedKafkaBroker);
        ConsumerFactory<String, AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerProps,
                new StringDeserializer(),
                new CustomDeserializer<>());
        return consumerFactory.createConsumer();
    }
}
newConsumer() - for the first problem, have you set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"? Why are you waiting for a container to get an assignment when you plan to receive the record from your Consumer, not the container's consumer? – Propertied
newConsumer - new consumer for reference. We didn't set "earliest" here as we're trying to mimic our app's behavior, but we might end up just doing that. As for waiting for the assignment, honestly I was just looking at other threads for ways to programmatically stall until things are "ready". – Aikens
embeddedKafkaBroker.consumeFromAnEmbeddedTopic() - that will wait until the partitions are assigned (or throw an exception if it doesn't happen), avoiding the race condition, so the offset reset setting shouldn't matter. I am not sure what's going on now. But certainly, waiting for the container assignment is wrong. – Propertied
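To make the offset reset question from the comments concrete, here is a toy in-memory model (plain Java, not the Kafka client) of how a consumer group with no committed offset picks its starting position. ToyLog and startingOffset are hypothetical names for illustration; only the policy values "earliest" and "latest" come from Kafka's auto.offset.reset setting. It shows that a record written before the consumer is positioned is visible only under "earliest".

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetResetSketch {

    /** Toy single-partition log. Hypothetical type, not a Kafka API. */
    static final class ToyLog {
        private final List<String> records = new ArrayList<>();

        void append(String value) {
            records.add(value);
        }

        int endOffset() {
            return records.size();
        }

        List<String> readFrom(int offset) {
            return new ArrayList<>(records.subList(offset, records.size()));
        }
    }

    /** Starting position for a group with no committed offset, per reset policy. */
    static int startingOffset(ToyLog log, String autoOffsetReset) {
        switch (autoOffsetReset) {
            case "earliest":
                return 0;
            case "latest":
                return log.endOffset();
            default:
                throw new IllegalArgumentException("Unsupported policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        ToyLog topic2 = new ToyLog();
        // The app publishes its result before the test consumer has a position.
        topic2.append("expected result");

        System.out.println(topic2.readFrom(startingOffset(topic2, "latest")));   // []
        System.out.println(topic2.readFrom(startingOffset(topic2, "earliest"))); // [expected result]
    }
}
```

In the test above the consumer is subscribed through consumeFromAnEmbeddedTopic before the record is produced, which according to the last comment should already avoid this race. The sketch only illustrates why the setting was asked about.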