Spring Kafka Test - Not receiving data in @KafkaListener with EmbeddedKafka
We are writing integration tests for our application using Cucumber, and we are having issues testing a @KafkaListener. We managed to start an EmbeddedKafka broker and produce data into it.

But the consumer never receives any data and we don't know what is going on.

This is our code:

Producer Config

@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Consumer Config

@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(consumerProperties(), false);
        return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}

Integration Test

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        classes = Application.class)
@ActiveProfiles("test")
@EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
public class CommonStepDefinitions implements En {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);

    @Autowired
    protected KafkaTemplate<String, GenericRecord> kafkaTemplate;

}

Step Definitions

public class KafkaStepDefinitions extends CommonStepDefinitions {

    private static final String TEMPLATE_TOPIC = "TOPIC1";

    public KafkaStepDefinitions(){
        Given("given statement", () -> {
            OperationEntity operationEntity = new OperationEntity();
            operationEntity.setFoo("foo");
            kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
        });
    }

}

The Consumer

This same code works fine against the production bootstrap server, but it is never invoked with the embedded Kafka:

@KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
    LOGGER.info("Batch of {} records received", records.size());
    // do something with the data
    ack.acknowledge();
}

Everything in the logs looks fine, but we don't know what is missing.

Thanks in advance.

Webbed answered 11/3, 2020 at 15:39 Comment(4)
Everything looks good to me; are you sure the record is published? Can you post the logs someplace (pastebin etc.)? (Scissor)
The only thing that isn't looking good in the logs is this error: Error deleting ...\AppData\Local\Temp\kafka-7678436650062051819\TOPIC1-0\00000000000000000000.index: The process cannot access the file because it is being used by another process (Webbed)
Yeah - that's a (yet unsolved) issue on Windows during shutdown. If you are getting the "partitions assigned" INFO log, I can pretty much guarantee the problem is on the publishing side. Again, if you can post the logs, I can take a look. (Scissor)
This is the complete log of the integration test: pastebin.com/FjYmJ2TW (Webbed)
The problem is that the consumer is not wired up to the embedded Kafka. You can do this by running your tests with a test profile and adding the following to application-test.yml.

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}

Then you also don't need the custom consumerProperties, consumerFactory, and kafkaListenerContainerFactory beans; Spring Boot will auto-configure these for you. If you do wish to keep those beans (I wouldn't know why), double-check KafkaAutoConfiguration to make sure you're overriding the right bean names and types.
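For completeness, here is a sketch of what the full application-test.yml could look like under that approach, using Spring Boot's standard spring.kafka.* property keys. The group id, deserializers, and the schema.registry.url placeholder are carried over from the question's own config; whether you need the Confluent schema-registry property at all depends on your Avro serializer setup.

```yaml
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      group-id: groupId
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: URL
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: URL
```

With this in place, both the auto-configured KafkaTemplate and the listener container factory point at the embedded broker, so producer and consumer are guaranteed to talk to the same cluster.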

Murine answered 11/3, 2020 at 22:18 Comment(4)
I tried to remove my beans and use the application-test to configure it; still nothing. (Webbed)
I am not familiar with Cucumber; you don't have an @Test method in your @SpringBootTest class. In any case, your test is ending before it starts; see the thread name containing 0-0-C-1; the consumer is stopped less than a second after it started. (Scissor)
I just checked and no, my test is executing, because you can see the log of the ProducerConfig values in line 1174 of the log. And that log appears just after the kafkaTemplate.send(topic, entity). I don't use @Test because in Cucumber you have step definitions. You can see the code in my post. (Webbed)
It might be good to try to write this as a JUnit test first. Remove some of the complexity and get a larger audience to understand the problem. (Murine)
Your test is ending before it starts; see the thread name containing 0-0-C-1; the consumer is stopped less than a second after it started.

I just checked and no, my test is executing, because you can see the log of the ProducerConfig values in line 1174 of the log. And that log appears just after the kafkaTemplate.send(topic, entity). I don't use @Test because in Cucumber you have step definitions. You can see the code in my post.

OK; but you need some kind of latch in the test to wait for the consumer to actually get topics/partitions assigned and receive the data. The way you have the test structured now, the test is shut down before the consumer has fully started. See my answer to this question for one way to wrap the listener so you can wait until the record is received. (This uses normal JUnit tests).

Another technique would be to somehow inject a service into your listener bean that counts down a latch.

As a quick test, add Thread.sleep(10_000) to your "step".

But, presumably, you will want to assert somehow that the consumer actually got the data. You need to do that assertion before the test exits and because it's async, you need some mechanism to wait for it to happen (or time out).
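To make the latch idea concrete, here is a minimal, Kafka-free sketch of the pattern in plain Java. The names LatchDemo, onRecord, and awaitRecord are hypothetical; in the real test, the @KafkaListener's consume() method would do the counting down (directly or via an injected helper bean), and the Cucumber "Then" step would do the awaiting and asserting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchDemo {

    // Stand-in for the listener bean: the real @KafkaListener method would
    // record the payload and count down when it receives data.
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> received = new AtomicReference<>();

    void onRecord(String payload) {      // called from the consumer thread
        received.set(payload);
        latch.countDown();
    }

    String awaitRecord(long timeoutSeconds) throws InterruptedException {
        // Block the test thread until the record arrives (or time out),
        // instead of letting the test exit before the consumer has started.
        if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
            return received.get();
        }
        return null;                     // timed out: nothing was consumed
    }

    public static void main(String[] args) throws InterruptedException {
        LatchDemo demo = new LatchDemo();
        // Simulate the async consumer delivering a record on another thread.
        new Thread(() -> demo.onRecord("foo")).start();
        System.out.println(demo.awaitRecord(10));
    }
}
```

The key point is that awaitRecord gives the assertion a bounded wait: the test either sees the record within the timeout or fails with a clear null, rather than silently shutting the container down before delivery.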

Scissor answered 12/3, 2020 at 21:44 Comment(0)
I was also facing the same issue with EmbeddedKafka. Try using a Testcontainers KafkaContainer instead:

@ActiveProfiles({"test"})
@RunWith(Cucumber.class)
@CucumberOptions(features= {"src/test/resources/cucumber/data.feature"},
        plugin = {"pretty", "json:target/cucumber.json"})
@SpringBootTest(classes = MyApplication.class)
public final class MyApplicationCucumberTest {
    private MyApplicationCucumberTest() {}

    @Container
    private static KafkaContainer kafkaContainer = new KafkaTestContainer();


    @BeforeClass
    public static void beforeClass() throws IOException, TTransportException {

        kafkaContainer.start();
        System.out.println("Kafka Bootstrap server : " + kafkaContainer.getBootstrapServers());
        System.setProperty("spring.kafka.bootstrap", kafkaContainer.getBootstrapServers());
        System.out.println("Kafka Bootstrap server : " + System.getProperty("spring.kafka.bootstrap"));
        try {
            // Create the topic inside the container
            kafkaContainer.execInContainer("/bin/sh", "-c",
                    "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my.topic");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


}

Also add spring.kafka.consumer.auto-offset-reset: earliest to application.yml.

public class KafkaTestContainer extends KafkaContainer {
    private static final String KAFKA_DOCKER = "confluentinc/cp-kafka:5.4.3";

    public KafkaTestContainer() {
        super(DockerImageName.parse(KAFKA_DOCKER));
    }
}
Snapp answered 30/7, 2021 at 15:26 Comment(0)