Java stop executor service once one of his assigned tasks fails for any reason
Asked Answered
R

2

12

I need some kind of service that will run a few tasks simultaneously and in an interval of 1 second for 1 minute.

If one of the tasks fails, I want to stop the service and every task that ran with it with some kind of indicator that something went wrong, otherwise if after one minute everything went well the service will stop with an indicator that all went well.

For example,i have 2 functions:

Runnable task1 = ()->{
      int num = Math.rand(1,100);
      if (num < 5){
          throw new Exception("something went wrong with this task,terminate");
      }
}

Runnable task2 = ()->{
      int num = Math.rand(1,100)
      return num < 50;
}



ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);

if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();

Any ideas on how should I tackle this and make things as generic as possible?

Reena answered 10/11, 2019 at 17:47 Comment(2)
Few things apart from the actual question,Math.rand isn't a built-in API. An implementation of Runnable must have a void run definition. Type of task1/2schedule would be ScheduledFuture<?> in the provided context. Moving to the actual question, how does it go around to make use of awaitTermination? You could do that as scheduledExecutorService.awaitTermination(1,TimeUnit.MINUTES);. Alternatively, what about checking if any of the tasks got canceled before its normal completion: if (task1schedule.isCancelled() || task2schedule.isCancelled()) scheduledExecutorService.shutdown();?Incur
There is no sense in scheduling tasks to be repeated every minute, but then say, you want to stop the tasks “if after one minute everything went well”. Since you are stopping the executor in either case, scheduling a task that shuts down the executor after one minute is trivial. And the futures do already indicate whether something went wrong or not. You didn’t say, what other type of indicator you want.Herzegovina
P
8

The idea is that the tasks are pushing to a common object TaskCompleteEvent. If they push an error the scheduler is stopped and all the tasks will stop.

You can check the results of every task-iteration in the maps "errors" and "success".

public class SchedulerTest {

    @Test
    public void scheduler() throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
        Runnable task1 = () -> {
            int num = new Random().nextInt(100);
            if (num < 5) {
                taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
            }
        };
        Runnable task2 = () -> {
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
        };
        scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Success: "+taskCompleteEvent.getSuccess());
        System.out.println("Errors: "+taskCompleteEvent.getErrors());
        System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
    }

    public static class TaskCompleteEvent {

        private final ScheduledExecutorService scheduledExecutorService;
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
        }

        public synchronized void message(String id, Object response, boolean error) {
            if (error) {
                errors.put(id, response);
                scheduledExecutorService.shutdown();
            } else {
                success.put(id, response);
            }
        }

        public synchronized Map<String, Object> getErrors() {
            return errors;
        }

        public synchronized Map<String, Object> getSuccess() {
            return success;
        }

    }

}
Postmillennialism answered 13/11, 2019 at 14:58 Comment(0)
Y
2

You just need to add an additional task whose job it is to monitor all of the other running tasks -- and when any of the monitored tasks fail, they need to set a semaphore (flag) that the assassin can inspect.

    ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor: 
    TimerTask remote = new TimerTask() {

        // RUN FORREST... RUN !
        public void run() {

            try { 

                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();

            } catch (Exception ex) {

                // NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:

            }

        }

    };

    // THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
    TimerTask assassin = new TimerTask() {

        // WHERE DO BAD FOLKS GO WHEN THEY DIE ? 
        private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        // RUN FORREST... RUN !
        public void run() {

            // IS THERE LIFE AFTER DEATH ???
            if (LocalDateTime.now().isAfter(death)) {

                // THEY GO TO A LAKE OF FIRE AND FRY:
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);                   

            }

        }

    };

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do {

        try {

            // I THINK I NEED A NAP:
            Thread.sleep(interval * 10);                

        } catch (InterruptedException e) {

            // FAIL && THEN cleanexit();
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");

        }

        // NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
    } while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();
Yim answered 12/11, 2019 at 23:40 Comment(3)
The class TimerTask is entirely unrelated to ScheduledExecutorService; it just happens to implement Runnable. Further, it makes no sense to schedule a periodic task, just to check whether a particular time (ConfigurationOptions.getPollingCycleTime()) has been reached. You have a ScheduledExecutorService, so you can tell it to schedule the task right for the desired time.Herzegovina
The implementation in the example I used was to kill an executing task after a certain period of time if the task had not completed. The use-case was: If the remote server hasn't dropped a file within 2 hours -- kill the task. this is what the OP asked for.Yim
Did you read and understand my comment? It doesn’t matter what the code does, it uses a discouraged class for no reason, just replace TimerTask with Runnable and you’ve fixed the problem, without changing what the code does. Further, just use executor.schedule(assassin, ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES); and it will run once at the desired time, hence, the if(LocalDateTime.now().isAfter(death)) check is obsolete. Again, the doesn’t change what the code does, besides doing it substantially simpler and more efficient.Herzegovina

© 2022 - 2024 — McMap. All rights reserved.