Spring Reactor: adding a delay in a NON-blocking way

Small question on how to add a delay in a method, but in a non-blocking way, please.

A very popular way to simulate long processes is to use Thread.sleep(). However, in Project Reactor this is a blocking operation, and it is well known that in a reactive project we should not block.

I would like to experiment and simulate long processes: some sort of method which takes a lot of time, but in a NON-blocking way, WITHOUT swapping threads. This is to simulate a method that is just very lengthy, but proven NON-blocking by BlockHound etc.

This construct is very popular:

@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
            .map(a -> simulateLengthyProcessingOperation(a))
            .subscribe(System.out::println);
}

public String simulateLengthyProcessingOperation(Integer input) {
    simulateDelayBLOCKING();
    return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}

public void simulateDelayBLOCKING() {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

But it is blocking. (I know there is the Mono.fromCallable(() -> approach, but that is not the question.)

Is it possible to do the same, i.e. simulate a delay, but in a NON-blocking way, please? Also, .delay will not achieve the expected result (simulating a NON-blocking lengthy method on the same reactive pipeline).

@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
            .map(a -> simulateLengthyProcessingOperation(a))
            .subscribe(System.out::println);
}

public String simulateLengthyProcessingOperation(Integer input) {
    simulateDelay_NON_blocking();
    return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}

public void simulateDelay_NON_blocking() {
    // simulate lengthy process, but WITHOUT blocking
}

Thank you

Kipper answered 30/1, 2023 at 13:19 Comment(2)
Why, what is it you want to test? Shouldn't you be using the test utilities for reactive programming (provided by Project Reactor) for this? Something like projectreactor.io/docs/core/release/reference/… - Bludgeon
What do you mean by "lengthy processing operation"? If you want to simulate a CPU-intensive task, then Thread.sleep is the way to go, because the two have roughly the same consequence: they monopolize a thread, leaving no chance for another task to run on it. Replacing it with a non-blocking delay would not properly simulate a task doing heavy computation. - Tracay

Of course you can: there is a family of .delay...() methods.

For example, you can read about the delayElements() method here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-

Be aware that it switches execution to another Scheduler: signals are delayed and continue on the default parallel Scheduler.

In the simplest case it would look like this:

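public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
            .delayElements(Duration.ofMillis(1000L)) // delay each element by 1000 millis
            // subscribe() returns immediately; in a test, wait for completion (e.g. blockLast() or StepVerifier)
            .subscribe(System.out::println);
}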

For your case, you could write the code like this:

@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1, 5000)
            // concatMap subscribes to one inner Mono at a time, so order is preserved
            .concatMap(this::simulateDelay_NON_blocking)
            .subscribe(System.out::println);
}

public Mono<String> simulateDelay_NON_blocking(Integer input) {
    // simulate lengthy process, but WITHOUT blocking
    return Mono.delay(Duration.ofMillis(1000L))
            // this map runs on the thread that fires the delay (a parallel Scheduler worker)
            .map(__ -> String.format("[%d] on thread [%s] at time [%s]",
                    input, Thread.currentThread().getName(), new Date()));
}
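
The thread switch mentioned above is not specific to Reactor: a delay that does not park the caller has to be completed later by a timer or scheduler thread, which is usually not the thread that made the call. As a rough, Reactor-free sketch of that mechanism, the following uses the JDK's CompletableFuture.delayedExecutor (Java 9+) in place of Mono.delay. The class name NonBlockingDelaySketch and the helper simulateDelayNonBlocking are made up for illustration and are not part of the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class NonBlockingDelaySketch {

    // Hypothetical helper: returns at once. The String is produced after delayMillis
    // on a thread supplied by the JDK's default async executor, not on the caller.
    public static CompletableFuture<String> simulateDelayNonBlocking(int input, long delayMillis) {
        Executor afterDelay = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(
                () -> String.format("[%d] on thread [%s]", input, Thread.currentThread().getName()),
                afterDelay);
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        CompletableFuture<String> pending = simulateDelayNonBlocking(1, 500);
        // The caller reaches this line right away and is free to do other work.
        System.out.println("caller [" + caller + "] continues, done yet? " + pending.isDone());
        // join() blocks; it is used here only so the JVM does not exit before the result arrives.
        System.out.println(pending.join());
    }
}
```

Running main should print the caller line first and the delayed line about half a second later, carrying a thread name other than the caller's. join() is there only to keep the JVM alive, much as blockLast() would in a Reactor test.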
Radiosurgery answered 30/1, 2023 at 13:27 Comment(0)
