How to control parallelism of Flux.flatMap (Mono)?
The code below executes all web requests (webClient) in parallel, not respecting the limit I put in parallel(5).

        Flux.fromIterable(dataListWithHundredsElements)
            .parallel(5).runOn(Schedulers.boundedElastic())
            .flatMap(element -> 
                webClient.post()
                .bodyValue(element)
                .retrieve()
                .bodyToMono(String.class)
                .doOnError(err -> element.setError(Utils.toString(err)))
                .doOnSuccess(r -> element.setResponse(r))
            )
            .sequential()
            .onErrorContinue((e, v) -> {})
            .doOnComplete(() -> updateInDatabase(dataListWithHundredsElements))
            .subscribe();

I would like to know whether it is possible to execute requests according to the limit specified in parallel(5), and what the best way to do that is.

One detail: this code runs in a Spring MVC application, from which I am making requests to an external service.

UPDATE 01

In fact, Flux does create the 5 threads; however, all requests (the WebClient Monos) are executed at the same time.

What I want is to have 5 requests executed at a time, so when 1 request ends another request is started, but at no time should there be more than 5 requests in parallel.

Since Mono is also a reactive type, it seems that the 5 Flux threads subscribe to it without blocking; in practice, all requests run in parallel.
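To make the behaviour I want concrete, here is a sketch in plain java.util.concurrent (illustrative only, not Reactor; BoundedConcurrencyDemo and fakeRequest are made-up names, and fakeRequest stands in for the WebClient call): a Semaphore with 5 permits lets a new task start only when one of the 5 in flight finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencyDemo {

    private static final AtomicInteger inFlight = new AtomicInteger();
    private static final AtomicInteger maxInFlight = new AtomicInteger();

    // Stand-in for one slow external request; records how many run at once.
    private static void fakeRequest() throws InterruptedException {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        Thread.sleep(50); // pretend the external service is slow
        inFlight.decrementAndGet();
    }

    // Runs `total` tasks, never allowing more than `limit` at the same time:
    // a new task starts only when one in flight releases its permit.
    public static int run(int total, int limit) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch done = new CountDownLatch(total);
        for (int i = 0; i < total; i++) {
            permits.acquire(); // blocks until a slot frees up
            pool.submit(() -> {
                try {
                    fakeRequest();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    permits.release();
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max in flight: " + run(30, 5));
    }
}
```

The maxInFlight counter makes the property checkable: it can never exceed the permit count, which is exactly the guarantee I want from the reactive pipeline.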

UPDATE 02 - External Service Logs

This is the log of the external service, which takes about 5 seconds to respond. As you can see below, 14 requests arrive at the same time.

2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8
2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-7] EXTERNAL SERVICE LOG {"id": 20} http-nio-8080-exec-7
2020-05-08 11:53:56.659  INFO 28223 --- [nio-8080-exec-2] EXTERNAL SERVICE LOG {"id": 27} http-nio-8080-exec-2
2020-05-08 11:53:56.659  INFO 28223 --- [nio-8080-exec-6] EXTERNAL SERVICE LOG {"id": 19} http-nio-8080-exec-6
2020-05-08 11:53:56.659  INFO 28223 --- [io-8080-exec-10] EXTERNAL SERVICE LOG {"id": 23} http-nio-8080-exec-10
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-5] EXTERNAL SERVICE LOG {"id": 18} http-nio-8080-exec-5
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-9] EXTERNAL SERVICE LOG {"id": 17} http-nio-8080-exec-9
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-1] EXTERNAL SERVICE LOG {"id": 29} http-nio-8080-exec-1
2020-05-08 11:53:56.661  INFO 28223 --- [nio-8080-exec-4] EXTERNAL SERVICE LOG {"id": 24} http-nio-8080-exec-4
2020-05-08 11:53:56.666  INFO 28223 --- [io-8080-exec-11] EXTERNAL SERVICE LOG {"id": 25} http-nio-8080-exec-11
2020-05-08 11:53:56.675  INFO 28223 --- [io-8080-exec-13] EXTERNAL SERVICE LOG {"id": 42} http-nio-8080-exec-13
2020-05-08 11:53:56.678  INFO 28223 --- [io-8080-exec-14] EXTERNAL SERVICE LOG {"id": 28} http-nio-8080-exec-14
2020-05-08 11:53:56.680  INFO 28223 --- [io-8080-exec-12] EXTERNAL SERVICE LOG {"id": 26} http-nio-8080-exec-12
2020-05-08 11:53:56.686  INFO 28223 --- [io-8080-exec-15] EXTERNAL SERVICE LOG {"id": 22} http-nio-8080-exec-15

UPDATE 03 - Reactor Logs

To reinforce: the external service takes about 5 seconds to respond, yet all 14 requests are made at almost the same time.

2020-05-08 11:53:56.051  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.053  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.081  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.081  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.082  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.082  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.093  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.093  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.094  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.095  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.110  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@40ddcd53)
2020-05-08 11:53:56.112  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onNext(@200e0819)
2020-05-08 11:53:56.112  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@3b81eee2)
2020-05-08 11:53:56.113  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@60af2a4d)
2020-05-08 11:53:56.115  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@723db553)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@387743b5)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@62ed2f8d)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onNext(@1a40554a)
2020-05-08 11:53:56.442  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@1bcb696a)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@46c98823)
2020-05-08 11:53:56.443  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.446  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.442  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@1c0da4a)
2020-05-08 11:53:56.448  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.452  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@14d54d26)
2020-05-08 11:53:56.453  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.490  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@46e43af)
2020-05-08 11:53:56.492  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@5ca02355)
2020-05-08 11:53:56.496  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onComplete()
Endothelium answered 8/5, 2020 at 10:9 Comment(7)
When you say "not respecting": parallel() divides the work into "rails" spread over the number of cores in round-robin fashion. Show the logs that prove your statement. – Manicure
Hey @Thomas Andolf, I updated the question to explain. The Flux creates the 5 threads, but all WebClient requests occur at the same time. My goal is to limit the WebClient to no more than 5 requests at a time. – Endothelium
Parallel = "side by side", which means that parallel(5) will allocate 5 threads and run all your requests on those 5 threads. So there is nothing wrong with parallel(). Now that we know you want to BATCH your requests in groups of five, it's a completely different problem. – Manicure
Look into the limitRate function. – Manicure
@ThomasAndolf I don't think they want to batch, just to run at most n in parallel at any time. – Loginov
It isn't really clear what Marlon Patrick wants. – Fleischman
Use the overloaded version of flatMap() with the concurrency factor. – Banana

You could use the ParallelFlux#flatMap(Function<? super T,? extends Publisher<? extends R>>, boolean, int) method to control concurrency.

For your situation it could be:

        .flatMap(element -> 
            webClient.post()
            .bodyValue(element)
            .retrieve()
            .bodyToMono(String.class)
            .doOnError(err -> element.setError(Utils.toString(err)))
            .doOnSuccess(r -> element.setResponse(r)),
            false, 1
        )

But, actually, you don't have to create a ParallelFlux. Just use the Flux#flatMap(Function<? super T,? extends Publisher<? extends V>>, int) method:

Flux.fromIterable(dataListWithHundredsElements)
        .flatMap(element -> webClient.post()..., 5)
...

The second argument of the flatMap method is responsible for concurrency.
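As an analogy outside Reactor (the class and method names here are made up for illustration): for blocking calls, a fixed pool of 5 workers gives the same sliding-window behaviour as flatMap(mapper, 5) -- at most 5 elements are in flight, and the next one starts as soon as a slot frees up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FlatMapConcurrencyAnalogy {

    // Analogy only: a fixed pool of 5 behaves like flatMap(mapper, 5) for
    // blocking work -- at most 5 "subscriptions" run at once, and the next
    // element starts as soon as one finishes.
    public static List<String> postAll(List<Integer> elements) throws Exception {
        ExecutorService window = Executors.newFixedThreadPool(5);
        List<Future<String>> futures = new ArrayList<>();
        for (Integer element : elements) {
            futures.add(window.submit(() -> {
                Thread.sleep(10);              // stand-in for the slow web call
                return "response-" + element;  // stand-in for bodyToMono(String)
            }));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            results.add(f.get());              // preserves submission order
        }
        window.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(postAll(List.of(1, 2, 3, 4, 5, 6, 7)));
    }
}
```

The analogy is imperfect for WebClient, which is non-blocking and does not need one thread per request, but the window semantics (at most N in flight, refill on completion) are the same.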

Occam answered 8/5, 2020 at 16:13 Comment(9)
Big +1 for this solution. This is a very common misconception: using ParallelFlux for IO operations to achieve concurrency. ParallelFlux is intended for CPU-intensive tasks; it has no practical use for IO operations. – Peppery
Hey guys, what about Schedulers in this approach without ParallelFlux? Should I use publishOn or subscribeOn and designate a specific scheduler like boundedElastic? – Endothelium
@MarlonPatrick WebClient uses non-blocking Netty threads internally, so you don't have to switch execution to other threads for IO operations. – Occam
@Loginov The parallel scheduler is not the default. By default everything runs on the thread where the stream was assembled (e.g. the main thread or a request thread). However, in this specific case WebClient delegates tasks to its own event loop internally (which is not the parallel scheduler). projectreactor.io/docs/core/release/reference/#schedulers – Peppery
@AlexanderPankin "The default is already the parallel scheduler, so you don't need to do anything unless you have a specific use case" is wrong. The OP is lucky in this case, as WebClient will ultimately switch to its own thread pools and flatMap's inner thread will become free. Otherwise, flatMap does not exhibit any concurrency out of the box. You have to switch schedulers if you want concurrency. – Banana
@PrashantPandey Totally agree with you. That's why my comment is about WebClient and why I provided two examples for different situations. – Occam
@MartinTarjányi Removed my comment since it was incorrect, as pointed out. I was using delay, which by default uses the parallel scheduler. My bad. – Loginov
@Alexander Pankin, as far as the concurrency parameter of flatMap is concerned, it simply determines how many simultaneous subscriptions flatMap makes, right? Essentially batching the requests as asked in the OP. So it shouldn't really affect threading, which is entirely up to the Schedulers. – Zelazny
@Deekshith Anand, you're right, as far as I know. – Occam

My 2 cents on flatMap, its concurrency parameter, and controlling parallelism:

flatMap does not provide any concurrency out of the box, as already stated in the comments on the accepted answer. Meaning, if you do this:

var events = Flux.range(1, 1000)
    .flatMap(i -> Mono.fromCallable(() -> i))
    .log();

events.subscribe();

You will see only one thread in the logs -- the one on which you performed the subscription. If you subscribed on the MAIN thread you will see:

INFO 36311 --- [main] reactor.Flux.FlatMap.1 : onNext(871)
INFO 36311 --- [main] reactor.Flux.FlatMap.1 : onNext(872)
INFO 36311 --- [main] reactor.Flux.FlatMap.1 : onNext(873)
INFO 36311 --- [main] reactor.Flux.FlatMap.1 : onNext(874)
INFO 36311 --- [main] reactor.Flux.FlatMap.1 : onNext(875)

To achieve concurrent execution inside flatMap you need to use Schedulers:

var events = Flux.range(1, 1000)
    .flatMap(i -> Mono.fromCallable(() -> {
            Thread.sleep(3000);
            return i;
        })
        .publishOn(Schedulers.parallel()))
    .log();

events.subscribe();

You can see in the logs that many threads are utilised:

INFO 41068 --- [ parallel-2] reactor.Flux.FlatMap.1 : onNext(1)
INFO 41068 --- [ parallel-9] reactor.Flux.FlatMap.1 : onNext(8)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(11)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(2)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(3)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(4)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(5)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(6)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(7)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(9)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(10)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(12)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(13)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(14)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(15)
INFO 41068 --- [parallel-12] reactor.Flux.FlatMap.1 : onNext(16)
// 3 seconds break
// next batch of 16 events

Every 3 seconds, a batch of 16 events was handled by up to 16 threads (I have 16 cores). In the docs, you can see that flatMap internally invokes the overloaded method:

    public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
        return flatMap(mapper, Queues.SMALL_BUFFER_SIZE, Queues
                .XS_BUFFER_SIZE);
    }

Now, you can tune this behaviour by specifying the 2nd (concurrency) and 3rd (prefetch) arguments of flatMap:

var events = Flux.range(1, 1000)
    .flatMap(i -> Mono.fromCallable(() -> {
        Thread.sleep(3000);
        return i;
    }).publishOn(Schedulers.parallel()), 3, 3)
    .log();

events.subscribe();

From the doc:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel

In plain English: it controls how many inner publishers flatMap subscribes to at once -- and here, consequently, how many tasks run at the same time. As a result, you can observe something like:

// 3 seconds break
INFO 38380 --- [     parallel-5] reactor.Flux.FlatMap.1 : onNext(4)
INFO 38380 --- [     parallel-6] reactor.Flux.FlatMap.1 : onNext(5)
INFO 38380 --- [     parallel-7] reactor.Flux.FlatMap.1 : onNext(6)
// 3 seconds break
INFO 38380 --- [     parallel-8] reactor.Flux.FlatMap.1 : onNext(7)
INFO 38380 --- [     parallel-8] reactor.Flux.FlatMap.1 : onNext(8)
INFO 38380 --- [     parallel-8] reactor.Flux.FlatMap.1 : onNext(9)
// 3 seconds break
INFO 38380 --- [    parallel-11] reactor.Flux.FlatMap.1 : onNext(10)
INFO 38380 --- [    parallel-11] reactor.Flux.FlatMap.1 : onNext(12)
INFO 38380 --- [    parallel-11] reactor.Flux.FlatMap.1 : onNext(11)
// 3 seconds break

At any given time, a maximum of 3 tasks is in flight. Now, when you set both concurrency and prefetch to 1, you get:

INFO 40043 --- [ parallel-2] reactor.Flux.FlatMap.1 : onNext(1)
// 3 seconds break
INFO 40043 --- [ parallel-3] reactor.Flux.FlatMap.1 : onNext(2)
// 3 seconds break
INFO 40043 --- [ parallel-4] reactor.Flux.FlatMap.1 : onNext(3)
// 3 seconds break
INFO 40043 --- [ parallel-5] reactor.Flux.FlatMap.1 : onNext(4)
// 3 seconds break
INFO 40043 --- [ parallel-6] reactor.Flux.FlatMap.1 : onNext(5)
// 3 seconds break
INFO 40043 --- [ parallel-7] reactor.Flux.FlatMap.1 : onNext(6)
// 3 seconds break
INFO 40043 --- [ parallel-8] reactor.Flux.FlatMap.1 : onNext(7)
// 3 seconds break
INFO 40043 --- [ parallel-9] reactor.Flux.FlatMap.1 : onNext(8)
// 3 seconds break
INFO 40043 --- [parallel-10] reactor.Flux.FlatMap.1 : onNext(9)

Only one inner publisher is in flight at a given time.

Enchase answered 19/10, 2023 at 20:6 Comment(0)
