How to use Micrometer Timer to record duration of async method (returns Mono or Flux)
Asked Answered
I

7

14

I'd like to use Micrometer to record the execution time of an async method when it eventually happens. Is there a recommended way to do this?

Example: Kafka Replying Template. I want to record the time it takes to actually execute the sendAndReceive call (sends a message on a request topic and receives a response on a reply topic).

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

Something like

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))

won't work here; it just records the time that it takes to create the Supplier, not the actual execution time.

Illusionary answered 16/3, 2018 at 0:35 Comment(1)
How does it work with the recordCallable() ?Murchison
B
3

You could use reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}
Bless answered 23/8, 2018 at 11:40 Comment(0)
R
5

You can just metrics() from Mono/Flux() (have a look at metrics() here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html) then you can do something like

public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        })
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr)).name("my-metricsname").metrics()

And e.g. in graphite you will see latency for this call measured (You can see more here: How to use Micrometer timer together with webflux endpoints)

Remuneration answered 10/5, 2019 at 11:4 Comment(0)
B
3

You could use reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}
Bless answered 23/8, 2018 at 11:40 Comment(0)
S
1

You could do something like the following:

// Mono<Something> mono = ...
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use clock of registry
return mono.doOnNext(x -> sample.stop(timer));

See here for Sample documentation: http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code

For a nicer approach you could also have a look at resilience4j they decorate the mono via transform: https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor

Scheldt answered 23/6, 2018 at 18:28 Comment(0)
P
0

I used the following:

private <T> Publisher<T> time(String metricName, Flux<T> publisher) {
  return Flux.defer(() -> {

  long before = System.currentTimeMillis();
  return publisher.doOnNext(next -> Metrics.timer(metricName)
        .record(System.currentTimeMillis() - before, TimeUnit.MILLISECONDS));
  });
}

So to use it in practice:

Flux.just(someValue)
  .flatMap(val -> time("myMetricName", aTaskThatNeedsTimed(val))
  .subscribe(val -> {})
Patrol answered 24/10, 2018 at 10:41 Comment(0)
G
0

you can make use of metrics() ,method that calculates the time interval b/w subscribe() and onComplete(). you can do like,

 .metrics().elapsed().doOnNext(tuple -> log.info("get response time: " + tuple.getT1() + "ms")).map(Tuple2::getT2);
Glycoprotein answered 21/5, 2020 at 7:8 Comment(0)
D
0

If you consider use metrics(), please do understand it won't create a new Meter even if you invoke Mono.name().

Dependning on your situtaion, you have three choice.

  1. Using metrics()
    • Well, If you consider use metrics(), please do understand it won't create a new Meter even if you invoke Mono.name().
  2. Record the time in doOnNext and do your time calculation.
  3. Use subscriptionContext as imposed by Alexander Pankin

Personally, I'd like to use approach 3.

Diffuse answered 4/6, 2020 at 5:49 Comment(0)
I
-1

It looks like recordCallable as suggested by Brian Clozel is the answer. I wrote a quick test to verify this:

import io.micrometer.core.instrument.Timer;
import reactor.core.publisher.Mono;

public class Capitalizer {

    private final Timer timer;

    public Capitalizer(Timer timer) {
        this.timer = timer;
    }

    public Mono<String> capitalize(Mono<String> val) {
        return val.flatMap(v -> {
            try {
                return timer.recordCallable(() -> toUpperCase(v));
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }).filter(r -> r != null);
    }

    private Mono<String> toUpperCase(String val) throws InterruptedException {
        Thread.sleep(1000);
        return Mono.just(val.toUpperCase());
    }
}

and to test this:

import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;

public class CapitalizerTest {

    private static final Logger logger =
        LoggerFactory.getLogger(CapitalizerTest.class);

    private Capitalizer capitalizer;
    private Timer timer;

    @Before
    public void setUp() {
        timer = new SimpleMeterRegistry().timer("test");
        capitalizer = new Capitalizer(timer);
    }

    @Test
    public void testCapitalize() {
        String val = "Foo";
        Mono<String> inputMono = Mono.just(val);
        Mono<String> mono = capitalizer.capitalize(inputMono);
        mono.subscribe(v -> logger.info("Capitalized {} to {}", val, v));
        assertEquals(1, timer.count());
        logger.info("Timer executed in {} ms",
            timer.totalTime(TimeUnit.MILLISECONDS));
        assertTrue(timer.totalTime(TimeUnit.MILLISECONDS) > 1000);
    }
}

The timer reports that the execution time is roughly 1004ms with the 1000ms delay, and 4ms without it.

Illusionary answered 16/3, 2018 at 23:35 Comment(3)
why does recordCallable throw a checked but generic Exception?Affinitive
The test only 'works' because Thread.sleep(1000); blocks. The timer does still not record the duration of the mono that get's returned.Scheldt
timer.recordCallable only counts the time spent in exeuction toUpperCase, not the execution time of Mono.Diffuse

© 2022 - 2024 — McMap. All rights reserved.