How to test a polling pattern in Mutiny on Quarkus?

I would like to test a simple polling example from https://smallrye.io/smallrye-mutiny/guides/polling and poll the data of a service into a Kafka stream.

This is a simplified example of a class I want to test:

@ApplicationScoped
public class ExampleScheduler {

    @Inject
    @RestClient
    ExampleService service;


    @PostConstruct
    void init() {
        pollSource();
    }

    @Outgoing("sensor_data_out")
    Multi<String> pollSource() {
        Uni<String> stream = service.getString()
                .runSubscriptionOn(Infrastructure.getDefaultExecutor());

        return stream.repeat().withDelay(Duration.ofSeconds(3))
                .indefinitely();
    }
}

And here is the test class:

@QuarkusTest
class ExampleSchedulerTest {
    
    @Inject
    ExampleScheduler classToTest;

    @InjectMock
    ExampleService mockService;

    @BeforeEach
    void setUp() {
        when(mockService.getString()).thenReturn(Uni.createFrom().item("ANSWER"));
    }

    @Test
    void pollSource() {
        final Multi<String> stream = classToTest.pollSource();
        AssertSubscriber<String> subscriber = stream.subscribe().withSubscriber(AssertSubscriber.create(1));
        subscriber.assertCompleted()
                .assertItems("ANSWER");
    }
}

I am relying on Quarkus test containers to provide a Kafka instance.

The error log from my actual code is:

java.lang.AssertionError: No completion (or failure) event received in the last 10000 ms

    at io.smallrye.mutiny.helpers.test.AssertSubscriber.awaitCompletion(AssertSubscriber.java:424)
    at io.smallrye.mutiny.helpers.test.AssertSubscriber.awaitCompletion(AssertSubscriber.java:408)
    at si.src.xanathar.cron.RawSensorDataSchedulerTest.pollThinsBoard(RawSensorDataSchedulerTest.java:86)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at io.quarkus.test.junit.QuarkusTestExtension.runExtensionMethod(QuarkusTestExtension.java:922)
    at io.quarkus.test.junit.QuarkusTestExtension.interceptTestMethod(QuarkusTestExtension.java:752)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

Hickman answered 4/11, 2021 at 8:6 Comment(0)

To test an infinite Multi stream, such as the one resulting from a call to UniRepeat#indefinitely, you can collect a predefined number of values and then cancel the upstream subscription.

This can be achieved with the select().first(n) and collect().last() operators:

@QuarkusTest
class ExampleSchedulerTest {
    
    @Inject
    ExampleScheduler classToTest;

    @InjectMock
    ExampleService mockService;

    @BeforeEach
    void setUp() {
        when(mockService.getString()).thenReturn(Uni.createFrom().item("ANSWER"));
    }

    @Test
    void pollSource() {
        String lastValue = classToTest.pollSource()
                .select().first(1)
                .collect().last()
                .await().atMost(Duration.ofSeconds(4)); // wait for 4 seconds to allow time for the configured upstream delay
        assertEquals("ANSWER", lastValue);
    }
}
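
To illustrate why bounding the stream lets the test finish, here is a minimal plain-JDK sketch of the "take the first n items, then cancel upstream" idea. It uses java.util.concurrent.Flow instead of Mutiny, so it is only an analogy and not how Mutiny implements select().first(n); the class and method names (TakeFirstSketch, endlessSource, takeFirst) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class TakeFirstSketch {

    // Hypothetical endless source: emits "ANSWER" for every requested item
    // and never signals onComplete, similar to a repeat().indefinitely() stream.
    static Flow.Publisher<String> endlessSource() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;

            @Override
            public void request(long n) {
                // Emit synchronously until the demand is met or we are cancelled.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext("ANSWER");
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Collects the first `count` items, then cancels the upstream subscription.
    static List<String> takeFirst(Flow.Publisher<String> source, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<String> items = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(Long.MAX_VALUE); // unbounded demand; cancellation ends the stream
            }

            @Override
            public void onNext(String item) {
                items.add(item);
                if (items.size() == count) {
                    subscription.cancel(); // we have what we need, stop the source
                }
            }

            @Override
            public void onError(Throwable failure) {
                throw new IllegalStateException(failure);
            }

            @Override
            public void onComplete() {
                // Never called: the source is endless.
            }
        });
        return items;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(endlessSource(), 1)); // prints [ANSWER]
    }
}
```

The source never completes on its own; the consumer ends the interaction by cancelling once it has enough items, which is what makes the result deterministic.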
Wallet answered 6/11, 2021 at 13:16 Comment(3)
This is also correct. Although I am wondering whether terminating the stream as you showed has an advantage over omitting assertCompleted() from the assertion, as @jzimmerli showed. - Hickman
Absolutely. It has the advantage of being deterministic and predictable: you control the emission window and can make strict assertions on it. Prefer this over asserting that a value was emitted without knowing how many items were emitted in the predefined time window, whether the stream completes, or whether it emits errors. As a hint, change the withDelay duration to 5 seconds and check whether the test still succeeds. - Wallet
Thanks for this tip! Your code with some modifications: var asserter = classToTest.pollSource().collect().first().subscribe().withSubscriber(UniAssertSubscriber.create()); asserter.assertItem("ANSWER"); - Trachytic

If I read it correctly, you assert that the stream has completed:

subscriber.assertCompleted()
            .assertItems("ANSWER");

But your stream is endless and never completes, so the assertion times out while waiting for a completion event. Simply remove the assertCompleted() call and your test should run.
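
As a rough sketch of that idea outside of Quarkus, the snippet below subscribes to an endless Multi, waits for the requested items, checks that no completion or failure has arrived, and then cancels. It assumes Mutiny is on the classpath and that your version provides AssertSubscriber#awaitItems and #assertNotTerminated; the class and method names (EndlessStreamAssertionSketch, endlessPolling, firstItems) are hypothetical, and the delay is left out to keep the sketch deterministic.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;

import java.util.List;

public class EndlessStreamAssertionSketch {

    // Hypothetical stand-in for pollSource(): repeats a fixed item forever.
    // The real method adds withDelay(...); the assertions below stay the same.
    static Multi<String> endlessPolling() {
        return Uni.createFrom().item("ANSWER")
                .repeat().indefinitely();
    }

    // Requests `count` items, waits for them, and cancels instead of
    // waiting for a completion event that an endless stream never sends.
    static List<String> firstItems(Multi<String> stream, int count) {
        AssertSubscriber<String> subscriber =
                stream.subscribe().withSubscriber(AssertSubscriber.create(count));
        subscriber.awaitItems(count);      // block until `count` items have arrived
        subscriber.assertNotTerminated();  // no completion or failure so far
        subscriber.cancel();               // stop polling so nothing outlives the test
        return subscriber.getItems();
    }
}
```

In a test you would then compare the returned list against the expected values, for example with assertEquals(List.of("ANSWER"), firstItems(classToTest.pollSource(), 1)).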

Bellona answered 6/11, 2021 at 9:23 Comment(0)
