The following is an excerpt from the excellent blog post https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers
publishOn
This is the basic operator you need when you want to hop threads. Incoming signals from its source are published on the given Scheduler, effectively switching threads to one of that scheduler’s workers.
This is valid for the onNext, onComplete and onError signals. That is, signals that flow from an upstream source to a downstream subscriber.
So in essence, every processing step that appears below this operator will execute on the new Scheduler, until another operator switches again (e.g. another publishOn).
Flux.fromIterable(firstListOfUrls) // contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) // contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));
Output
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
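To make the "below this operator" rule visible, here is a minimal sketch that records which thread each stage runs on, before and after publishOn. It assumes reactor-core is on the classpath; the class name PublishOnDemo, the run() helper and the log list are illustrative names, not part of the blog post.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class: shows that publishOn only affects the steps below it.
public class PublishOnDemo {

    // Returns one "stage:threadName" entry per element and per stage.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just("A", "B", "C")
            // Above publishOn: runs on the thread that subscribes (the caller here).
            .doOnNext(v -> log.add("before:" + Thread.currentThread().getName()))
            .publishOn(Schedulers.boundedElastic())
            // Below publishOn: runs on a boundedElastic worker thread.
            .doOnNext(v -> log.add("after:" + Thread.currentThread().getName()))
            // blockLast() subscribes and waits for completion,
            // so the demo does not exit before the worker is done.
            .blockLast();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

When run from main, the "before" entries should carry the calling thread's name, while the "after" entries carry a boundedElastic worker name. The source itself is not moved by publishOn.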
subscribeOn
This operator changes where the subscribe method is executed. And since the subscribe signal flows upward, it directly influences where the source Flux subscribes and starts generating data.
As a consequence, it can seem to act on the parts of the reactive chain of operators upward and downward (as long as there is no publishOn thrown in the mix):
final Flux<String> fetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .map(url -> blockingWebClient.get(url));
}

// sample code:
fetchUrls(List.of(A, B, C))
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

fetchUrls(List.of(D, E))
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));
Output
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
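The "as long as there is no publishOn thrown in the mix" caveat can be sketched in the same way. The example below is only an illustration under assumptions: reactor-core is on the classpath, and the names SubscribeOnDemo, run() and log are made up for the demo. It places subscribeOn after the source steps and then adds a publishOn further down.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class: subscribeOn moves the source, publishOn takes over below it.
public class SubscribeOnDemo {

    // Returns one "stage:threadName" entry per element and per stage.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just("A", "B", "C")
            // The source emits on the subscribeOn scheduler, even though
            // subscribeOn appears further down the chain.
            .doOnNext(v -> log.add("source:" + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic())
            // From here on, publishOn decides where signals are delivered.
            .publishOn(Schedulers.parallel())
            .doOnNext(v -> log.add("downstream:" + Thread.currentThread().getName()))
            // Subscribe and wait for completion on the calling thread.
            .blockLast();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "source" entries should show a boundedElastic worker and the "downstream" entries a parallel worker: subscribeOn determines where the source starts emitting, and publishOn overrides the thread for everything below it.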