.then for chaining terminal-only sources
Use .then to chain your execution with a process that only sends a terminal signal.
Also note that if you need to react to an error signal, you have to put onErrorResume before your .then.
import reactor.core.publisher.Mono;

public class AsyncService {
    private AsyncValidationTask validation;
    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o));
    }
}
.defer to postpone Mono creation
To execute this.save.execute(o) only after successful validation, you have to wrap it in Mono.defer as well:
public class AsyncService {
    private AsyncValidationTask validation;
    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   // save.execute(o) is now invoked only after validation completes
                   .then(Mono.defer(() -> this.save.execute(o)));
    }
}
Usually this is not necessary, because Mono is a LAZY type which SHOULD start doing work only once a subscription has happened (subscription == .subscribe()).
The implementation of Mono#then guarantees that the subscription to the Mono returned by this.save.execute starts RIGHT AFTER Mono.defer(() -> this.validation.execute(o)) has completed.
Execution may start earlier for only two reasons: it is done ON PURPOSE (e.g., business logic that deliberately provides such behavior, such as caching or a hot source), OR this.save.execute(o) is implemented INCORRECTLY and starts doing work regardless of actual subscription.
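The ordering matters because Java evaluates method arguments before the call itself: in .then(this.save.execute(o)) the execute method runs while the chain is being assembled, so an implementation that starts work inside that method starts it before validation. The following is a minimal JDK-only sketch of that ordering, not Reactor code. The names thenEager, thenDeferred and eagerSave are hypothetical, and a Supplier stands in for Mono.defer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class EagerVsDeferred {
    // Records the order in which the steps actually run.
    static final List<String> log = new ArrayList<>();

    // Hypothetical save task that (incorrectly) starts its work when called.
    static String eagerSave() {
        log.add("save started");
        return "saved";
    }

    // Stand-in for a chaining operator that receives an already created next step.
    static String thenEager(Runnable validation, String next) {
        validation.run();
        return next;
    }

    // Stand-in for a chaining operator that creates the next step on demand.
    static String thenDeferred(Runnable validation, Supplier<String> next) {
        validation.run();
        return next.get();
    }

    // The argument eagerSave() is evaluated before thenEager is entered.
    static List<String> runEager() {
        log.clear();
        thenEager(() -> log.add("validated"), eagerSave());
        return new ArrayList<>(log);
    }

    // The supplier is invoked only after validation has run.
    static List<String> runDeferred() {
        log.clear();
        thenDeferred(() -> log.add("validated"), EagerVsDeferred::eagerSave);
        return new ArrayList<>(log);
    }
}
```

Here runEager() records "save started" before "validated", while runDeferred() records them in the opposite order, which is the effect the extra Mono.defer has on an eagerly implemented save task.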
Design your implementation properly
In general, it is good practice to ensure that an API which does work and exposes that work as a Publisher (e.g. Mono | Flux) is lazy.
This means that the API creator MUST ensure that the work is executed only once the user has subscribed to the given Publisher instance.
For example, if your async API creates a CompletableFuture underneath, it is worth manually wrapping the CompletableFuture creation in Mono.defer, or using the appropriate factory method, e.g. Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier).
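To make the CompletableFuture case concrete, here is a small JDK-only sketch under stated assumptions: save is a hypothetical async API whose work starts as soon as the future is created, and deferredSave wraps that creation in a Supplier of the same shape as the futureSupplier parameter mentioned above. The counter exists only to make the start of the work observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferredFutureSketch {
    // Counts how many times the underlying work has been started.
    static final AtomicInteger started = new AtomicInteger();

    // Hypothetical async API: creating the future already starts the work.
    static CompletableFuture<String> save(String value) {
        started.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "saved " + value);
    }

    // Deferred variant: nothing runs until the supplier is invoked.
    static Supplier<CompletableFuture<String>> deferredSave(String value) {
        return () -> save(value);
    }
}
```

Calling deferredSave("x") on its own leaves the counter at zero; the work starts only when get() is invoked on the returned supplier. A deferring operator is expected to make that call at subscription time rather than at assembly time.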
Executors example
Let's consider how to make a regular ThreadPool task submission Reactive.
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

interface Executor {
    <T> Future<T> execute(Callable<T> callable);
}
So, in order to make Executor reactive, we have to create something like the following:
interface ReactiveExecutor {
    <T> Mono<T> execute(Callable<T> callable);
}
Incorrect Implementation
The following is a possible implementation of such an adapter which works:
import reactor.core.publisher.MonoProcessor;

class ReactiveExecutorAdapter {
    final Executor delegate;
    ...
    <T> Mono<T> execute(Callable<T> callable) {
        MonoProcessor<T> result = MonoProcessor.create();
        // the task is submitted as soon as this method is called
        Future<T> task = delegate.execute(() -> {
            T value = callable.call();
            result.onNext(value);
            result.onComplete();
            return value;
        });
        return result.doOnCancel(() -> task.cancel(true));
    }
}
Such an implementation will certainly work. However, it has a few critical issues:
- Execution starts on method invocation (which contradicts the lazy behavior expected of a reactive streams Publisher).
- Since execution starts before the actual subscription, we have to create a stateful Mono which supports later subscription.
- This implementation does not handle the case when there are no Subscribers at all (e.g., execution has started, but .subscribe is never called, so we get a value leak which is impossible to handle).
- It is too hacky in general to be a solution. Also, to prevent all of the previously mentioned cases, it is necessary to wrap every call to Mono execute(..) with Mono.defer outside of the implementation (see the original problem in the question). As a result, an API user can easily shoot themselves in the foot by forgetting to wrap the execution with an extra .defer.
So, how to solve it, then?
Basically, it is enough to move the Mono.defer into the library internals. This makes life much easier for API users, since they no longer have to think about when deferring is necessary (hence fewer potential issues).
For example, the simplest solution for our Reactive Executor can be the following:
class ReactiveExecutorAdapter {
    final Executor delegate;
    ...
    <T> Mono<T> execute(Callable<T> callable) {
        // nothing inside defer runs until someone subscribes
        return Mono.defer(() -> {
            MonoProcessor<T> result = MonoProcessor.create();
            Future<T> task = delegate.execute(() -> {
                T value = callable.call();
                result.onNext(value);
                result.onComplete();
                return value;
            });
            return result.doOnCancel(() -> task.cancel(true));
        });
    }
}
By just deferring the execution, we solve at least one problem for sure: the value is no longer leaked.
But how to solve it correctly?
However, in order to solve all the possible problems in this particular case, we may use Mono.create, which is properly designed for adapting async APIs:
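class ReactiveExecutorAdapter {
    final Executor delegate;
    ...
    <T> Mono<T> execute(Callable<T> callable) {
        return Mono.create(monoSink -> {
            Future<T> task = delegate.execute(() -> {
                try {
                    T value = callable.call();
                    monoSink.success(value); // emits the value and completes
                    return value;
                } catch (Exception e) {
                    monoSink.error(e); // propagate the failure to the subscriber
                    throw e;
                }
            });
            // cancel the underlying task when the subscriber cancels
            monoSink.onCancel(() -> task.cancel(true));
        });
    }
}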
Using Mono.create, we have a guarantee of lazy execution for every subscriber.
Also, using the MonoSink API, we can quickly hook into all the essential signals from the subscriber.
Finally, Mono.create ensures that, whatever happens, the value will be discarded appropriately.
With such an API, it is not necessary to use defer in every case.
onErrorStop instead of onErrorResume and chain the second call without Mono.defer? – Eunuchize