How to handle errors while executing Flux.map()

I'm trying to figure out how to handle errors when mapping elements inside a Flux.

For instance, I'm parsing a CSV string into one of my business POJOs:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

Some of these lines might contain errors, so what I get in the log is:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

I read about some error handling methods in the API, but most of them referred to returning an "error value" or using a fallback Flux, like this one:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff));

However, using this with myflux means that the whole flux is processed again.

So, is there a way to handle errors while processing particular elements (i.e. ignoring or logging them) and keep processing the rest of the flux?

UPDATE with @akarnokd workaround

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
        try {
            return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
        }
        catch (IllegalArgumentException ex) {
            System.out.println("Error decoding stock quotation: " + x);
            return Flux.empty();
        }
    });

    return processingFlux;
}

This works like a charm. However, as you can see, the code is less elegant than before. Doesn't the Flux API have a method that does what this code does?

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
Lexi answered 26/3, 2016 at 15:39 Comment(1)
You may be able to use a custom exception that carries the failed element as a field. Then, in the onError method, you can get the failed element through the custom exception's getter. - Thromboplastin
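
The custom-exception idea from the comment above could look roughly like the following. This is only a sketch: `InvalidQuotationException`, `getRawLine()` and `QuotationParser.parsePrice` are hypothetical names invented for illustration, and the parser is a stand-in for the real converter, which is not shown in the question.

```java
// Hypothetical exception type: the class name and getter are illustrative only.
class InvalidQuotationException extends IllegalArgumentException {
    private final String rawLine;

    InvalidQuotationException(String rawLine, Throwable cause) {
        super("Invalid CSV stock quotation: " + rawLine, cause);
        this.rawLine = rawLine;
    }

    // Exposes the element that failed so an error handler can log or re-queue it.
    String getRawLine() {
        return rawLine;
    }
}

class QuotationParser {
    // Stand-in for the real converter: parses "TICKER,price" and returns the price.
    static double parsePrice(String csvLine) {
        String[] fields = csvLine.split(",");
        try {
            return Double.parseDouble(fields[1]);
        } catch (RuntimeException ex) {
            // Wrap the low-level failure together with the offending line.
            throw new InvalidQuotationException(csvLine, ex);
        }
    }
}
```

An error callback can then check `e instanceof InvalidQuotationException` and call `getRawLine()` to find out which element failed. Note that this only identifies the element; the sequence still terminates on the error.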

You need flatMap instead, which lets you return an empty sequence if the processing failed:

myflux.flatMap(stock -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});
Kokaras answered 30/3, 2016 at 10:23 Comment(3)
Works great (going to accept this answer), but I would like to know whether this can be done with the API. If not, I will open a feature request. Thanks! - Lexi
This is the de facto standard way to get this behavior with the API. Errors are terminal events, and you have to transform them into something else inside lambdas to avoid termination. - Kokaras
OK. I proposed the creation of a new method to handle individual failures (perhaps publishing those failures as a "dead letter" flux?). Maybe this could be helpful... - Lexi
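
To illustrate the "errors are terminal events" point from the comments above, here is a minimal sketch. It assumes reactor-core is on the classpath and uses `Integer.parseInt` as a stand-in for the CSV converter; the class and method names are made up for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class TerminalErrorDemo {
    // Parses each string; the fallback is attached to the OUTER sequence.
    static List<Integer> parseWithOuterFallback(List<String> raw) {
        return Flux.fromIterable(raw)
                // Throws NumberFormatException on bad input, which ends the sequence.
                .map(Integer::parseInt)
                // Switches to an empty fallback; elements after the failure are never parsed.
                .onErrorResume(e -> Flux.empty())
                .collectList()
                .block();
    }
}
```

Calling `parseWithOuterFallback(List.of("1", "x", "3"))` yields only the elements emitted before the failure. The fallback replaces the rest of the sequence rather than skipping one element, which is why the per-element try/catch (or an inner publisher with its own fallback) is needed.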

If you want to use Reactor 3's methods for dealing with exceptions, you can use Mono.fromCallable.

flatMap(x -> 
    Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
        .flux()
        .flatMap(Flux::fromIterable)
        .onErrorResume(e -> Flux.empty())
)

Unfortunately there is no Flux.fromCallable, so assuming the callable returns a list, you have to convert it to a Flux manually.

Crutcher answered 26/5, 2018 at 13:13 Comment(0)

With the current version of Reactor 3, quite a few methods have been added. So we could do something like so:

myflux.onErrorResume(error -> {
    System.out.println("Error decoding stock quotation: " + error);
    return Flux.empty();
});

See more info on how to handle errors here

Oxalate answered 22/5, 2018 at 15:23 Comment(0)

You can use onErrorContinue. It allows recovering from errors by dropping the offending element and continuing with the subsequent elements.
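
A minimal sketch of how that might look, assuming reactor-core 3.2 or later on the classpath. `Integer.parseInt` stands in for the CSV converter, and the class name, method name and `dropped` list are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class OnErrorContinueDemo {
    // Parses every string, recording the ones that fail instead of terminating.
    static List<Integer> parseSkippingBadLines(List<String> raw, List<Object> dropped) {
        return Flux.fromIterable(raw)
                .map(Integer::parseInt)
                // Applies to the upstream map(): the failing element is handed to
                // the callback and the sequence carries on with the next one.
                .onErrorContinue(NumberFormatException.class,
                        (error, item) -> dropped.add(item))
                .collectList()
                .block();
    }
}
```

With the input `"1", "x", "3"` the bad line ends up in `dropped` and the other two values are returned. Keep in mind that onErrorContinue only works with operators that support this mode, so it is worth checking the Javadoc of the operators involved.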

Innerve answered 8/1, 2019 at 23:51 Comment(0)
...
       // Convert the string to POJOs
    .flatMap(x ->
        Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
            .doOnError(IllegalArgumentException.class,
                e -> System.out.println("Error decoding stock quotation: " + x))
            //.onErrorStop()
            .onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
    )
...
Reserpine answered 21/4, 2020 at 15:23 Comment(1)
Using just means that the conversion runs before the Mono/Flux is created, not "inside" it, so the error will not be handled by its error operators. - Crutcher
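
The difference described in this comment can be sketched as follows, again assuming reactor-core on the classpath and using `Integer.parseInt` as a stand-in for the converter. The class and method names are invented for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class DeferredConversionDemo {
    // Eager: parseInt runs while the lambda is still building the inner Flux,
    // so the exception escapes the lambda and the inner onErrorResume never sees it.
    static List<Integer> eager(List<String> raw) {
        return Flux.fromIterable(raw)
                .flatMap(s -> Flux.just(Integer.parseInt(s))
                        .onErrorResume(NumberFormatException.class, e -> Flux.empty()))
                .collectList()
                .block();
    }

    // Deferred: parseInt runs when the inner Mono is subscribed, so the failure
    // becomes an error signal of that Mono and its own fallback can swallow it.
    static List<Integer> deferred(List<String> raw) {
        return Flux.fromIterable(raw)
                .flatMap(s -> Mono.fromCallable(() -> Integer.parseInt(s))
                        .onErrorResume(NumberFormatException.class, e -> Mono.empty()))
                .collectList()
                .block();
    }
}
```

For the input `"1", "x", "3"`, `eager` fails as a whole because the error reaches the outer sequence, while `deferred` drops only the bad element and returns the rest.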
