I'm looking for a way to use scheduled tasks in a reactive API. I know that scheduling uses a thread pool, so it's not a great fit for WebFlux components.
Is there a reactive equivalent that does the job?
You may try to use Schedulers.immediate() inside the @Scheduled method:
    doWork()
        .subscribeOn(Schedulers.immediate())
        .subscribe()
As a consequence, tasks run on the thread that submitted them.
There are several ways to do this. Depending on how you want to schedule the task, you can also use the following:
    @Configuration
    class ApplicationConfiguration {
        @PostConstruct
        fun init() {
            Flux.interval(Duration.ofMinutes(12))
                .onBackpressureDrop()
                .flatMap { /* some task that returns Mono<T> */ }
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe()
        }
    }
Please note that subscribeOn(Schedulers.boundedElastic()) is not required unless the call is blocking. Also, I am using onBackpressureDrop, but your requirements may differ.
WebFlux has its own scheduler (Reactor's Scheduler interface), and I guess this method should do it:
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
@Scheduled is not a good choice for reactive tasks if the reactive chain switches threads. In that case the scheduling thread is released immediately, and if the task takes longer than the interval between scheduled executions, the executions overlap.
Imagine your @Scheduled method runs every minute and its reactive chain makes some calls to an external API with WebClient. Execution then switches to a thread from the WebClient pool, and your scheduled-n thread is released immediately. If the API calls take more than one minute, the @Scheduled method is launched again after one minute. So you get overlapping executions, which may lead to problems (depending on your business logic).
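To put numbers on the overlap, here is a minimal sketch in plain Java (no Spring or Reactor involved). It only does the arithmetic: every tick starts a new run without waiting for the previous one, and we count how many runs are in flight at once. The class and method names are made up for illustration.

```java
class ScheduledOverlapSketch {

    /**
     * Highest number of runs in flight at once when every tick starts a new run
     * without waiting for the previous one (a simplified model of the overlap).
     */
    static int maxConcurrentRuns(int tickCount, long periodSeconds, long taskSeconds) {
        int max = 0;
        for (int tick = 0; tick < tickCount; tick++) {
            long now = tick * periodSeconds;
            int running = 0;
            // Count every run started at or before this tick that has not finished yet.
            for (int earlier = 0; earlier <= tick; earlier++) {
                long finishesAt = earlier * periodSeconds + taskSeconds;
                if (finishesAt > now) {
                    running++;
                }
            }
            max = Math.max(max, running);
        }
        return max;
    }

    public static void main(String[] args) {
        // 40 s task on a 60 s schedule: never more than one run at a time.
        System.out.println(maxConcurrentRuns(5, 60, 40));  // prints 1
        // 150 s task on a 60 s schedule: up to three runs overlap.
        System.out.println(maxConcurrentRuns(5, 60, 150)); // prints 3
    }
}
```

So with a one-minute schedule and a task that takes two and a half minutes, you would have up to three executions running at the same time.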
So, to fix that, you could use the following construct instead of the @Scheduled annotation:
    @Bean
    public Disposable someTaskScheduler() {
        return Flux.interval(Duration.ofMinutes(1)) // run every minute
                .publishOn(Schedulers.boundedElastic())
                .onBackpressureDrop() // if the task below takes a long time, greater than the next tick, then just drop this tick
                .concatMap(__ -> Mono.defer(() -> {
                    // process your tasks below
                    log.info("Trying to process the task...");
                    return Mono.just("completed");
                }), 0)
                .subscribe();
    }
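To see what "drop this tick" means in practice, here is a small plain-Java sketch of the policy itself. It is a simplified model of the behaviour described in the comment above, not of Reactor's internals: a tick starts a run only if the previous run has already finished, otherwise the tick is skipped. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class DropTickSketch {

    /**
     * Returns the indices of the ticks that actually start a run when ticks
     * arriving while a run is still in progress are dropped.
     */
    static List<Integer> ticksThatRun(int tickCount, long periodSeconds, long taskSeconds) {
        List<Integer> started = new ArrayList<>();
        long busyUntil = Long.MIN_VALUE; // nothing is running before the first tick
        for (int tick = 0; tick < tickCount; tick++) {
            long now = tick * periodSeconds;
            if (now >= busyUntil) {
                started.add(tick);
                busyUntil = now + taskSeconds;
            }
            // otherwise the previous run is still busy and this tick is dropped
        }
        return started;
    }

    public static void main(String[] args) {
        // 40 s task on a 60 s schedule: every tick runs.
        System.out.println(ticksThatRun(5, 60, 40));  // prints [0, 1, 2, 3, 4]
        // 150 s task on a 60 s schedule: ticks 1, 2 and 4 are dropped.
        System.out.println(ticksThatRun(5, 60, 150)); // prints [0, 3]
    }
}
```

The point is that runs never overlap: a slow task just makes you skip ticks until the next one that arrives after it has finished.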
If your case is more complicated, for example, you want to use a cron expression, then you need a workaround. You need to parse the cron expression and find two things: the initial delay before the first execution and the interval between executions.
Flux.interval() has the overload Flux<Long> interval(Duration delay, Duration period), which has the following description:
Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete. Runs on the Schedulers.parallel() Scheduler.
So, you would do it like this:
    @Bean
    public Disposable someTaskScheduler() {
        String cron = "0 0 * * * *"; // execute a task every hour (can be moved to properties)
        Pair<Duration, Duration> durations = computeDurations(cron, LocalDateTime.now());
        return Flux.interval(durations.getFirst(), durations.getSecond()) // specify init delay and interval between executions
                .publishOn(Schedulers.boundedElastic())
                .onBackpressureDrop() // if the task below takes a long time, greater than the next tick, then just drop this tick
                .concatMap(__ -> Mono.defer(() -> {
                    // process your tasks below
                    log.info("Trying to process the task...");
                    return Mono.just("completed");
                }), 0)
                .subscribe();
    }

    /**
     * Compute init delay and interval between executions from cron expression and current time
     *
     * @param cron cron expression to parse
     * @param currentTime current time
     * @return pair where left is an initial delay and right is an interval between executions
     */
    public Pair<Duration, Duration> computeDurations(String cron, LocalDateTime currentTime) {
        CronExpression cronExpression = CronExpression.parse(cron);
        LocalDateTime initExecutionTime = cronExpression.next(currentTime);
        LocalDateTime nextExecutionTime = cronExpression.next(initExecutionTime);
        log.info("Init execution time: {}", initExecutionTime);
        log.info("Next execution time: {}", nextExecutionTime);
        Duration initDelay = Duration.between(currentTime, initExecutionTime);
        Duration interval = Duration.between(initExecutionTime, nextExecutionTime);
        return Pair.of(initDelay, interval);
    }
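As a worked example of the two durations, here is a plain java.time sketch for the hourly case only ("0 0 * * * *"). It does not parse cron at all; it hard-codes "next full hour" as an assumption so you can check the arithmetic by hand. The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

class HourlyDurationsSketch {

    /** Delay from currentTime to the next full hour strictly after it. */
    static Duration initialDelay(LocalDateTime currentTime) {
        LocalDateTime nextFullHour = currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        return Duration.between(currentTime, nextFullHour);
    }

    /** Gap between the first and the second full hour after currentTime. */
    static Duration interval(LocalDateTime currentTime) {
        LocalDateTime first = currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        LocalDateTime second = first.plusHours(1);
        return Duration.between(first, second);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2024, 1, 1, 10, 17, 30);
        System.out.println(initialDelay(now)); // prints PT42M30S (first run at 11:00:00)
        System.out.println(interval(now));     // prints PT1H
    }
}
```

Keep in mind that a single fixed interval only matches cron expressions whose firings are evenly spaced, like the hourly one here. For something like "0 0 9,18 * * *" the gaps differ, so one (delay, period) pair would drift away from the cron schedule.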