TL;DR
Reactor executes non-blocking/async tasks on a small number of threads. If a task blocks, its thread is blocked and every other task scheduled on that thread has to wait for it.
parallel should be used for fast non-blocking operations (default option)
boundedElastic should be used to "offload" blocking tasks
In general, the Reactor API is concurrency-agnostic: it uses the Scheduler abstraction (instances are obtained from the Schedulers factory class) to execute tasks. A Scheduler has responsibilities very similar to an ExecutorService.
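The starvation problem and the ExecutorService analogy can be illustrated with plain JDK executors. This is only a sketch of the idea, not Reactor code: the class and method names are made up for illustration, the single-thread executor stands in for a small non-blocking pool, and the cached pool stands in for a scheduler dedicated to blocking work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class (JDK only, no Reactor involved).
class BlockingStarvationDemo {

    // Submits one blocking task and then one fast task; returns how long (in ms)
    // the fast task had to wait before it actually ran.
    static long fastTaskDelayMillis(ExecutorService blockingPool, ExecutorService fastPool) {
        long start = System.nanoTime();
        Callable<Void> blocking = () -> { Thread.sleep(500); return null; }; // simulated blocking call
        Callable<Long> fast = () -> (System.nanoTime() - start) / 1_000_000;
        try {
            blockingPool.submit(blocking);
            Future<Long> delay = fastPool.submit(fast);
            return delay.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Both tasks share one thread: the fast task queues behind the blocking one.
    static long sharedThreadDelay() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return fastTaskDelayMillis(single, single);
        } finally {
            single.shutdown();
        }
    }

    // The blocking task is offloaded to its own pool: the fast task is not held up.
    static long offloadedDelay() {
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        ExecutorService fastPool = Executors.newSingleThreadExecutor();
        try {
            return fastTaskDelayMillis(blockingPool, fastPool);
        } finally {
            blockingPool.shutdown();
            fastPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared thread: fast task waited ~" + sharedThreadDelay() + " ms");
        System.out.println("offloaded:     fast task waited ~" + offloadedDelay() + " ms");
    }
}
```

On a shared thread the fast task should wait roughly as long as the blocking call takes. Once the blocking call runs on its own pool the wait should be close to zero, which is the same reason blocking work is moved off the parallel scheduler.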
Schedulers.parallel()
This should be the default option. It is used for fast non-blocking operations on a small number of threads. By default, the number of threads equals the number of CPU cores. It can be controlled by the reactor.schedulers.defaultPoolSize system property.
Schedulers.boundedElastic()
Used to execute longer operations (blocking tasks) as part of the reactive flow. It uses a thread pool whose default size is the number of CPU cores x 10 (controlled by reactor.schedulers.defaultBoundedElasticSize) and a default queue size of 100000 tasks per thread (controlled by reactor.schedulers.defaultBoundedElasticQueueSize).
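To make the default sizing concrete, here is a small JDK-only sketch that resolves the three values the same way they are described above: a system property if set, otherwise a value derived from the CPU count. The class and helper names are hypothetical and the code does not call Reactor. The queue-size property is assumed to be reactor.schedulers.defaultBoundedElasticQueueSize.

```java
// Hypothetical helper that mirrors the documented defaults; it does not use Reactor.
class SchedulerDefaults {

    // Reads an integer system property, falling back to the given default when unset.
    static int intProperty(String key, int fallback) {
        String raw = System.getProperty(key);
        return raw == null ? fallback : Integer.parseInt(raw);
    }

    // parallel(): one thread per CPU core unless overridden.
    static int parallelSize() {
        return intProperty("reactor.schedulers.defaultPoolSize",
                Runtime.getRuntime().availableProcessors());
    }

    // boundedElastic(): thread cap of 10 x CPU cores unless overridden.
    static int boundedElasticSize() {
        return intProperty("reactor.schedulers.defaultBoundedElasticSize",
                10 * Runtime.getRuntime().availableProcessors());
    }

    // boundedElastic(): up to 100000 queued tasks per thread unless overridden.
    static int boundedElasticQueueSize() {
        return intProperty("reactor.schedulers.defaultBoundedElasticQueueSize", 100000);
    }

    public static void main(String[] args) {
        System.out.println("parallel threads:             " + parallelSize());
        System.out.println("boundedElastic thread cap:    " + boundedElasticSize());
        System.out.println("boundedElastic queue/thread:  " + boundedElasticQueueSize());
    }
}
```

As far as I know, Reactor reads these properties once, when the Schedulers class is initialized, so it is safest to pass them at JVM startup (for example -Dreactor.schedulers.defaultPoolSize=4) rather than set them later in code.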
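subscribeOn or publishOn can be used to change the scheduler. subscribeOn determines the thread on which the source is subscribed and starts emitting, wherever it appears in the chain, while publishOn switches the scheduler for the operators that follow it.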
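The following code shows how to wrap a blocking operation:
Mono.fromCallable(() -> {
    // blocking operation; the Callable must return the resulting value
    return blockingCall(); // hypothetical placeholder for your blocking code
}).subscribeOn(Schedulers.boundedElastic()); // run on a separate scheduler because the code is blocking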
Schedulers.newBoundedElastic()
Similar to Schedulers.boundedElastic(), but useful when you need to create a separate thread pool for some operation.
Sometimes it's not obvious which code is blocking. One very useful tool for testing reactive code is BlockHound.