ExecutorService - Execute Each Task with a Specific Time Limit
Asked Answered
C

3

6

I'm creating an ExecutorService to execute some tasks which under normal conditions is expected to take one minute to complete but under no circumstances should be allowed to run for more than two minutes from when the task started.

My code is as follows:

ExecutorService executorService = Executors.newFixedThreadPool(10);
ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();



for (String singleTask: taskList) { 
                futuresList.add(executorService.submit( new Runnable(){      
                       @Override
                       public void run(){
                        try {
                            performTask(p1, singleTask, p3);
                        } catch (IOException | InterruptedException | ExecutionException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                }
              }
         }));       

    }



for(Future<?> future : futures) {
    future.get(120, TimeUnit.SECONDS); 
}

This will block until specified timeout and then move on. My problem(s) are as follows:

1) If task1 blocks for two minutes and task2 also blocks for two minutes - then task2 will have "blocked" for a total of 4 minutes (since future.get(120, TimeUnit.SECONDS); is not called on task2 until task1 finishes blocking) - even though both tasks were submitted and started executing at the same time

2) If I submit more than 10 tasks, task 11+ may never block for the desired amount of time, if previous tasks have not complete by the time future.get(120, TimeUnit.SECONDS); is called on the 11th task

My goal is to have each individual task execute for a max of two minutes, regardless of the number of tasks in list and regardless of how many tasks come before it or after it.

Thanks

Chloramine answered 7/11, 2018 at 19:23 Comment(5)
You can use SingleThreadedExecutorHesperidin
@Ivan, he can't - he still needs thread pool for executing the tasks.Washbasin
What are you going to do with the task that is running for more than timeout period? How are you going to cancel the task or time it out? Looks like you need to have some king of custom timeout hook in performTask.Bartizan
@tsolakp, we have this same sort of thing running in our system - it needs a custom worker thread implementation, and a separate watchdog threads that wake up after a specified timeout and try to kill the thread if it hanged. It's a pretty long thing to add into an answer, I feel. Do you think my description is enough to leave at that?Washbasin
@M. Prokhorov. Agree. The OP most likely needs to have a custom timeout or thread termination implementation.Bartizan
F
0

You can take the time with System.currentTimeMillis(); then you add the maximum time, eg 120_000 milliseconds. When you wait you subtract the current time. I.e. you only wait until the maximum time is reached.

Fluctuation answered 7/11, 2018 at 19:28 Comment(1)
A short explanation would be helpful to understand how this answer addresses the two questions in the OP. Thanks.Chloramine
V
0

ExecutorService#invokeAll could be the key here.

The question is a bit hard to understand (maybe even because you tried to describe it precisely? ;-)). So I created an example, trying to wrap my head around it. Even if this is not what you intended, maybe you can explain in how far this differs from your goal, so that the question can be clarified or others may write a better answer.

