Is there a "default" scheduler in Project Reactor? Which one is it? By "default" I mean the one used when neither subscribeOn() nor publishOn() is called for the chain.
By default, data production starts on the Thread that initiated the subscription. Operators that deal with time (e.g. Mono.delay) default to running on the Schedulers.parallel() scheduler.
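A minimal sketch of the time-operator case, assuming reactor-core is on the classpath. The class and method names (DelayThreadDemo, delayThreadName) are made up for illustration. It captures the name of the thread on which Mono.delay emits its value; with the default scheduler that name is expected to carry the "parallel-" prefix, while the subscribing thread is whatever thread called block().

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of Reactor.
class DelayThreadDemo {

    // Returns the name of the thread on which the delayed value is emitted.
    static String delayThreadName() {
        return Mono.delay(Duration.ofMillis(50))
                // map runs on the thread that emits the tick
                .map(tick -> Thread.currentThread().getName())
                // block() subscribes from the calling thread and waits for the value
                .block();
    }

    public static void main(String[] args) {
        System.out.println("subscribed on:    " + Thread.currentThread().getName());
        System.out.println("delay emitted on: " + delayThreadName());
    }
}
```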
Most reactive libraries (Reactive Redis, Mongo, ...) would use parallel as a default scheduler.
For example, Spring WebFlux typically uses Reactor Netty as the default embedded server and would initiate the subscription on Schedulers.parallel().
Summary
There isn't anything like a "default scheduler" in Project Reactor. Which thread runs your code depends on several factors, so unless you explicitly specify a scheduler, you should not make your code logic dependent on any assumptions about it.
Warning
As many of us use the Project Reactor library together with the Spring WebFlux framework, we tend to mentally mix the two. To answer the question properly, it's good to be aware of where one ends and the other begins.
Detailed answer with sources
(My answer gives more or less the same as @Alex's, but I tried to have it based on some authoritative sources.)
The Reactor documentation says that
Reactor... can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.
Obtaining a Flux or a Mono does not necessarily mean that it runs in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.
So without any meddling, and using just the "basic" operators like map(), flatMap() or filter(), everything up to and including the final subscribe() runs on the calling thread.
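To illustrate the point, here is a small sketch, assuming reactor-core is available. CallingThreadDemo, stageThreads and the thread name passed in are hypothetical. It subscribes from a thread with a known name and records which thread each stage ran on. With a synchronous source such as Flux.just and no subscribeOn()/publishOn(), every recorded name should be the subscribing thread's.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of Reactor.
class CallingThreadDemo {

    // Subscribes on a new thread called threadName and returns the thread
    // names observed inside map() and inside the subscribe() consumer.
    static List<String> stageThreads(String threadName) {
        List<String> seen = new CopyOnWriteArrayList<>();
        Thread caller = new Thread(() ->
                Flux.just(1, 2, 3)
                    .filter(i -> i % 2 == 1)
                    .map(i -> {
                        seen.add(Thread.currentThread().getName());
                        return i * 10;
                    })
                    .subscribe(i -> seen.add(Thread.currentThread().getName())),
                threadName);
        caller.start();
        try {
            caller.join(); // the source is synchronous, so the chain is done once the thread ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(stageThreads("my-caller"));
    }
}
```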
Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel().
So just by calling some operators, the thread the chain runs on changes. This is mentioned in the documentation for each such method, e.g. for Flux.interval(Duration):
Runs on the Schedulers.parallel() Scheduler.
But you may change it if you wish:
The following line changes the Scheduler to a new instance similar to Schedulers.single():
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
It seems like each Mono and Flux method which runs on a different scheduler has an overloaded alternative with an extra Scheduler parameter.
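The difference between the two overloads can be sketched like this, assuming reactor-core is on the classpath. IntervalSchedulerDemo and its methods are illustrative names only. The first method uses the default scheduler of Flux.interval(Duration), the second passes Schedulers.newSingle("test") as in the line quoted above, and both report the thread that delivered the first tick.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of Reactor.
class IntervalSchedulerDemo {

    // Name of the thread that delivers the first tick; blockFirst() cancels afterwards.
    static String firstTickThread(Flux<Long> ticks) {
        return ticks.map(t -> Thread.currentThread().getName()).blockFirst();
    }

    // Default overload: ticks are expected on a "parallel-N" thread.
    static String defaultIntervalThread() {
        return firstTickThread(Flux.interval(Duration.ofMillis(20)));
    }

    // Overload with an explicit Scheduler: ticks are expected on a "test-N" thread.
    static String customIntervalThread() {
        Scheduler test = Schedulers.newSingle("test");
        try {
            return firstTickThread(Flux.interval(Duration.ofMillis(20), test));
        } finally {
            test.dispose(); // release the dedicated thread created by newSingle
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + defaultIntervalThread());
        System.out.println("custom:  " + customIntervalThread());
    }
}
```

Disposing the scheduler in the sketch matters only because newSingle creates a fresh thread; the shared Schedulers.parallel() instance is managed by Reactor.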
The Spring WebFlux documentation says this:
What threads should you expect to see on a server running with Spring WebFlux?
On a “vanilla” Spring WebFlux server (for example, no data access or other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores). Servlet containers, however, may start with more threads (for example, 10 on Tomcat), in support of both servlet (blocking) I/O and servlet 3.1 (non-blocking) I/O usage.
The reactive WebClient operates in event loop style. So you can see a small, fixed number of processing threads related to that (for example, reactor-http-nio- with the Reactor Netty connector). However, if Reactor Netty is used for both client and server, the two share event loop resources by default.
Reactor and RxJava provide thread pool abstractions, called schedulers, to use with the publishOn operator that is used to switch processing to a different thread pool. The schedulers have names that suggest a specific concurrency strategy, for example, “parallel” (for CPU-bound work with a limited number of threads) or “elastic” (for I/O-bound work with a large number of threads). If you see such threads, it means some code is using a specific thread pool Scheduler strategy. Data access libraries and other third party dependencies can also create and use threads of their own.
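As a rough sketch of what publishOn does to the executing thread, assuming reactor-core is available: PublishOnDemo and threadsAroundPublishOn are hypothetical names, and Schedulers.parallel() is picked only because it is one of the schedulers named in the quote. The code records the thread seen by a doOnNext placed before publishOn and by one placed after it.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not part of Reactor.
class PublishOnDemo {

    // Returns { thread seen before publishOn, thread seen after publishOn }.
    static String[] threadsAroundPublishOn() {
        String[] names = new String[2];
        Flux.just(1, 2, 3)
            // upstream of publishOn: still on the subscribing thread
            .doOnNext(v -> names[0] = Thread.currentThread().getName())
            .publishOn(Schedulers.parallel())
            // downstream of publishOn: handed over to a scheduler worker
            .doOnNext(v -> names[1] = Thread.currentThread().getName())
            .blockLast(); // wait for completion so both slots are filled
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadsAroundPublishOn();
        System.out.println("before publishOn: " + names[0]);
        System.out.println("after publishOn:  " + names[1]);
    }
}
```

Thread names like these are what the quoted passage refers to when it says that seeing such threads means some code chose a specific Scheduler.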