Unable to unit test a @KafkaListener annotated method

I'm trying to unit test a Kafka consumer class in Spring. I want to verify that when a Kafka message is sent to its topic, the listener method is called correctly. My consumer class is annotated like this:

@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...

If I inject the consumer with @Autowired and send a Kafka message, the listener method is called correctly, but I can't assert that it was called because the instance isn't a mock.

If I mock the consumer and send a Kafka message, the listener method is not called at all. I can call the method directly and assert that it worked, but that isn't what I want, which is to check that the method is called when I send a Kafka message to its topic.

For now I have resorted to putting a counter inside the consumer, incrementing it every time the listener method is called, and then checking that its value has changed. Adding a variable just for testing seems like a terrible solution to me.
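
For reference, that workaround can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not the real consumer: it swaps the plain counter for a CountDownLatch so that a test can wait for the asynchronous call, and CountingConsumer, awaitMessages, CountingConsumerDemo, and the single-thread executor standing in for the Kafka listener container are hypothetical names that do not come from Spring Kafka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer. In the real class the method would
// carry @KafkaListener and be invoked by the listener container.
class CountingConsumer {

    // Test-only state: counts down once per received message.
    private final CountDownLatch latch;

    CountingConsumer(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    public void myKafkaMessageEvent(final String message) {
        latch.countDown();
    }

    // Returns true if all expected messages arrived before the timeout expired.
    boolean awaitMessages(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class CountingConsumerDemo {

    // Simulates asynchronous delivery on a separate thread (assumption: this
    // executor plays the role of the listener container's consumer thread).
    static boolean deliverAndWait(int messages) {
        CountingConsumer consumer = new CountingConsumer(messages);
        ExecutorService containerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < messages; i++) {
                final String payload = "message-" + i;
                containerThread.submit(() -> consumer.myKafkaMessageEvent(payload));
            }
            return consumer.awaitMessages(5, TimeUnit.SECONDS);
        } finally {
            containerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all messages received: " + deliverAndWait(2));
    }
}
```

The latch makes the waiting explicit, but it is still state that exists only for the test, which is the drawback described above.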

Is there a way to make the mocked consumer receive the Kafka messages too? Or some other way to assert that the listener method of the non-mocked consumer was called?

Dismiss answered 7/5, 2018 at 12:29 Comment(0)

It sounds like you are asking for something similar to what we have in the Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness

So, if you are not happy with an extra variable, you can borrow that solution and implement your own "harness".

I think that would be a good addition to the framework, so please raise an appropriate issue and together we can bring such a tool to the public.

UPDATE

So, following the Spring AMQP approach, I did this in my test configuration:

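public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {

    // Spied listener beans, keyed by the id() of their @KafkaListener annotation
    private final Map<String, Object> listeners = new HashMap<>();

    @Override
    protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        // Wrap the real listener bean in a Mockito spy so that calls can be verified
        bean = Mockito.spy(bean);

        this.listeners.put(kafkaListener.id(), bean);

        // Continue the normal processing with the spy instead of the original bean
        super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
    }

    // Returns the spy stored for the given @KafkaListener id, or null if there is none
    @SuppressWarnings("unchecked")
    public <T> T getSpy(String id) {
        return (T) this.listeners.get(id);
    }

}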

...

@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
    return new KafkaListenerTestHarness();
}

Then in the target test-case I use it like this:

@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");

verify(listener, times(2)).listen1("foo");
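
The idea behind the harness can also be sketched without Spring, Kafka, or Mockito. The following is a minimal stand-alone illustration, not the framework's implementation: it assumes a hypothetical MessageListener interface and uses a JDK dynamic proxy in place of Mockito.spy(). ListenerHarnessSketch, register, and timesCalled are names invented for this sketch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener contract (JDK proxies can only wrap interfaces).
interface MessageListener {
    void listen1(String message);
}

class ListenerHarnessSketch {

    // Recorded calls per listener id, each stored as "methodName[args]".
    private final Map<String, List<String>> calls = new HashMap<>();

    // Wraps the real listener in a recording proxy and files it under its id.
    MessageListener register(String id, MessageListener target) {
        List<String> recorded = new CopyOnWriteArrayList<>();
        calls.put(id, recorded);
        InvocationHandler handler = (proxy, method, args) -> {
            // Record only the listener's own methods, not toString/equals/hashCode.
            if (method.getDeclaringClass() != Object.class) {
                Object[] safeArgs = args == null ? new Object[0] : args;
                recorded.add(method.getName() + Arrays.toString(safeArgs));
            }
            return method.invoke(target, args); // delegate to the real listener
        };
        return (MessageListener) Proxy.newProxyInstance(
                MessageListener.class.getClassLoader(),
                new Class<?>[] {MessageListener.class},
                handler);
    }

    // Counts how often the given method was called with exactly these arguments.
    int timesCalled(String id, String methodName, Object... args) {
        String expected = methodName + Arrays.toString(args);
        int count = 0;
        for (String call : calls.getOrDefault(id, Collections.<String>emptyList())) {
            if (call.equals(expected)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] arguments) {
        ListenerHarnessSketch harness = new ListenerHarnessSketch();
        List<String> received = new ArrayList<>();
        // The "container" must call the wrapped instance, not the original one.
        MessageListener wrapped = harness.register("foo", message -> received.add(message));
        wrapped.listen1("foo");
        wrapped.listen1("foo");
        System.out.println(harness.timesCalled("foo", "listen1", "foo"));
    }
}
```

The point to take from the sketch is that calls are recorded only when they go through the wrapped instance returned by register(...), which is why the harness above passes the spy, not the original bean, to super.processListener(...).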
Andvari answered 7/5, 2018 at 16:59 Comment(10)
There is a KafkaListenerAnnotationBeanPostProcessor that processes all those @KafkaListener methods. You just need to follow the RabbitListenerTestHarness logic. Let me show something simple in my answer a bit later! Andvari
Please find an UPDATE in my answer. Andvari
I'm sorry, I'm a bit confused... So I have this: @Autowired private TestConfig.KafkaListenerTestHarness harness; and this: @Autowired private ReceiveMessageRetrievedEventHandler receiver; The second is my consumer with the listener method. How would I use your code here? I also don't get the foo parameter... I appreciate the help, sorry, I'm having some difficulty with this... Robber
Pay attention to how I store the spy: in the KafkaListenerTestHarness.listeners map. That is the only way to get access to the spy. The foo is the id() on the @KafkaListener. See what I use for the map key in the KafkaListenerTestHarness. Andvari
I think I understand... But now I'm getting an "Another endpoint is already registered with id" error that I can't get rid of... Robber
I am. I only use this id in one place: @KafkaListener( topic ... id = "ReceiveMessageProductRetrievedEventHandlerID") public void productRetrievedEvent(final ProductTransaction message) { And then in my test: @ContextConfiguration(classes = {TestConfig.class ... ReceiveMessageProductRetrievedEventHandler.class}) @Autowired private TestConfig.KafkaListenerTestHarness harness; The error happens when this line in the harness runs: super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName); If I comment it out, the error goes away. Robber
It does not. I get "Wanted but not invoked" here: verify(listener, times(1)).productRetrievedEvent(anyObject()); And yet I had a breakpoint in the listener, and it was indeed called. I'm going to read all this code again to see if I'm missing something... Robber
I realize I'm not using your kafkaListenerAnnotationBeanPostProcessor() anywhere, it's just there... Is that right? Robber
I don't understand your last question. The kafkaListenerAnnotationBeanPostProcessor() is a bean which just has to be declared there. Andvari
If that doesn't work for you, then you should abandon this solution and go some other way. Also, I've already asked you to raise an appropriate GH issue on the matter, and we will definitely do something in the framework for these use cases. Andvari

Please note that a bean that is exercised during the Spring context refresh cannot be mocked. If you want to mock a bean like a Kafka listener, just use the @SpyBean annotation instead. You can find more details in this Spring Boot GitHub issue: https://github.com/spring-projects/spring-boot/issues/16324

Risa answered 27/2, 2023 at 6:3 Comment(0)
