While Project Reactor is a great tool, not every task needs to be solved with it. In my opinion, plain Java can sometimes be a simpler and better fit, even if Project Reactor is the main tool in your app stack. Since I don't know your actual use case, I will share several options.
Option 1: Less Reactor, more pure Java.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ScheduledTask {
    // single-threaded scheduler, so task runs never overlap
    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();

    {
        scheduler.schedule(this::task); // if you don't want delay on start
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    private void task() {
        System.out.println("do something"); // you can inject your service to the class and execute needed method here
        if (shouldContinue()) {
            // re-schedule the next run with a delay that depends on the time elapsed since start
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;
        return 60;
    }
}
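If you want to go one step further and drop the Reactor `Scheduler` as well, the same self-rescheduling idea can be sketched with the JDK's `ScheduledExecutorService`. This is only a minimal sketch, not a drop-in replacement: the class name `JdkScheduledTask`, the `maxRuns` stop condition (standing in for `shouldContinue()`) and the `delayMillis` supplier (standing in for `calculateDelaySec()`) are names I made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class JdkScheduledTask {
    // single-threaded executor, so task runs never overlap
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger runs = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);
    private final int maxRuns;             // hypothetical stop condition (use a value >= 1)
    private final LongSupplier delayMillis; // hypothetical delay source, evaluated before each re-schedule

    public JdkScheduledTask(int maxRuns, LongSupplier delayMillis) {
        this.maxRuns = maxRuns;
        this.delayMillis = delayMillis;
    }

    public void start() {
        executor.execute(this::task); // first run without delay
    }

    private void task() {
        int count = runs.incrementAndGet(); // "do something" goes here
        if (count < maxRuns) {
            // re-schedule the next run with a freshly computed delay
            executor.schedule(this::task, delayMillis.getAsLong(), TimeUnit.MILLISECONDS);
        } else {
            executor.shutdown();
            done.countDown();
        }
    }

    // Blocks until the last run has finished or the timeout expires, then returns the run count.
    public int awaitRuns(long timeout, TimeUnit unit) {
        try {
            done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        JdkScheduledTask task = new JdkScheduledTask(3, () -> 20L);
        task.start();
        System.out.println("runs: " + task.awaitRuns(5, TimeUnit.SECONDS)); // prints "runs: 3"
    }
}
```

In a real application the supplier would compute the delay from the time elapsed since start, exactly as `calculateDelaySec()` does above, and the stop condition would be your own logic.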
Option 2: The Reactor way.
This approach is close to @VonC's recommendation; I've modified the generate method to better fit the requirements.
import java.time.Duration;
import java.time.Instant;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ScheduledTask2 {
    // state carried between generate() iterations
    private record FluxArgs(Instant startTime, boolean calculateDelay) {}

    private static Flux<String> task() {
        return Flux.<Duration, FluxArgs>generate(
                // use start time as initial value and boolean marker to skip start delay
                () -> new FluxArgs(Instant.now(), false),
                (args, sink) -> {
                    if (args.calculateDelay()) {
                        sink.next(Duration.ofSeconds(calculateDelaySec(args.startTime())));
                    } else {
                        sink.next(Duration.ZERO);
                    }
                    // pass start time as argument for next iteration and calculateDelay:true to calculate correct delay
                    return new FluxArgs(args.startTime(), true);
                }
            )
            .delayUntil(Mono::delay) // wait for the emitted duration before passing the element on
            .concatMap(ignore -> doSomething()) // concatMap is used to make sure that doSomething is not executed in parallel
            .takeUntil(ScheduledTask2::shouldStop);
    }

    private static long calculateDelaySec(Instant startTime) {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;
        return 60;
    }

    private static Mono<String> doSomething() {
        return Mono.just("");
    }

    private static boolean shouldStop(String val) {
        return true; // placeholder: true completes the Flux after the first result; put your custom logic here
    }
}
Option 3: Mix.
You can use this option if you need the results of your task executions as a Flux to propagate them further. To me, this option looks less complex than option 2 while still letting you consume the results as a Flux stream.
Please remember that depending on your needs, you can use different Sinks APIs like multicast() or unicast() here.
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ScheduledTask3 {
    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();
    // replay().all() keeps every result, so late subscribers also receive earlier ones
    private final Sinks.Many<String> sink = Sinks.many().replay().all();

    {
        scheduler.schedule(this::task); // if you don't want delay on start
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    public Flux<String> taskResults() {
        return sink.asFlux();
    }

    private void task() {
        var result = doSomething();
        sink.tryEmitNext(result); // the returned EmitResult is ignored here
        if (shouldContinue()) {
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private String doSomething() {
        return "";
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;
        return 60;
    }
}
Meaning, for the first 5 minutes, the delay can be 10 seconds between each execution. If I understand correctly, there are multiple executions in the first 5 minutes, and they are all delayed by 10s. Is that right? – Japhetic
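One way to check that reading without waiting twenty minutes is to simulate the schedule. The sketch below reuses the thresholds from `calculateDelaySec` but takes the elapsed time as a parameter. It assumes each run takes no time at all, and the names `DelayScheduleDemo`, `delaySecFor` and `startOffsets` are made up for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class DelayScheduleDemo {
    // Same thresholds as calculateDelaySec, with the elapsed time passed in explicitly.
    static long delaySecFor(Duration elapsed) {
        long minutes = elapsed.toMinutes();
        if (minutes < 5) return 10;
        if (minutes < 15) return 30;
        return 60;
    }

    // Start offsets (in seconds since start) of every run that begins before the horizon,
    // assuming the task itself completes instantly.
    static List<Long> startOffsets(Duration horizon) {
        List<Long> offsets = new ArrayList<>();
        long t = 0;
        while (t < horizon.getSeconds()) {
            offsets.add(t);
            t += delaySecFor(Duration.ofSeconds(t));
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Long> offsets = startOffsets(Duration.ofMinutes(20));
        long firstFive = offsets.stream().filter(s -> s < 300).count();
        long fiveToFifteen = offsets.stream().filter(s -> s >= 300 && s < 900).count();
        long afterFifteen = offsets.stream().filter(s -> s >= 900).count();
        // prints "30 runs in minutes 0-5, 20 in minutes 5-15, 5 in minutes 15-20"
        System.out.println(firstFive + " runs in minutes 0-5, "
                + fiveToFifteen + " in minutes 5-15, "
                + afterFifteen + " in minutes 15-20");
    }
}
```

Under these assumptions there are indeed many runs in the first 5 minutes, each 10 seconds apart; after that the spacing grows to 30 and then 60 seconds. With a real task, the time each run takes is added on top of the delay.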