How do I implement task prioritization using an ExecutorService in Java 5?

I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callable or a Runnable is not important to me).

Now, superficially it looks like I could use a PriorityBlockingQueue as the task queue in my ThreadPoolExecutor, but that queue contains Runnable objects, which may or may not be the Runnable tasks I've submitted to it. Moreover, if I've submitted Callable tasks, it's not clear how this would ever map.
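To make the wrapping problem concrete, here is a minimal sketch (class and method names are made up for illustration, and Java 8 lambdas are used only for brevity). It parks the single worker thread on a latch, submits a second task, and then looks at what actually sits in the work queue:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WrappingDemo {

    /** Returns true if the queued element is a FutureTask wrapper, not the submitted Runnable. */
    static boolean queuedElementIsWrapped() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker so that the next task has to wait in the queue.
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            Runnable mine = () -> { };
            pool.submit(mine);

            // The queue holds the wrapper created by submit(), not "mine" itself.
            Runnable queued = queue.peek();
            return queued != mine && queued instanceof FutureTask;
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("wrapped: " + queuedElementIsWrapped());
    }
}
```

Because the queued object is a plain FutureTask, a PriorityBlockingQueue has nothing to compare unless the wrapper itself is made comparable.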

Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.

(An aside; yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness)

Goatish answered 30/4, 2009 at 14:33 Comment(4)
Interesting question. This seems like a bit of an oversight in the API, in my opinion. - Oomph
If I had to guess why it's not part of the API, I'd say that it's probably because the starvation issue is a tricky one. They'd need to provide a new set of primitives for fairness and escalation; things like must-execute-by and may-be-indefinitely-deferred (note that I'm pulling these names out of my ass). I might wish they'd done it, but I don't blame them :) - Goatish
Yeah, that makes sense. It seems like it would be a nice thing to have, but when you find you essentially need to write a CPU scheduling algorithm in Java, you're probably doing something wrong. - Oomph
JDK 6 corrects this omission by providing new newTaskFor methods in AbstractExecutorService that let you control how the submitted tasks are wrapped, and thus let you return instances that are Comparable and can be easily ordered by a custom PriorityQueue. - Dissogeny
O
8

At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.

Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparator can get at it.
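A rough sketch of that wrapper idea follows. Everything named here (PrioritizedTask, PriorityPool, the "larger number runs earlier" convention) is an assumption made for illustration, not an existing API, and Java 8 lambdas are used only to keep the demo short. The wrapper carries the priority and owns a FutureTask, and the facade hands the wrapper to execute() so that the queue sees a Comparable object:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper: carries the priority and delegates run() to a FutureTask. */
class PrioritizedTask<T> implements Runnable, Comparable<PrioritizedTask<?>> {
    final int priority; // assumption: a larger number means "run earlier"
    final FutureTask<T> future;

    PrioritizedTask(Callable<T> callable, int priority) {
        this.priority = priority;
        this.future = new FutureTask<T>(callable);
    }

    @Override
    public void run() {
        future.run();
    }

    @Override
    public int compareTo(PrioritizedTask<?> other) {
        // Reversed on purpose: the queue hands out its smallest element first.
        return Integer.compare(other.priority, priority);
    }
}

/** Hypothetical facade that owns the pool and only accepts prioritized work. */
class PriorityPool {
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

    <T> Future<T> submit(Callable<T> callable, int priority) {
        PrioritizedTask<T> task = new PrioritizedTask<T>(callable, priority);
        pool.execute(task); // execute() queues the wrapper itself, not a fresh FutureTask
        return task.future;
    }

    void shutdown() {
        pool.shutdown();
    }
}

class PriorityPoolDemo {
    static Callable<String> named(List<String> order, String name) {
        return () -> {
            order.add(name);
            return name;
        };
    }

    static List<String> runOrder() {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityPool pool = new PriorityPool();
        try {
            // Keeps the single worker busy so the next three tasks wait in the queue.
            pool.submit(() -> { gate.await(); return "gate"; }, 0);
            List<Future<String>> pending = new ArrayList<Future<String>>();
            pending.add(pool.submit(named(order, "low"), 1));
            pending.add(pool.submit(named(order, "high"), 10));
            pending.add(pool.submit(named(order, "medium"), 5));
            gate.countDown();
            for (Future<String> f : pending) {
                f.get();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<String>(order);
    }

    public static void main(String[] args) {
        System.out.println(runOrder());
    }
}
```

With the worker held back until all three tasks are queued, runOrder() yields high, medium, low. Note that this does nothing about starvation; it only orders whatever is waiting in the queue.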

Oomph answered 30/4, 2009 at 15:11 Comment(7)
That was my take, too, but here's the problem; the Runnable instances that are passed in to the priority queue are not the tasks that I submit directly, they're wrapped in a java.util.concurrent.FutureTask<V> which of course is not sorted the same way. If I use execute -- which does not, for example, accept Callable -- then it throws my own objects in. - Goatish
Hmm, that complicates things. I figured there was something I was missing. - Oomph
I'll say. I'm still beating away at it, but... Well, suffice to say it's a pain :) - Goatish
Adam, it appears that you are sitting right behind me, because that's what I just did. Now, it's worth noting that this does NOT solve starvation problems. I'm not really sure how to go about that, but it's something I'll have to consider. - Goatish
Well, you know what they say about great minds, eh? The starvation issue is complicated. The only thing I can think of, and it could get really messy, is to use some sort of aging algorithm whereby from time to time you increase the priority of all tasks, and somehow update the queue. Unfortunately, that is easier said than done. - Oomph
Oh for crying out loud... https://mcmap.net/q/27210/-priorityqueue-heap-update - Oomph
The logic being that if you increase the priority of ALL tasks, then heapify the queue, you aren't changing priority of existing tasks in relation to each other, but any tasks already in the queue get higher priority than new tasks being added to the queue. I can't see this being trivial to implement in a lock-free way, though... - Oomph
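One way to get a similar aging effect without ever updating the heap is to fix each task's rank at submission time as "submission order plus a penalty per priority level". This is a different technique from re-heapifying, offered only as a sketch: AgedTask, the logical clock seq, and PENALTY_PER_LEVEL are all assumptions made up for this example. Older low-priority tasks keep their rank while newly submitted tasks get ever larger ranks, so a waiting task is eventually overtaken by nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical queue entry: a lower level is more urgent; seq is a logical submission clock. */
class AgedTask implements Comparable<AgedTask> {
    static final long PENALTY_PER_LEVEL = 3; // assumed tuning knob

    final String name;
    final long seq;
    final long rank;

    AgedTask(String name, int level, long seq) {
        this.name = name;
        this.seq = seq;
        // Fixed at submission time, so queued entries never need to be re-heapified.
        this.rank = seq + level * PENALTY_PER_LEVEL;
    }

    @Override
    public int compareTo(AgedTask other) {
        int byRank = Long.compare(rank, other.rank);
        // On equal rank the task that was submitted earlier goes first.
        return byRank != 0 ? byRank : Long.compare(seq, other.seq);
    }
}

class AgingDemo {
    static List<String> drainOrder() {
        PriorityQueue<AgedTask> queue = new PriorityQueue<AgedTask>();
        queue.add(new AgedTask("low", 2, 0));           // rank 0 + 2 * 3 = 6
        for (int i = 1; i <= 8; i++) {
            queue.add(new AgedTask("high" + i, 0, i));  // rank i
        }
        List<String> order = new ArrayList<String>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());
    }
}
```

Here the low-priority task is served after high5 and before high6, instead of waiting behind every high-priority task. A plain PriorityQueue is used to keep the demo single-threaded; the same Comparable would work as the element type of a PriorityBlockingQueue.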
E
16

I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.

Using a PriorityBlockingQueue as the means for holding onto tasks for later execution is indeed a movement in the correct direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and it is impossible to call compareTo (or similar) on a Runnable interface.

On to solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should further be given a custom Comparator to do proper in-place sorting:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Now, a peek at CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         // Placeholder: derive the result from whatever fields define priority in MyType.
         return comparison;
    }

}

Everything looks pretty straightforward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskFor like so:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.

And finally, let's modify the original task that we were trying to execute:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(CustomTask task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.

And there you have it, customized prioritization for an Executor using the Java libraries! It takes a bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!
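For completeness, here is one way the pieces could fit together, including where getTask comes in. This is a sketch under assumptions: MyType is never shown above, so the urgency field (higher runs first), the log list, QueuedTaskComparator, and CustomExecutor are invented for the example. The queue holds CustomFutureTask instances typed as Runnable, so the queue's comparator has to unwrap them with getTask() before comparing:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical payload; "urgency" is an assumption (higher runs first). */
class MyType {
    final String name;
    final int urgency;
    MyType(String name, int urgency) { this.name = name; this.urgency = urgency; }
}

class CustomTask implements Callable<MyType>, Comparable<CustomTask> {
    private final MyType myType;
    private final List<String> log; // records execution order for the demo
    CustomTask(MyType myType, List<String> log) { this.myType = myType; this.log = log; }

    @Override
    public MyType call() { log.add(myType.name); return myType; }

    @Override
    public int compareTo(CustomTask other) {
        return Integer.compare(other.myType.urgency, myType.urgency);
    }
}

class CustomFutureTask extends FutureTask<MyType> {
    private final CustomTask task;
    CustomFutureTask(CustomTask task) { super(task); this.task = task; }
    CustomTask getTask() { return task; }
}

/** Unwraps the queued CustomFutureTask instances via getTask() and compares the tasks. */
class QueuedTaskComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable a, Runnable b) {
        return ((CustomFutureTask) a).getTask().compareTo(((CustomFutureTask) b).getTask());
    }
}

class CustomExecutor extends ThreadPoolExecutor {
    CustomExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, new QueuedTaskComparator()));
    }

    @Override
    @SuppressWarnings("unchecked")
    protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
        // Only CustomTask is supported in this sketch; any other Callable fails the cast.
        return (RunnableFuture<V>) (RunnableFuture<?>) new CustomFutureTask((CustomTask) c);
    }
}

class CustomExecutorDemo {
    static List<String> runOrder() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        CustomExecutor pool = new CustomExecutor();
        try {
            // Goes straight to the worker thread, so it never reaches the comparator.
            pool.execute(() -> {
                try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            List<Future<MyType>> pending = new ArrayList<Future<MyType>>();
            pending.add(pool.submit(new CustomTask(new MyType("low", 1), log)));
            pending.add(pool.submit(new CustomTask(new MyType("high", 9), log)));
            pending.add(pool.submit(new CustomTask(new MyType("medium", 5), log)));
            gate.countDown();
            for (Future<MyType> f : pending) { f.get(); }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<String>(log);
    }
}
```

In this arrangement the ordering logic lives in CustomTask.compareTo, and no comparator object is created per comparison. Tasks submitted as a plain Runnable through submit() would not pass the cast in the comparator, so a real implementation would need to handle that path as well.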

Euchology answered 25/2, 2012 at 18:25 Comment(3)
There are some inherent limitations to this mechanism. For instance, the first runnable/callable passed to the executor doesn't go in the queue. Thus the priority mechanism only applies when tasks are queued, which happens when the number of running tasks exceeds the maximum number of threads in the pool (here 1). - Underhand
In CustomTask, you should not instantiate an object for each comparison; this is going to slow things down quite a lot. - Underhand
Where is getTask used? - Rotman
I
4

You can use these helper classes:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

AND this helper method:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

AND then use it like this:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
Interlingua answered 16/5, 2013 at 1:50 Comment(3)
@Underhand the change you have made does not compile. - Sikorski
@assyslias, which JDK version do you use? - Underhand
The best article I could find is here: binkley.blogspot.fr/2009/04/jumping-work-queue-in-executor.html. It is really tricky to get something that works on JDK 1.5, 1.6 and 1.7. - Underhand
H
4

I will try to explain this problem with fully functional code. But before diving into the code, I would like to explain PriorityBlockingQueue.

PriorityBlockingQueue : PriorityBlockingQueue is an implementation of BlockingQueue. It orders its elements by priority (their natural ordering or a supplied Comparator) and hands the element with the highest priority to the executor first. If two tasks have the same priority, their relative order is not guaranteed, so we need to provide some custom logic to decide which task goes first.
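As a quick illustration of that ordering (a minimal sketch; the class name is made up): the head of the queue is the least element according to the ordering, so "highest priority" means "compares as smallest". Integers are used here because they are already Comparable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class QueueOrderDemo {

    /** Adds 3, 1, 2 and returns the elements in the order the queue hands them out. */
    static List<Integer> drain() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>();
        queue.add(3);
        queue.add(1);
        queue.add(2);

        List<Integer> out = new ArrayList<Integer>();
        Integer next;
        // poll() returns null once the queue is empty instead of blocking.
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain()); // prints [1, 2, 3]
    }
}
```

Insertion order plays no role here: the elements come out sorted. That is also why the Priority enum further down gives HIGHEST the smallest value.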

Now let's get into the code straight away.

Driver class : This class creates an executor which accepts tasks and later submits them for execution. Here we create three tasks with LOW, HIGH and MEDIUM priority. We tell the executor to run a maximum of 1 thread and to use the PriorityBlockingQueue.

public static void main(String[] args) {

  /*
     Minimum number of threads that must be running : 0
     Maximum number of threads that can be created : 1
     How long an idle thread is kept alive before it is terminated : 1000 ms
     Which queue to use : PriorityBlockingQueue
  */
  PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
  ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1,
      1000, TimeUnit.MILLISECONDS, queue);

  // Submitted in the order LOW, HIGH, MEDIUM; the queue reorders them by priority.
  MyTask task = new MyTask(Priority.LOW, "Low");
  executor.execute(new MyFutureTask(task));
  task = new MyTask(Priority.HIGH, "High");
  executor.execute(new MyFutureTask(task));
  task = new MyTask(Priority.MEDIUM, "Medium");
  executor.execute(new MyFutureTask(task));

}

MyTask class : MyTask implements Runnable and accepts priority as an argument in the constructor. When this task runs, it prints a message and then puts the thread to sleep for 1 second.

public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask class : Since we are using PriorityBlockingQueue for holding our tasks, we wrap each task in a FutureTask subclass that implements the Comparable interface. Its compareTo method compares the priorities of two tasks, so that the queue hands the task with the highest priority to the executor first.

public class MyFutureTask extends FutureTask<MyFutureTask>
    implements Comparable<MyFutureTask> {

  private MyTask task = null;

  public MyFutureTask(MyTask task) {
    super(task, null);
    this.task = task;
  }

  @Override
  public int compareTo(MyFutureTask another) {
    return task.getPriority() - another.task.getPriority();
  }
}

Priority class : Self explanatory Priority class.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Now when we run this example, we get the following output

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Even though we submitted the LOW priority task first and the HIGH priority task later, the HIGH priority task executes first: since we are using a PriorityBlockingQueue, any queued task with a higher priority is executed before the lower-priority ones.

Haroldharolda answered 24/1, 2016 at 10:7 Comment(1)
Create High1, High2, Low1, Low2, Low3 tasks, and their execution is shuffled within each priority level. A solution is needed that preserves submission order for tasks of the same priority. - Ellette
S
1

My solution preserves the submission order of tasks with the same priority. It's an improvement of this answer.

Task execution order is based on:

  1. Priority
  2. Submit order (within same priority)

Tester class:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Result:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

The first task is A1 because there was no higher-priority task in the queue when it was inserted. B tasks have priority 1, so they are executed earlier; A tasks have priority 0, so they are executed later. Within each priority level, execution order follows submission order: B1, B2, B3, ... A2, A3, A4 ...

The solution:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
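                return (o1, o2) -> {
                    // Higher priority first; Integer.compare avoids overflow from subtraction.
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    // Same priority: earlier submission first, compared as long without narrowing.
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
                };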
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}
Spectrograph answered 16/3, 2017 at 10:20 Comment(1)
Update: it does not work together with Guava's listeningDecorator... :( - Ellette
C
0

Would it be possible to have one ThreadPoolExecutor for each level of priority? A ThreadPoolExecutor can be instantiated with a ThreadFactory and you could have your own implementation of a ThreadFactory to set the different priority levels.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
         return thread; // newThread must hand the configured thread back to the pool
     }
 }
Chelate answered 30/4, 2009 at 14:42 Comment(5)
Thread priority isn't really important to me here; the tasks themselves will tend to execute reasonably quickly (the goal is to get them to ~50ms each) so thread scheduling is less of an issue. It's the priority of the tasks relative to each other that is at issue here. - Goatish
Do they have to be executed in a certain order? - Chelate
There's no one, true order, no, but tasks that arrive later but are of a higher priority should execute earlier than tasks that arrive later, but are of a lower priority. Again, with some guarantee of fairness to prevent starvation. - Goatish
Well, submitting them through a PriorityQueue ought to make sure they get executed in the right order, so I don't quite follow why you cannot do that. - Chelate
The thread pool executor service does not support prioritization of tasks, that's why. You don't get direct control of what type of object is placed on the queue. - Goatish
