Increasing delays in Java Flux

I have code that uses Flux to execute a piece of logic in a loop, with a delay between executions. Something like this:

Flux.defer(() -> service.doSomething())
            .repeatWhen(v -> Flux.interval(Duration.ofSeconds(10)))
            .map(data -> mapper(data)) //map data
            .takeUntil(v -> shouldContinue(v)) //checks if the loop can be terminated
            .onErrorStop();

Now I want incremental delays. For the first 5 minutes, the delay between executions should be 10 seconds. For the next 10 minutes, it should be 30 seconds. After that, it should be one minute.

How can I achieve this using Flux?

Thanks in advance.

Unjust answered 3/11, 2023 at 18:38 Comment(3)
"Meaning, for the first 5 minutes, the delay can be 10 seconds between each execution." If I understand correctly, there are multiple executions in the first 5 minutes, each followed by a 10-second delay. Is that right? - Japhetic
Do you want the 5 minutes to count the time spent in the flux, or only the delay time? - Bagpipe
Are 5 mins and 10 mins the completion times for the flux? - Bagpipe

While Project Reactor is a great tool, not every task needs to be solved with it. In my opinion, plain Java is sometimes simpler and a better fit, even when Project Reactor is the main tool in your stack. Since I don't know your real use case, I will share three options.

Option 1: Less Reactor, more pure Java.

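import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ScheduledTask {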
    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();

    {
        scheduler.schedule(this::task); // if you don't want delay on start 
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    private void task() {
        System.out.println("do something"); // you can inject your service to the class and execute needed method here

        if (shouldContinue()) {
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }
}
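
If you would rather not depend on Reactor's Scheduler at all, the same self-rescheduling idea can be sketched with the JDK's ScheduledExecutorService. This is only a minimal sketch under assumptions: the class name JdkScheduledTask, its constructor parameters, and the awaitCompletion helper are made up for illustration and are not part of the original answer. Each run schedules the next one only after it has finished, so runs cannot overlap.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

// Hypothetical JDK-only variant of Option 1; all names here are illustrative.
final class JdkScheduledTask {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Instant startTime = Instant.now();
    private final Runnable work;
    private final BooleanSupplier shouldContinue;
    private final Function<Duration, Duration> delayFor; // elapsed time -> next delay

    public JdkScheduledTask(Runnable work, BooleanSupplier shouldContinue,
                            Function<Duration, Duration> delayFor) {
        this.work = work;
        this.shouldContinue = shouldContinue;
        this.delayFor = delayFor;
    }

    // Runs the first execution without an initial delay.
    public void start() {
        executor.execute(this::runOnce);
    }

    private void runOnce() {
        boolean reschedule = false;
        try {
            work.run();
            reschedule = shouldContinue.getAsBoolean();
        } finally {
            if (reschedule) {
                Duration delay = delayFor.apply(Duration.between(startTime, Instant.now()));
                // The next run is scheduled only after this one has finished.
                executor.schedule(this::runOnce, delay.toMillis(), TimeUnit.MILLISECONDS);
            } else {
                executor.shutdown(); // also reached when work throws
            }
        }
    }

    // Blocks until the loop has stopped or the timeout elapses.
    public boolean awaitCompletion(Duration timeout) {
        try {
            return executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Same tiers as calculateDelaySec(): 10 s, then 30 s, then 60 s.
    public static Duration tieredDelay(Duration elapsed) {
        long minutes = elapsed.toMinutes();
        if (minutes < 5) return Duration.ofSeconds(10);
        if (minutes < 15) return Duration.ofSeconds(30);
        return Duration.ofSeconds(60);
    }
}
```

It could be wired up as new JdkScheduledTask(() -> service.doSomething(), () -> true, JdkScheduledTask::tieredDelay).start(), where service stands in for your own bean.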

Option 2: Reactor-way

This approach is close to @VonC's recommendation; I've modified the generate call to better fit the requirements.

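import java.time.Duration;
import java.time.Instant;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ScheduledTask2 {

    private record FluxArgs(Instant startTime, boolean calculateDelay) {}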

    private static Flux<String> task() {
        return Flux.<Duration, FluxArgs>generate(
                // use start time as initial value and boolean marker to skip start delay
                () -> new FluxArgs(Instant.now(), false),
                (args, sink) -> {
                    if (args.calculateDelay) {
                        sink.next(Duration.ofSeconds(calculateDelaySec(args.startTime)));
                    } else {
                        sink.next(Duration.ZERO);
                    }
                    // pass start time as argument for next iteration and calculateDelay:true to calculate correct delay
                    return new FluxArgs(args.startTime, true);
                }
            )
            .delayUntil(Mono::delay)
            .concatMap(ignore -> doSomething()) // concatMap is used to make sure that doSomething is not executed in parallel
            .takeUntil(ScheduledTask2::shouldStop);
    }

    private static long calculateDelaySec(Instant startTime) {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }

    private static Mono<String> doSomething() {
        return Mono.just("");
    }

    private static boolean shouldStop(String val) {
        return true; // returning true ends the loop after this element; put your custom logic here
    }
}

Option 3: Mix.

You can use this option if you need the results of your task executions as a Flux so that you can propagate them further. To me, this option looks less complex than Option 2 while still exposing the results as a Flux stream.

Depending on your needs, you can use a different Sinks flavor here instead of replay(), such as multicast() or unicast().

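import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ScheduledTask3 {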

    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();
    private final Sinks.Many<String> sink = Sinks.many().replay().all();

    {
        scheduler.schedule(this::task); // if you don't want delay on start 
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    public Flux<String> taskResults() {
        return sink.asFlux();
    }

    private void task() {
        var result = doSomething();
        sink.tryEmitNext(result);

        if (shouldContinue()) {
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private String doSomething() {
        return "";
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }
}
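
All three options repeat the same calculateDelaySec logic and read the clock inside it, which makes the tiers awkward to change or unit-test. One possible way to factor it out is a small table-driven helper that takes the elapsed time as an argument. The DelaySchedule class and its method names below are hypothetical, a sketch rather than part of any library.

```java
import java.time.Duration;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: maps "elapsed time since start" to "delay before the next run".
final class DelaySchedule {
    // Key: elapsed time at which a tier begins. Value: delay used within that tier.
    private final NavigableMap<Duration, Duration> tiers = new TreeMap<>();

    public DelaySchedule(Duration initialDelay) {
        tiers.put(Duration.ZERO, initialDelay);
    }

    // Adds a tier that applies once the given amount of time has elapsed.
    public DelaySchedule from(Duration elapsedThreshold, Duration delay) {
        tiers.put(elapsedThreshold, delay);
        return this;
    }

    // Returns the delay of the latest tier whose threshold is <= elapsed.
    public Duration delayAfter(Duration elapsed) {
        Duration key = elapsed.isNegative() ? Duration.ZERO : elapsed;
        return tiers.floorEntry(key).getValue();
    }
}
```

The tiers from the question would then read new DelaySchedule(Duration.ofSeconds(10)).from(Duration.ofMinutes(5), Duration.ofSeconds(30)).from(Duration.ofMinutes(15), Duration.ofSeconds(60)), and each option could call delayAfter(Duration.between(startTime, Instant.now())) in place of calculateDelaySec().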
Dolor answered 6/11, 2023 at 14:37 Comment(2)
Thanks for this solution. As with @VonC's recommendation, I have a few follow-up thoughts. What happens if the execution time of doSomething() is longer than the delay? This might trigger doSomething() multiple times in parallel. Also, if I want to start with a delay of 0 seconds and then slowly increase the delay, how can I achieve that? - Unjust
@PraveenKumar For "Option 1" and "Option 3", doSomething() will not be triggered in parallel. To remove the initial delay on start, remove the delay from the schedule call in the initializer block. For "Option 2", doSomething() can run in parallel; to fix this, replace flatMap with concatMap. To remove the initial delay there, add an additional marker to the state passed to generate. I've updated all options in my original post so they now match these requirements. Please check and let me know if you have questions or need help. - Dolor
