How to Count Items in a Flux, return error if count is greater than X, else continue with Pipeline
Asked Answered
W

2

9

I'm new to Project Reactor in Spring, and I'm not fully sure how to perform something:

I have my pipeline the pipeline returns records. All good.

But I would like to count those records and then do something (like if else), where if records returned are > X then error, otherwise just continue.

Knowing that Count returns a Mono<Long>, then I'll lose the records after that, what could I do?

I'm thinking:

Somehow use flatMap and perform something inside this flatmap. Somehow I see there is a reduce method in Flux that might help.

The point is, I'm not sure how to proceed.

Willwilla answered 21/5, 2020 at 16:5 Comment(2)
There are a few ways to limit the total number of results returned by a Flux. If you do not care about throwing an exception you can use the take method to set a maximum number of elements emitted by the Flux, regardless of how many upstream elements are producedAvailability
maybe subscribe works baeldung.com/java-string-from-monoImpost
L
14

Not entirely sure what you want so going to provide two suggestions based on assumptions

1.. You want to collect all elements then assess if there are more than n, throw error if so. You can use collectList, count the elements and then convert back to flux if under whatever. This will only doStuff on any element if the total is under the limit.

    Flux.range(1,10)
            .collectList()
            .flatMap(s -> 
                s.size()>7 
                    ? Mono.error(new RuntimeException("TOO MANY!")) 
                    : Mono.just(s))
            .flatMapMany(Flux::fromIterable)
            .map(this::doStuff)

2.. You want to assess number of elements on the fly, which you can use an external Atomic counter for. This will doStuff on every element up to the problematic one.

    AtomicLong count = new AtomicLong();

    Flux.range(1,10)
            .flatMap(s -> 
                count.incrementAndGet() > 7 
                    ? Flux.error(new RuntimeException("TOO MANY!")) 
                    : Flux.just(s))
             .map(this::doStuff);
Lampion answered 21/5, 2020 at 16:56 Comment(1)
First approach worked perfectly for the case of use of checking after getting the elements. Thanks.Willwilla
F
-7

I am also new to Spring Reactor and reactive programming, but I tried on my end and this worked for me returning the Long value of flux elements number:

fluxObject.count().block().longValue()

you can also use shortValue() when this is the case.

Foliate answered 21/1, 2022 at 2:4 Comment(5)
You can't block in reactive programming. This is fundamentally wrongEustacia
Blocking on count works perfectly fine here and is an upvoted solution: #47924607 Thank you very much again!Foliate
Blocking on count is a block on Mono<Long> here (returned by the count() function), is not a block on FluxMapFuseable. Can you say otherwise, are you very sure? Thanks again!Foliate
@Foliate literally the entire point of reactive/asynchronous programming is to AVOID blocking. if you call block() for something other than testing purposes, you are most definitely doing something wrong.Crawley
@KrustytheClown, but what about schedulers? There is no initiate request, so I think that block() is a good way to run processing. Or you can suggest another way to do it?Saracen

© 2022 - 2024 — McMap. All rights reserved.