Execution time reactive programming
Asked Answered
S

2

9

Is this an ideal way to find execution time of method (getFavouriteDetails()), in reactive programming ?

public List<Favourites> getFavouriteDetails(String userId){
    userService.getFavorites(userId) 
               .flatMap(favoriteService::getDetails) 
               .switchIfEmpty(suggestionService.getSuggestions()) 
               .take(5) 
               .publishOn(UiUtils.uiThreadScheduler()) 
               .subscribe(uiList::show, UiUtils::errorPopup)
               .flatMap(a -> Mono.subscriberContext().map(ctx -> {
                         log.info("Time taken : " + Duration.between(ctx.get(key), Instant.now()).toMillis() + " milliseconds.");
                         return a;
                     }))
               .subscriberContext(ctx -> ctx.put(key, Instant.now()))
}
Subjunctive answered 5/7, 2018 at 21:4 Comment(6)
What RxJava version are you using. That won't compile under RxJava 1.Brooch
I have written code using reactorSubjunctive
Can you use timeInterval: reactivex.io/documentation/operators/timeinterval.html?Chalone
"execution time of method" Which method?Quadrennium
of getFavouriteDetailsSubjunctive
your code cannot compile as it is using flatMap after a subscribe...Wisniewski
I
8

Two approaches to ensure that you only measure execution time when you subscribe -

  • Wrap a Mono around the Flux using flatMapMany. This returns a Flux as well.
  • Use an AtomicReference, set time in onSubscribe and log elapsed time in doFinally.

Sample code -

timeFluxV1(getFavouriteDetails(userId)).subscribe(uiList::show, UiUtils::errorPopup);

timeFluxV1(getFavouriteDetails(userId)).subscribe(uiList::show, UiUtils::errorPopup);
 
private <T>  Flux<T> timeFluxV1(Flux<T> flux) {
    return Mono.fromSupplier(System::nanoTime)
             .flatMapMany(time -> flux.doFinally(sig -> log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - time) + " milliseconds.")));
}


private <T>  Flux<T> timeFluxV2(Flux<T> flux) {
    AtomicReference<Long> startTime = new AtomicReference<>();
    return flux.doOnSubscribe(x -> startTime.set(System.nanoTime()))
            .doFinally(x -> log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()) + " milliseconds."));
}

public Flux<Favourites> getFavouriteDetails(String userId) {
    return userService.getFavorites(userId)
               .flatMap(favoriteService::getDetails) 
               .switchIfEmpty(suggestionService.getSuggestions()) 
               .take(5) 
               .publishOn(UiUtils.uiThreadScheduler());
}
Inspirational answered 25/6, 2020 at 10:51 Comment(0)
D
6

To time a method, the most basic way in Java is to use long System.nanoTime(). Instant and System.currentTimeMillis are for wall-clock operations and are not guaranteed to be monotonous nor precise enough...

In Reactor, to measure the time a sequence takes to complete, you would usually need to start the timing on subscription (nothing happens until you subscribe) and stop the timing within a doFinally (which execute some code on the side of the main sequence whenever it completes, errors or is cancelled).

Here however you are subscribing yourself, so there is no risk to be multiple subscriptions. You can thus do away with the "start timing on subscription" constraint.

It gives us something like this:

public List<Favourites> getFavouriteDetails(String userId){
    final long start = System.nanoTime();
    userService.getFavorites(userId) 
               .flatMap(favoriteService::getDetails) 
               .switchIfEmpty(suggestionService.getSuggestions()) 
               .take(5) 
               .publishOn(UiUtils.uiThreadScheduler())
               .doFinally(endType -> log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds."))
               .subscribe(uiList::show, UiUtils::errorPopup);
    //return needed!
}

Note that there is also a elapsed() operator, which measures the time between subscription and 1st onNext, then between subsequent onNexts. It outputs a Flux<Tuple2<Long, T>>, and you could aggregate the longs to get overall timing, but that would lose you the "realtime" nature of Ts in that case.

Dworman answered 9/7, 2018 at 7:56 Comment(2)
FlatMap will be invoked 5 times, as you take(5)... This is pretty much equivalent to doing elapsed() with a bit more overhead. For your use case you're fishing with a grenade launcher :DWisniewski
The approach is bad, as nanoTime() of start will be executed on "design-time" of the flux, while toMillis will be executed on "run-time" of the flux. It only works in this case, because subscribe is right where it is standing here.Durr

© 2022 - 2024 — McMap. All rights reserved.