Spring Reactor onErrorContinue not working

According to the documentation, I expected onErrorContinue to drop the element that caused the error and continue the sequence. Instead, the test case below fails with this exception:

java.lang.AssertionError: expectation "expectNext(12)" failed (expected: onNext(12); actual: onError(java.lang.RuntimeException:

@Test
public void testOnErrorContinue() throws InterruptedException {
    Flux<Integer> fluxFromJust = Flux.just(1, 2,3,4,5)
            .concatWith(Flux.error(new RuntimeException("Test")))
            .concatWith(Flux.just(6))
            .map(i->i*2)
            .onErrorContinue((e,i)->{
                System.out.println("Error For Item +" + i );
            })
            ;
    StepVerifier
            .create(fluxFromJust)
            .expectNext(2, 4,6,8,10)
            .expectNext(12)
            .verifyComplete();
}
Maneuver answered 29/12, 2019 at 17:30

onErrorContinue() may not be doing what you think it does - it lets upstream operators recover from errors that may occur within them, if they happen to support doing so. It's a rather specialist operator.

In this case map() does support onErrorContinue, but map() isn't producing the error - the error has already been inserted into the stream (by way of concatWith() and the explicit Flux.error() call). In other words, no operator is producing the error, so there is nothing for it to recover from: the error is supplied by the source itself rather than raised while an operator processes an element.

If you changed your stream so that map() actually caused the error, then it would work as expected:

Flux.just(1, 2, 3, 4, 5)
        .map(x -> {
            // map() itself throws for this element, so onErrorContinue can drop it
            if (x == 5) {
                throw new RuntimeException();
            }
            return x * 2;
        })
        .onErrorContinue((e, i) -> {
            System.out.println("Error For Item +" + i);
        })
        .subscribe(System.out::println);

Produces:

2
4
6
8
Error For Item +5

An alternative, depending on the real-world use case, may be to use onErrorResume() directly after the element (or element source) that may be erroneous:

Flux.just(1, 2, 3, 4, 5)
        .concatWith(Flux.error(new RuntimeException()))
        .onErrorResume(e -> {
            System.out.println("Error " + e + ", ignoring");
            return Mono.empty();
        })
        .concatWith(Flux.just(6))
        .map(i -> i * 2)
        .subscribe(System.out::println);
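
To tie this back to the test in the question, here is a minimal sketch of the same pipeline wrapped in a method so the result can be checked. The class name OnErrorResumeExample and the method name doubled() are made up for illustration, and the sketch assumes reactor-core is on the classpath:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnErrorResumeExample {

    // Hypothetical helper: same pipeline as above, returned instead of subscribed.
    static Flux<Integer> doubled() {
        return Flux.just(1, 2, 3, 4, 5)
                .concatWith(Flux.error(new RuntimeException("Test")))
                // Swallow the error signal here, before the next source is appended.
                .onErrorResume(e -> Mono.empty())
                .concatWith(Flux.just(6))
                .map(i -> i * 2);
    }

    public static void main(String[] args) {
        List<Integer> result = doubled().collectList().block();
        System.out.println(result); // [2, 4, 6, 8, 10, 12]
    }
}
```

Passing doubled() to StepVerifier.create(...) should satisfy the original expectations (2, 4, 6, 8, 10, then 12, then completion), because the error is replaced by an empty sequence before Flux.just(6) is concatenated.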

Using another "onError" operator (such as onErrorResume()) is generally the more usual and recommended approach, since onErrorContinue() depends on operator support and affects upstream rather than downstream operators (which is unusual).
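
If the goal is to skip individual failing elements, one way to do that without onErrorContinue() is to scope the recovery to each element: wrap the risky step in its own Mono and apply onErrorResume() inside concatMap(). The sketch below is only an illustration; doubleUnlessFive() is a hypothetical stand-in for whatever per-element work might throw, and the class name is made up:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PerElementRecovery {

    // Hypothetical element-level operation that fails for one input.
    static int doubleUnlessFive(int x) {
        if (x == 5) {
            throw new RuntimeException("Test");
        }
        return x * 2;
    }

    static Flux<Integer> recovered() {
        return Flux.just(1, 2, 3, 4, 5, 6)
                // Each element gets its own inner Mono, so an error only ends
                // that inner Mono and the outer sequence keeps going.
                .concatMap(x -> Mono.fromCallable(() -> doubleUnlessFive(x))
                        .onErrorResume(e -> {
                            System.out.println("Error for item " + x);
                            return Mono.empty();
                        }));
    }

    public static void main(String[] args) {
        List<Integer> result = recovered().collectList().block();
        System.out.println(result); // [2, 4, 6, 8, 12]
    }
}
```

Unlike onErrorContinue(), this does not rely on operator support: the error handling sits downstream of the failing step, inside the inner publisher, which is how the other "onError" operators normally behave.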

Mistreat answered 29/12, 2019 at 19:2
