@Scheduled and Spring webflux

I'm looking for a way to run scheduled tasks in a reactive API. I know that @Scheduled uses a thread pool, so it's not very compatible with the WebFlux components.

Is there a reactive equivalent that does the job?

Pantelegraph answered 8/1, 2019 at 13:42 Comment(0)

You can try Schedulers.immediate() inside the @Scheduled method:

doWork()
  .subscribeOn(Schedulers.immediate())
  .subscribe()

As a consequence, tasks run on the thread that submitted them.

Flavine answered 10/8, 2021 at 12:15 Comment(0)

There are several ways to do this. Depending on how you want to schedule the task, you can also use the following:

@Configuration
class ApplicationConfiguration() {

    @PostConstruct
    fun init() {

        Flux.interval(Duration.ofMinutes(12))
            .onBackpressureDrop()
            .flatMap { /* some task that return Mono<T> */ }
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe()
    }
}

Please note that subscribeOn(Schedulers.boundedElastic()) is not required unless the call is blocking. I am also using onBackpressureDrop(), but your requirements may differ.

Vindicable answered 10/8, 2021 at 22:39 Comment(0)

Reactor, which WebFlux is built on, has its own Scheduler interface, and I guess this method should do it:

Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
Mattson answered 20/4, 2020 at 8:57 Comment(0)

@Scheduled is not a good choice for reactive tasks if the reactive chain switches threads. In that case the scheduler thread is released immediately, and if the task takes longer than the interval between scheduled executions, the executions overlap.

Imagine your @Scheduled method runs every minute and contains a reactive chain that, for example, calls an external API with WebClient. Execution then switches to a thread from the WebClient pool, and your scheduled-n thread is released immediately. If the API calls take more than one minute, the @Scheduled method is launched again after one minute anyway. The result is overlapping executions, which may cause problems depending on your business logic.
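The overlap is easy to reproduce outside Spring. Below is a minimal plain-JDK sketch, with no Reactor or @Scheduled involved; the class name OverlapDemo and the method maxOverlap are made up for illustration. A timer thread fires at a fixed rate and hands each run to another thread pool, which is roughly what happens when the reactive chain switches threads. Since the timer thread never waits for the work, runs pile up:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OverlapDemo {

    // Fires `ticks` runs at a fixed rate, hands each one to a worker pool,
    // and returns the highest number of runs that were in flight at once.
    static int maxOverlap(long periodMs, long taskMs, int ticks) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newCachedThreadPool();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger started = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        timer.scheduleAtFixedRate(() -> {
            if (started.incrementAndGet() > ticks) {
                return; // enough runs started, ignore further ticks
            }
            // Hand-off: the timer thread returns immediately, like a
            // @Scheduled method that only subscribes to a reactive chain.
            workers.submit(() -> {
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(taskMs); // stands in for a slow remote call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    done.countDown();
                }
            });
        }, 0, periodMs, TimeUnit.MILLISECONDS);

        try {
            done.await(); // wait until every started run has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdownNow();
        workers.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        // Period 50 ms, each run takes 300 ms: runs overlap.
        System.out.println("max concurrent runs: " + maxOverlap(50, 300, 5));
    }
}
```

With a 50 ms period and a 300 ms task, several runs are in flight at the same time; the exact number depends on timing.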

To fix that, you could use the following construct instead of the @Scheduled annotation:

@Bean
public Disposable someTaskScheduler() {
    return Flux.interval(Duration.ofMinutes(1)) // run every minute
            .publishOn(Schedulers.boundedElastic())
            .onBackpressureDrop() // if the task below takes a long time, greater than the next tick, then just drop this tick
            .concatMap(__ -> Mono.defer(() -> {
                // process your tasks below
                log.info("Trying to process the task...");
                return Mono.just("completed");
            }), 0)
            .subscribe();
}

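If your case is more complicated, for example you want to use a cron expression, you need a workaround. Note that it only works for cron expressions that fire at evenly spaced times (such as every hour), because Flux.interval() uses a single fixed period. Parse the cron expression and compute two things: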

  1. The duration between current time and first execution
  2. The duration between each execution

Flux.interval() has an overload that takes both values:

Flux<Long> interval(Duration delay, Duration period)

Its Javadoc description reads:

Create a Flux that emits long values starting with 0 and incrementing at specified time intervals, after an initial delay, on the global timer. If demand is not produced in time, an onError will be signalled with an overflow IllegalStateException detailing the tick that couldn't be emitted. In normal conditions, the Flux will never complete. Runs on the Schedulers.parallel() Scheduler.

So, you would do it like this:

@Bean
public Disposable someTaskScheduler() {
    String cron = "0 0 * * * *"; // execute a task every hour (can be moved to properties)
    Pair<Duration, Duration> durations = computeDurations(cron, LocalDateTime.now());

    return Flux.interval(durations.getFirst(), durations.getSecond()) // specify init delay and interval between executions
            .publishOn(Schedulers.boundedElastic())
            .onBackpressureDrop() // if the task below takes a long time, greater than the next tick, then just drop this tick
            .concatMap(__ -> Mono.defer(() -> {
                // process your tasks below
                log.info("Trying to process the task...");
                return Mono.just("completed");
            }), 0)
            .subscribe();
}

/**
 * Compute init delay and interval between executions from cron expression and current time
 *
 * @param cron cron expression to parse
 * @param currentTime current time
 * @return pair where the first element is the initial delay and the second is the interval between executions
 */
public Pair<Duration, Duration> computeDurations(String cron, LocalDateTime currentTime) {
    CronExpression cronExpression = CronExpression.parse(cron);

    LocalDateTime initExecutionTime = cronExpression.next(currentTime);
    LocalDateTime nextExecutionTime = cronExpression.next(initExecutionTime);

    log.info("Init execution time: {}", initExecutionTime);
    log.info("Next execution time: {}", nextExecutionTime);

    Duration initDelay = Duration.between(currentTime, initExecutionTime);
    Duration interval = Duration.between(initExecutionTime, nextExecutionTime);

    return Pair.of(initDelay, interval);
}
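
As a sanity check of the arithmetic, here is the hourly case worked out with java.time only, assuming the cron "0 0 * * * *" from above. HourlyDelays and its methods are hypothetical names, not part of Spring or Reactor; real code should keep using CronExpression.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class HourlyDelays {

    // Delay from `now` to the next full hour strictly after `now`
    // (at exactly 09:00 the next run is 10:00, not 09:00).
    static Duration initialDelay(LocalDateTime now) {
        LocalDateTime nextHour = now.truncatedTo(ChronoUnit.HOURS).plusHours(1);
        return Duration.between(now, nextHour);
    }

    // For an hourly schedule, consecutive runs are always one hour apart.
    static Duration period() {
        return Duration.ofHours(1);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2023, 12, 22, 9, 5);
        System.out.println(initialDelay(now)); // PT55M
        System.out.println(period());          // PT1H
    }
}
```

These are the two values that would be passed to Flux.interval(delay, period) for an hourly schedule.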
Dazzle answered 22/12, 2023 at 9:5 Comment(0)
