Java reactor - chain Mono<Void> with another async task producing Mono<Object>
Asked Answered
E

2

7

I have the below async tasks:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

And below Service class:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o))
    }
}

As shown above, I tried to use flatMap to chain the AsyncSaveTask.execute call if the AsyncValidationTask.execute completes successfully, it won't work because nothing is emitted (Mono.empty) upon completion.

I also consider then to chain the second call, but it will always invoke the chained call regardless of the Mono.error produced by the first validation call.

How can I chain them properly?

Eunuchize answered 24/3, 2020 at 17:42 Comment(0)
B
17

.then for terminal only sources to chain

Use .then, in order to chain your execution with the process, which only sends a terminal signal.

Also, pay attention, if you need to do something on the error signal, then you have to accompany your .then with onErrorResume beforehand.

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

.defer in order to postpone mono creation

In order to execute the this.save.execute(o) only in case of successful validation, you have to wrap it in Mono.defer as well:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

Usually, it is not necessary, because Mono is a LAZY type which SHOULD start doing work only in case subscription happened (subscription == .subscribe()).

The implementation of Mono#then guarantees that subscription to Mono returned by the this.save.execute the method starts RIGHT AFTER the Mono.defer(() -> this.validation.execute(o)) completed.

The only reason why execution may start earlier may be a PURPOSE (e.g., business logic which provides such behavior on purpose - caching; hot source, etc.) OR an INCORRECT implementation of the this.save.execute(o) which starts doing work regardless of actual subscription.

Design your Implementation properly

In general, it is a good practice to ensure that API which does work and expose that work as a Publisher (e.g. Mono | Flux) is Lazy.

It means that the API creator MUST ensure that the work execution happens only in case the user has subscribed to the given Publisher instance.

For example, if your async API does CompletableFuture creation underneath, it worth to manually wrap your CompletableFuture creation into Mono.defer or to use proper method extension, e.g Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

Executors example

Let's consider how to make a regular ThreadPool task submission Reactive.

interface Executor  {
  Future<T> execute(Callable<T> runnable); 
}

So, in order to make Executor reactive, we have to create something like the following:

interface ReactiveExecutor {
   Mono<T> execute(Callable<T> runnable);
}

Incorrect Implementation

The following is a possible implementation of such an adapter which works:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      MonoProcessor<T> result = MonoProcessor.create();
      Future<T> task = delegate.execute(() -> {
          T value = runnable.call();
          result.onNext(value);
          result.onComplet();
          return value;
      });

      return result.doOnCancel(() -> task.cancel());
   }
}

Definitely, such an implementation will be working. However, it has a few critical issues:

  1. Execution starts on the method invocation (which somewhat contradicts to lazy behavior of reactive streams Publisher)
  2. Since execution starts before the actual task subscription, we have to create a stateful Mono, which supports later subscription.
  3. This implementation does not handle the case when there are no Subscribers at all (e.g., execution has started, but no .subscribe method happened (then we got value leak which impossible to handle)
  4. It is too hacky in general to be a solution. Also, in order to prevent all the previously mentioned cases, it is necessary to wrap every call on Mono execute(..) with Mono.defer outside of the implementation (see the original problem in the question). Subsequently, it leads to the fact that an API user can easily 'shoot your self in the foot' forgetting to wrap execution with an extra .defer

So, how to solve it, then?

Basically, it is enough to move the Mono.defer into the library internals. It will make the life of the API users much easier since they don't have to think when it is necessary to use deferring (hence - less possible issues).

For example, the simplest solution for our Reactive Executor can be the following:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.defer(() -> {
          MonoProcessor<T> result = MonoProcessor.create();
          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              result.onNext(value);
              result.onComplet();
              return value;
          });

          return result.doOnCancel(() -> task.cancel());
     })
   }
}

By just deferring the execution, we can solve at least one problem for sure - ensure value is not leaked anymore.

But, How to solve it correctly?

However, in order to solve all the possible problems in that particular case, we may use Mono.create which is properly designed for adopting async API:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.crete(monoSink -> {

          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              monoSink.complete(value);
          });

          monoSink.doOnCancel(task::cancel);
     })
   }
}

using Mono.create we have a guarantee of lazy execution on every subscriber. Also, using MonoSink API, we can quickly hook on all the essential signals from the subscriber. Finally, Mono.create ensures that in case of anything, the value will be discarded appropriately.

Finally, having such an API it is not necessary to use defer for all the cases

Broody answered 24/3, 2020 at 17:53 Comment(4)
Can I just do onErrorStop instead of onErrorResume and chain the second call without Mono.defer?Eunuchize
It does not work like that. onErrorStop is a way to stop onErrorContinue operator. It is a specification violation if a terminal signal is swallowed in the middle of the chain. just added a couple of notes on how to design API which exposes reactive streamsBroody
I will accept this as the answer and use this as reference when I read more reactor documents. Thanks!Eunuchize
Added an example of proper implementation on top of async taskBroody
L
0

Not sure if I understood the question correctly, but.. Looking at the signatures of methods in OP's question, they don't really look like "async tasks" for an ExecutorService, they look like mere Mono-producing methods, something that routinely comes across in 'reactive' world. then the question is just in combining them with something like

Mono<Object> validateAndSave(Object o) {
    return validation.execute(o)
               .then(save.execute(o));

.then will ignore elements emitted by the source (i.e.validation.execute), but will not ignore error signal. so in case of onError your method will return Mono.error(), otherwise it will return what save.execute returned.

Linnlinnaeus answered 25/3, 2020 at 7:25 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.