How to stop a Runnable scheduled for repeated execution after a certain number of executions

Situation

I have a Runnable. I have a class that schedules this Runnable for execution using a ScheduledExecutorService with scheduleWithFixedDelay.

Goal

I want to alter this class to schedule the Runnable for fixed delay execution either indefinitely, or until it has been run a certain number of times, depending on some parameter that is passed in to the constructor.

If possible, I would like to use the same Runnable, as it is conceptually the same thing that should be "run".

Possible approaches

Approach #1

Have two Runnables, one that cancels the schedule after a number of executions (which it keeps a count of) and one that doesn't:

public class MyClass{
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public enum Mode{
        INDEFINITE, FIXED_NO_OF_TIMES
    }

    public MyClass(Mode mode){
        if(mode == Mode.INDEFINITE){
            scheduler.scheduleWithFixedDelay(new DoSomethingTask(), 0, 100, TimeUnit.MILLISECONDS);
        }else if(mode == Mode.FIXED_NO_OF_TIMES){
            scheduler.scheduleWithFixedDelay(new DoSomethingNTimesTask(), 0, 100, TimeUnit.MILLISECONDS);
        }
    }

    private class DoSomethingTask implements Runnable{
        @Override
        public void run(){
            doSomething();
        }
    }

    private class DoSomethingNTimesTask implements Runnable{
        private int count = 0;

        @Override
        public void run(){
            doSomething();
            count++;
            if(count > 42){
                // Cancel the scheduling.
                // Can you do this inside the run method, presumably using
                // the Future returned by the schedule method? Is it a good idea?
            }
        }
    }

    private void doSomething(){
        // do something
    }
}

I would rather just have one Runnable for the execution of the doSomething method. Tying the scheduling to the Runnable feels wrong. What do you think about this?

Approach #2

Have a single Runnable for the execution of the code that we want to run periodically. Have a separate scheduled runnable that checks how many times the first Runnable has run and cancels when it gets to a certain amount. This may not be accurate, as it would be asynchronous. It feels a bit cumbersome. What do you think about this?
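A minimal sketch of Approach #2 may make the trade-off concrete. It assumes a two-thread pool so that the worker and the watcher can run independently; the names WatcherSketch and runWithWatcher, and the 30-second wait, are made up for illustration. Because the watcher only polls the counter, the worker can run more than maxRuns times before it is cancelled, which is the inaccuracy mentioned above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating Approach #2; none of these names come from the question.
class WatcherSketch {

    /**
     * Schedules the task with a fixed delay, plus a second "watcher" task that cancels it
     * once at least maxRuns executions have been counted. Returns the final run count.
     */
    static int runWithWatcher(Runnable task, int maxRuns, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicInteger runs = new AtomicInteger();

        // The worker only counts its executions; it knows nothing about the limit.
        ScheduledFuture<?> work = scheduler.scheduleWithFixedDelay(() -> {
            task.run();
            runs.incrementAndGet();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        // The watcher polls the counter and cancels the worker when the limit is reached.
        scheduler.scheduleWithFixedDelay(() -> {
            if (runs.get() >= maxRuns) {
                work.cancel(false);   // let an execution that is already in progress finish
                scheduler.shutdown(); // by default this also stops the periodic watcher
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            // Arbitrary upper bound chosen for this sketch.
            scheduler.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }
}
```

Calling WatcherSketch.runWithWatcher(task, 3, 100) returns a count of at least 3, not necessarily exactly 3.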

Approach #3

Extend ScheduledExecutorService and add a method "scheduleWithFixedDelayNTimes". Perhaps such a class already exists? Currently, I'm using Executors.newSingleThreadScheduledExecutor(); to get my ScheduledExecutorService instance. I would presumably have to implement similar functionality to instantiate the extended ScheduledExecutorService. This could be tricky. What do you think about this?
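One way Approach #3 could look, as a sketch only: ScheduledExecutorService is an interface, so this subclasses the concrete ScheduledThreadPoolExecutor instead. The class name CountingScheduler is hypothetical, and the method name is taken from the suggestion above. The task's own future is handed over through a CompletableFuture so that the wrapper can cancel itself even if the last run finishes before scheduleWithFixedDelay has returned.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass; not an existing JDK or library class.
class CountingScheduler extends ScheduledThreadPoolExecutor {

    CountingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    /** Runs the task with a fixed delay and cancels it after it has run the given number of times. */
    ScheduledFuture<?> scheduleWithFixedDelayNTimes(Runnable task, int times,
                                                    long initialDelay, long delay, TimeUnit unit) {
        if (times <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        AtomicInteger remaining = new AtomicInteger(times);
        CompletableFuture<ScheduledFuture<?>> handle = new CompletableFuture<>();

        ScheduledFuture<?> future = scheduleWithFixedDelay(() -> {
            task.run();
            if (remaining.decrementAndGet() == 0) {
                // join() blocks until the scheduling thread has published the future below.
                handle.join().cancel(false);
            }
        }, initialDelay, delay, unit);

        handle.complete(future);
        return future;
    }
}
```

new CountingScheduler(1) behaves much like Executors.newSingleThreadScheduledExecutor(), except that the latter returns an executor that cannot be reconfigured.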

No scheduler approach [Edit]

Alternatively, I could avoid using a scheduler altogether and instead have something like:

for(int i = 0; i < numTimesToRun; i++){
    doSomething();
    Thread.sleep(delay);
}

And run that in some thread. What do you think of that? You could potentially still use the runnable and call the run method directly.
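For completeness, here is a runnable sketch of that loop on its own thread. The names LoopSketch and runNTimes are hypothetical. Thread.sleep throws the checked InterruptedException, so the sketch treats an interrupt as a request to stop early.

```java
// Hypothetical helper; the names are not from the question.
class LoopSketch {

    /** Starts a thread that runs the task numTimesToRun times, sleeping delayMillis after each run. */
    static Thread runNTimes(Runnable task, int numTimesToRun, long delayMillis) {
        Thread worker = new Thread(() -> {
            for (int i = 0; i < numTimesToRun; i++) {
                task.run();
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the flag and stop looping: an interrupt acts as "cancel".
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
        return worker;
    }
}
```

The returned Thread can be joined to wait for completion, or interrupted to stop the loop before all runs have happened.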


Any suggestions welcome. I'm looking for a debate to find the "best practice" way of achieving my goal.

Sitzmark answered 1/9, 2011 at 10:52 Comment(0)
Y
73

You can use the cancel() method on Future. From the javadocs of scheduleAtFixedRate:

Otherwise, the task will only terminate via cancellation or termination of the executor

Here is some example code that wraps a Runnable in another that tracks the number of times the original was run, and cancels after running N times.

public void runNTimes(Runnable task, int maxRunCount, long period, TimeUnit unit, ScheduledExecutorService executor) {
    new FixedExecutionRunnable(task, maxRunCount).runNTimes(executor, period, unit);
}

class FixedExecutionRunnable implements Runnable {
    private final AtomicInteger runCount = new AtomicInteger();
    private final Runnable delegate;
    private volatile ScheduledFuture<?> self;
    private final int maxRunCount;

    public FixedExecutionRunnable(Runnable delegate, int maxRunCount) {
        this.delegate = delegate;
        this.maxRunCount = maxRunCount;
    }

    @Override
    public void run() {
        delegate.run();
        if(runCount.incrementAndGet() == maxRunCount) {
            boolean interrupted = false;
            try {
                while(self == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                self.cancel(false);
            } finally {
                if(interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void runNTimes(ScheduledExecutorService executor, long period, TimeUnit unit) {
        self = executor.scheduleAtFixedRate(this, 0, period, unit);
    }
}
Yeti answered 4/9, 2011 at 14:24 Comment(13)
This is pretty much what @JB Nizet is suggesting. I know how to cancel a scheduled Runnable. What I want to know is the most appropriate way of cancelling it in this situation. Your solution ties the scheduling to the Runnable itself, which I'm not that convinced about. - Sitzmark
Added a convenience method to run a task n times, to show that the scheduling is not tied to the Runnable doing the work. - Yeti
Oh awesome. I misunderstood what you were saying initially. I understand what you mean. So you are wrapping the Runnable that you want to run in another Runnable which is responsible for running N times. That sounds like a good design to me. It separates out the scheduling from the actual Runnable and allows re-use of the FixedExecutionRunnable class. What do other people think of this approach? - Sitzmark
I have been thinking about this solution... What do you think about removing the runNTimes method? This would leave the actual scheduling external to the Runnable. That would allow for scheduling at fixed rates / with fixed delays without having to add a bunch of methods. All the Runnable would need would be a max count and the delegate Runnable. What do you think of this? - Sitzmark
Which runNTimes method did you want to remove? - Yeti
Oh, sorry. The one inside the Runnable. So in the outer class, you would run something like executor.scheduleWithFixedDelay(new FixedExecutionRunnable(delegate, maxCount), 0, 100, TimeUnit.MILLISECONDS). - Sitzmark
Ah, you can't do that because then the Runnable wouldn't have access to the Future... And you couldn't pass the future in to the constructor because it would not have been scheduled at the point of instantiation. - Sitzmark
You could do as others have suggested and throw an exception in the run method, but I would rather avoid this if possible. Is there any way to get at the Future from within the Runnable, without doing the scheduling from within the Runnable? - Sitzmark
You need access to the Future somewhere. I would just expose the first runNTimes as a static helper method somewhere, and make FixedExecutionRunnable private within the class that contains runNTimes. - Yeti
Does runCount need to be an AtomicInteger in this case? - Bobbee
As far as I can see, this code can throw an NPE at self.cancel(false). Is there any way to ensure that run() will only run after self has been assigned? - Finedraw
@Finedraw It should be fixed now, though it isn't foolproof. - Yeti
I ran this and the program does not terminate after, say, 1 run (fixedRate = 1); it just keeps on running. Any idea why? I am calling runNTimes as myClass.runNTimes(runnableTask, 1, delayedStartTime, TimeUnit.MILLISECONDS, executor); where runnableTask is a Runnable, 1 is the number of times to run, delayedStartTime is a long (say 1 minute) and executor is ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); all the rest of the code is the same. - Serotonin
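A possible explanation for the last comment, offered as a guess from the description only: cancelling the task does not shut the executor down, and the threads created by the default thread factory are non-daemon threads, so the JVM stays alive until shutdown() is called. The sketch below uses hypothetical names (RunThenShutdown, runNTimesThenShutdown) and shuts down an executor it owns once the last run has finished.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant; it owns its executor so that it is safe to shut it down.
class RunThenShutdown {

    /** Runs the task maxRunCount times at a fixed rate, then shuts the executor down. */
    static ScheduledExecutorService runNTimesThenShutdown(Runnable task, int maxRunCount,
                                                          long period, TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runCount = new AtomicInteger();
        executor.scheduleAtFixedRate(() -> {
            task.run();
            if (runCount.incrementAndGet() >= maxRunCount) {
                // With the default policy, shutdown() stops further periodic executions
                // and lets the worker thread exit, so the JVM is free to terminate.
                executor.shutdown();
            }
        }, 0, period, unit);
        return executor;
    }
}
```

If the executor is shared with other tasks, calling shutdown() from the code that created the executor, after the future has been cancelled, is probably the better fit.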
F
10

Quoted from the API description (ScheduledExecutorService.scheduleWithFixedDelay):

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.

So, the easiest thing would be to "just throw an exception" (even though this is considered bad practice):

static class MyTask implements Runnable {

    private int runs = 0;

    @Override
    public void run() {
        System.out.println(runs);
        if (++runs >= 20)
            throw new RuntimeException();
    }
}

public static void main(String[] args) {
    ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
    s.scheduleWithFixedDelay(new MyTask(), 0, 100, TimeUnit.MILLISECONDS);
}
Fiduciary answered 1/9, 2011 at 11:21 Comment(5)
That would mean throwing an "exception" in non-exceptional circumstances... I don't really like the semantics of that... - Sitzmark
It's similar to throwing an InterruptedException; it seems like the most obvious and clean solution to me. - Letha
I think it's a terrible idea to throw an exception simply to cancel execution under non-exceptional circumstances, especially since there are proper ways (like in sbridges' answer) to stop the execution. - Whomever
In addition to the above comments, I would add that ++ is not atomic, so you are not ensuring the correct number of iterations. Use AtomicInteger.getAndIncrement instead. - Velites
@Velites Sure, in general this could be a concern. In this particular case, however, I'm using a single-threaded executor service, so it shouldn't matter. - Fiduciary
W
6

So far sbridges' solution seems to be the cleanest one, except for what you mentioned: it leaves the responsibility of handling the number of executions to the Runnable itself. The Runnable should not be concerned with this; instead, the number of repetitions should be a parameter of the class handling the scheduling. To achieve this, I would suggest the following design, which introduces a new executor class for Runnables. The class provides two public methods for scheduling tasks, which are standard Runnables, with finite or infinite repetition. The same Runnable can be passed for finite and infinite scheduling, if desired (which is not possible with the proposed solutions that extend the Runnable class to provide finite repetitions). The handling of cancelling finite repetitions is completely encapsulated in the scheduler class:

class MaxNScheduler
{

  public enum ScheduleType 
  {
     FixedRate, FixedDelay
  }

  private ScheduledExecutorService executorService =
     Executors.newSingleThreadScheduledExecutor();

  public ScheduledFuture<?> scheduleInfinitely(Runnable task, ScheduleType type, 
    long initialDelay, long period, TimeUnit unit)
  {
    return scheduleNTimes(task, -1, type, initialDelay, period, unit);
  }

  /** schedule with count repetitions */
  public ScheduledFuture<?> scheduleNTimes(Runnable task, int repetitions, 
    ScheduleType type, long initialDelay, long period, TimeUnit unit) 
  {
    RunnableWrapper wrapper = new RunnableWrapper(task, repetitions);
    ScheduledFuture<?> future;
    if(type == ScheduleType.FixedDelay)
      future = executorService.scheduleWithFixedDelay(wrapper, 
         initialDelay, period, unit); // use the caller's unit, not a hard-coded one
    else
      future = executorService.scheduleAtFixedRate(wrapper, 
         initialDelay, period, unit);
    synchronized(wrapper)
    {
       wrapper.self = future;
       wrapper.notify(); // notify the wrapper that it now knows about its future (pun intended)
    }
    return future;
  }

  private static class RunnableWrapper implements Runnable 
  {
    private final Runnable realRunnable;
    private int repetitions = -1;
    ScheduledFuture<?> self = null;

    RunnableWrapper(Runnable realRunnable, int repetitions) 
    {
      this.realRunnable = realRunnable;
      this.repetitions = repetitions;
    }

    private boolean isInfinite() { return repetitions < 0; }
    private boolean isFinished() { return repetitions == 0; }

    @Override
    public void run()
    {
      if(!isFinished()) // guard for calls to run when it should be cancelled already
      {
        realRunnable.run();

        if(!isInfinite())
        {
          repetitions--;
          if(isFinished())
          {
            synchronized(this) // need to wait until self is actually set
            {
              while(self == null) // loop rather than if, to guard against spurious wakeups
              {
                 try { wait(); }
                 catch(InterruptedException e) { Thread.currentThread().interrupt(); return; }
              }
              self.cancel(false); // cancel gracefully (do not interrupt a running execution)
            }
          }
        }
      }
    }
  }

}

To be fair, the logic of managing the repetitions still lives in a Runnable, but it's a Runnable completely internal to the MaxNScheduler, whereas the Runnable task passed for scheduling does not have to concern itself with the nature of the scheduling. This concern could also easily be moved out into the scheduler if desired, by providing some callback every time RunnableWrapper.run is executed. This would complicate the code slightly and would introduce the need to keep some map of RunnableWrappers and the corresponding repetitions, which is why I opted for keeping the counters in the RunnableWrapper class.

I also added some synchronization on the wrapper when setting self. This is needed because, theoretically, self might not have been assigned yet when the executions finish (a rather theoretical scenario, but possible with only 1 repetition).

The cancelling is handled gracefully, without interrupting a running execution, and if another round is scheduled before the cancel is executed, the RunnableWrapper will not call the underlying Runnable.

Whomever answered 11/9, 2011 at 3:11 Comment(2)
The wrapper is a nice approach. It seems quite clean. You would have to add two more methods if you wanted to allow for scheduling with fixed interval rather than fixed delay, but that's not the end of the world. - Sitzmark
@Spycho: Sure, the interface should be extended anyway to allow specifying the start delay, interval, etc. I'll add that to the answer. - Whomever
K
2

Here is my suggestion (I believe it handles all cases mentioned in the question):

public class RepeatedScheduled implements Runnable {

    private int repeatCounter = -1;
    private boolean infinite;

    private ScheduledExecutorService ses;
    private long initialDelay;
    private long delay;
    private TimeUnit unit;

    private final Runnable command;
    private Future<?> control;

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit) {

        this.ses = ses;
        this.initialDelay = initialDelay;
        this.delay = delay;
        this.unit = unit;

        this.command = command;
        this.infinite = true;

    }

    public RepeatedScheduled(ScheduledExecutorService ses, Runnable command,
        long initialDelay, long delay, TimeUnit unit, int maxExecutions) {

        this(ses, command, initialDelay, delay, unit);
        this.repeatCounter = maxExecutions;
        this.infinite = false;

    }

    // synchronized (like run()) so that run() cannot observe a null control
    public synchronized Future<?> submit() {

        // We submit this, not the received command
        this.control = this.ses.scheduleWithFixedDelay(this,
            this.initialDelay, this.delay, this.unit);

        return this.control;

    }

    @Override
    public synchronized void run() {

        if ( !this.infinite ) {
            if ( this.repeatCounter > 0 ) {
                this.command.run();
                this.repeatCounter--;
            } else {
                this.control.cancel(false);
            }
        } else {
            this.command.run();
        }

    }

}

In addition, it allows an external party to stop everything from the Future returned by the submit() method.

Usage:

Runnable MyRunnable = ...;
// Repeat 20 times
RepeatedScheduled rs = new RepeatedScheduled(
    MySes, MyRunnable, 33, 44, TimeUnit.SECONDS, 20);
Future<?> MyControl = rs.submit();
...
Kwangchowan answered 4/9, 2011 at 17:2 Comment(2)
This appears to be pretty much the same as @sbridges's solution. It has a mechanism for getting at the Future, but that would be a triviality in the other solution. Are there any notable differences that I may have missed? - Sitzmark
@Sitzmark It handles the case where there is no limit to the number of executions and covers the case when the provided runnable is not thread safe. - Laryngeal
A
2

For use cases like polling until a certain timeout, a simpler approach is to use Future.get() with a timeout.

/* Define task */
public class Poll implements Runnable {
    @Override
    public void run() {
        // Polling logic
    }
}

/* Create executor service */
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

/* Schedule task - poll every 500ms */
ScheduledFuture<?> future = executorService.scheduleAtFixedRate(new Poll(), 0, 500, TimeUnit.MILLISECONDS);

/* Wait till 60 sec timeout */
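try {
    future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(false);
    // Take action on timeout
} catch (InterruptedException | ExecutionException e) {
    // get() also declares these: the polling task threw, or this thread was interrupted
    future.cancel(false);
}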
Allowance answered 11/7, 2019 at 1:46 Comment(0)
B
1

Your first approach seems OK. You could combine both types of runnables by passing the mode object to the Runnable's constructor (or by passing -1 as the maximum number of times it must run), and use this to determine whether the runnable must be cancelled or not:

private class DoSomethingNTimesTask implements Runnable{
    private int count = 0;
    private final int limit;

    /**
     * Constructor for no limit
     */
    private DoSomethingNTimesTask() {
        this(-1);
    }

    /**
     * Constructor allowing to set a limit
     * @param limit the limit (negative number for no limit)
     */
    private DoSomethingNTimesTask(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(){
        doSomething();
        count++;
        if(limit >= 0 && count >= limit){
            // Cancel the scheduling
        }
    }
}

You'll have to pass the scheduled future to your task in order for it to cancel itself, or you might throw an exception.
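A sketch of how the future might be passed to the task, under a few assumptions: the names SelfCancellingTask and schedule are hypothetical, the future is handed over through a volatile field after scheduling, and a guard at the top of run() covers the rare case where the limit is reached before the field has been set. The counter is a plain int because the ScheduledThreadPoolExecutor documentation states that successive executions of a periodic task do not overlap and that earlier executions happen-before later ones.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical task that cancels itself through its own ScheduledFuture.
class SelfCancellingTask implements Runnable {
    private final Runnable delegate;
    private final int limit;                  // negative means "no limit"
    private int count = 0;                    // only touched by the non-overlapping periodic runs
    private volatile ScheduledFuture<?> self; // set by schedule() right after scheduling

    SelfCancellingTask(Runnable delegate, int limit) {
        this.delegate = delegate;
        this.limit = limit;
    }

    /** Schedules a new task with a fixed delay and then hands the task its own future. */
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor, Runnable delegate,
                                       int limit, long delay, TimeUnit unit) {
        SelfCancellingTask task = new SelfCancellingTask(delegate, limit);
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(task, 0, delay, unit);
        task.self = future;
        return future;
    }

    @Override
    public void run() {
        if (limitReached()) {
            // The future was not available when the limit was hit; retry without doing work.
            cancelSelf();
            return;
        }
        delegate.run();
        count++;
        if (limitReached()) {
            cancelSelf();
        }
    }

    private boolean limitReached() {
        return limit >= 0 && count >= limit;
    }

    private void cancelSelf() {
        ScheduledFuture<?> future = self;
        if (future != null) {
            future.cancel(false);
        }
    }
}
```

With this shape the delegate runs at most limit times, and a negative limit keeps it running until the returned future is cancelled from outside.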

Bargainbasement answered 1/9, 2011 at 11:24 Comment(4)
So you think it's OK to tie the scheduling to the Runnable itself? To me it seems like the Runnable shouldn't be aware of the scheduling. Would you mind expanding on that? - Sitzmark
+1 for the combining idea. That's much better than my initial approach. - Sitzmark
Throwing an exception is probably easier and cleaner. You may define a specific type of exception for this: LimitReachedException or something like that. - Bargainbasement
I would be inclined to cancel rather than throw an exception because I don't think that this is an exceptional circumstance, but your suggestion would work either way. - Sitzmark
F
0

I've been looking for exact same functionality and chose org.springframework.scheduling.Trigger.

Below is a complete, tested working example (apologies for the length).

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:task="http://www.springframework.org/schema/task"
 xmlns:util="http://www.springframework.org/schema/util"
 xmlns:context="http://www.springframework.org/schema/context"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="blockingTasksScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="10" />
    </bean>

    <task:scheduler id="deftaskScheduler" pool-size="10" />

</beans>

JAVA

package com.alz.springTests.schedulerTest;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ScheduledTest {

    private static ApplicationContext applicationContext;
    private static TaskScheduler taskScheduler;

    private static final class SelfCancelableTask implements Runnable, Trigger {
        Date creationTime = new Date();
        AtomicInteger counter = new AtomicInteger(0);
        private volatile boolean shouldStop = false;
        private int repeatInterval = 3; //seconds

        @Override
        public void run() {
            log("task: run started");

            // simulate "doing job" started
            int sleepTimeMs = ThreadLocalRandom.current().nextInt(500, 2000+1);
            log("will sleep " + sleepTimeMs + " ms");
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "doing job" finished

            int i = counter.incrementAndGet();
            if (i > 5) { //cancel myself
                logErr("Attempts exceeded, will mark as shouldStop");
                shouldStop = true;

            } else {
                log("task: executing cycle #"+i);
            }
        }

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            log("nextExecutionTime: triggerContext.lastActualExecutionTime() " + triggerContext.lastActualExecutionTime());
            log("nextExecutionTime: triggerContext.lastCompletionTime() " + triggerContext.lastCompletionTime());
            log("nextExecutionTime: triggerContext.lastScheduledExecutionTime() " + triggerContext.lastScheduledExecutionTime());

            if (shouldStop) 
                return null;

            if (triggerContext.lastCompletionTime() == null) {
                LocalDateTime ldt = creationTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());
            } else {
                LocalDateTime ldt = triggerContext.lastCompletionTime().toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime().plus(repeatInterval, ChronoUnit.SECONDS);
                return Date.from(ldt.atZone(ZoneId.systemDefault()).toInstant());               
            }

        }

    }

    private static void log(String log) {
        System.out.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    private static void logErr(String log) {
        System.err.printf("%s [%s] %s\r\n", LocalDateTime.now(), Thread.currentThread(), log);
    }

    public static void main(String[] args) {

        log("main: Started...");

        applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

        taskScheduler = (TaskScheduler) applicationContext.getBean("blockingTasksScheduler");

        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = ((ThreadPoolTaskScheduler)taskScheduler).getScheduledThreadPoolExecutor();

        SelfCancelableTask selfCancelableTask = new SelfCancelableTask();
        taskScheduler.schedule(selfCancelableTask, selfCancelableTask);


        int waitAttempts = 0;
        while (waitAttempts < 30) {
            log("scheduledPool pending tasks: " + scheduledThreadPoolExecutor.getQueue().size());

            try {
                Thread.sleep(1*1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            waitAttempts++;

        }

        log("main: Done!");


    }

}
Forego answered 15/3, 2019 at 13:35 Comment(0)