The example creates the tasks, as Callable objects, that are put into a list. Such a list can be passed to ExecutorService#invokeAll. (In your case, you could probably create these instances from your Runnable tasks with Executors#callable). There are 5 tasks created. Each task takes, by default, 2000ms to execute. Task "C" is the odd one out, and takes 8000ms. The maximum execution time should be 5000ms.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceLimitTaskTime
{
    private static Map<String, Long> taskSubmitMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskStartMs = 
        new ConcurrentHashMap<String, Long>();
    private static Map<String, Long> taskFinishedMs = 
        new ConcurrentHashMap<String, Long>();

    public static void main(String[] args) throws Exception
    {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<String> tasks = Arrays.asList("A", "B", "C", "D", "E");

        List<Callable<String>> callables = new ArrayList<Callable<String>>();
        for (String task : tasks)
        {
            taskSubmitMs.put(task, System.currentTimeMillis());

            callables.add(new Callable<String>()
            {
                @Override
                public String call()
                {
                    taskStartMs.put(task, System.currentTimeMillis());

                    long durationMs = 2000;
                    if (task.equals("C"))
                    {
                        durationMs = 8000;
                    }

                    performTask(task, durationMs);
                    if (!Thread.currentThread().isInterrupted())
                    {
                        taskFinishedMs.put(task, System.currentTimeMillis());
                    }
                    return task;
                }
            });
        }

        List<Future<String>> futures = 
            executorService.invokeAll(callables, 5000, TimeUnit.MILLISECONDS);

        for (Future<String> future : futures)
        {
            try
            {
                future.get();
            }
            catch (CancellationException e) 
            {
                System.out.println("One task was cancelled");
            }
        }

        for (String task : tasks)
        {
            Long submitMs = taskSubmitMs.get(task);
            Long startMs = taskStartMs.get(task);
            Long finishedMs = taskFinishedMs.get(task);

            if (finishedMs != null)
            {
                long waitMs = startMs - submitMs;
                long runMs = finishedMs - startMs;
                long totalMs = finishedMs - submitMs;
                System.out.printf(
                    "Task %-3s waited %5d ms and ran %5d ms, total %5d ms\n", 
                    task, waitMs, runMs, totalMs);
            }
            else
            {
                System.out.printf(
                    "Task %-3s was cancelled\n", task);

            }
        }

    }

    private static void performTask(String task, long durationMs)
    {
        System.out.println("Executing " + task);
        try
        {
            Thread.sleep(durationMs);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        System.out.println("Executing " + task + " DONE");
    }

}

The summary that is printed at the end shows this result:

Task A   waited    16 ms and ran  2002 ms, total  2018 ms
Task B   waited     3 ms and ran  2002 ms, total  2005 ms
Task C   was cancelled
Task D   waited  2005 ms and ran  2000 ms, total  4005 ms
Task E   waited  2005 ms and ran  2000 ms, total  4005 ms

This shows that

  • The tasks that started immediately ran for 2000ms
  • The tasks that had to wait for others also ran for 2000ms (but 4000ms in total)
  • The task that took too long was cancelled after 5000ms
Vituperate answered 7/11, 2018 at 23:59 Comment(0)
C
0

OK, I'm not sure if my question was fully understood but I will attempt to answer my own question with the solution I came up with (this might help clarify the question to others). I think @Peter Lawrey eluded to this answer but the answer was too short to know for sure.

        int timeLimitOfIndividualTaskInSeconds = 120;
        int fixedThreadPoolCount = 10;

        ExecutorService executorService = Executors.newFixedThreadPool(fixedThreadPoolCount);
        ArrayList<Future<?>> futuresList = new ArrayList<Future<?>>();

        for (String singleTask: taskList) {

            futuresList.add(executorService.submit( new Runnable(){      
                @Override
                public void run(){
                    try {
                        executeTask(singleTask);
                    } catch (IOException | InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }));        

        }

        long beforeTimeInMilli = System.currentTimeMillis();
        long beforeTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(beforeTimeInMilli);
        int counter = 0;

        long timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;

        for(Future<?> future : futuresList) {
            if (counter % fixedThreadPoolCount == 0) {

                // resets time limit to initial limit since next batch of tasks are beginning to execute
                timeoutInSeconds = timeLimitOfIndividualTaskInSeconds;
            }

            try {
                future.get(timeoutInSeconds, TimeUnit.SECONDS);
            } catch (Exception e){
                e.printStackTrace();
                future.cancel(true); //stops the underlying task
            }

            counter++;

            long afterTimeInMilli = System.currentTimeMillis();
            long afterTimeInSeconds = TimeUnit.MILLISECONDS.toSeconds(afterTimeInMilli);

            long taskDurationInSeconds = afterTimeInSeconds - beforeTimeInSeconds;
            timeoutInSeconds = timeoutInSeconds - taskDurationInSeconds;

        }   

This guarantees two things:

1) All tasks that have been submitted and started executing at the same time (i.e, "same batch") will run for a max of 120 seconds (but if any task completes before 120 seconds it will not continue block)

2) Prior tasks in the same batch will not cause subsequent tasks in that batch to execute for longer than 120 second time limit (since we deduct execution time of previous tasks from timeout value in subsequent tasks)

I find this simple and elegant solution - but of course I'm happy to hear from anyone who is able to enhance or comment on this solution.

Chloramine answered 9/11, 2018 at 2:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.