Java Executors: how can I set task priority?
M

8

46

Is it possible to set a priority for tasks that are executed by Executors? I've found statements in JCIP suggesting it is possible, but I cannot find any example or anything related in the docs.

From JCIP:

An execution policy specifies the "what, where, when, and how" of task execution, including:

  • ...
  • In what order should tasks be executed (FIFO, LIFO, priority order)?
  • ...

UPD: I realized that I did not ask exactly what I wanted to ask. What I really wanted to know is:

How can I use or emulate setting thread priority (i.e. what used to be thread.setPriority()) with the executors framework?

Magma answered 7/7, 2010 at 20:46 Comment(0)
T
76

As of Java 6, the only concrete implementations of the Executor interface in the JDK are ThreadPoolExecutor and ScheduledThreadPoolExecutor.

Instead of using the utility / factory class Executors, you should create an instance using a constructor.

You can pass a BlockingQueue to the constructors of the ThreadPoolExecutor.

One of the BlockingQueue implementations, PriorityBlockingQueue, lets you pass a Comparator to its constructor, which lets you decide the order of execution.
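
A minimal sketch of this approach, assuming Java 8 or later. PrioritizedTask and PriorityQueueExecutorSketch are hypothetical names made up for illustration. The tasks go in through execute() rather than submit(), because submit() wraps each task in a FutureTask that this comparator would not know how to compare.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a Runnable that carries its own priority (lower value runs first).
class PrioritizedTask implements Runnable {
    final int priority;
    private final String name;
    private final List<String> log;

    PrioritizedTask(int priority, String name, List<String> log) {
        this.priority = priority;
        this.name = name;
        this.log = log;
    }

    @Override
    public void run() {
        log.add(name);
    }
}

class PriorityQueueExecutorSketch {

    static List<String> runDemo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());

        // The comparator decides which queued task a free worker takes next.
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PrioritizedTask) r).priority);

        // A single worker thread makes the queue ordering easy to observe.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority));

        // The first task is handed straight to the new worker and never enters the queue.
        // It blocks the worker so that the following tasks pile up in the queue.
        CountDownLatch gate = new CountDownLatch(1);
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(new PrioritizedTask(9, "low", log));
        executor.execute(new PrioritizedTask(1, "high", log));
        executor.execute(new PrioritizedTask(5, "medium", log));

        gate.countDown();            // release the worker
        executor.shutdown();         // queued tasks still run after shutdown()
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

The tasks are submitted as low, high, medium, but the single worker takes them from the queue as high, medium, low.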

Tisza answered 7/7, 2010 at 21:24 Comment(3)
+1 PriorityBlockingQueue is the way to go. You can implement a Comparator or make the tasks themselves Comparable. - Guarino
This article is a great reference: binkley.blogspot.fr/2009/04/jumping-work-queue-in-executor.html - Take
My solution orders tasks by priority while preserving submission order within the same priority level: https://mcmap.net/q/362597/-how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5 - Raspberry
E
53

The idea here is to use a PriorityBlockingQueue in the executor. For this:

  • Create a comparator that would compare our futures.
  • Create a proxy for the Future to hold a priority.
  • Override 'newTaskFor' in order to wrap every future in our proxy.

First you need to hold priority on your future:

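import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Wraps another RunnableFuture and remembers the priority it was submitted with.
class PriorityFuture<T> implements RunnableFuture<T> {

    private final RunnableFuture<T> src;
    private final int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Pass the timeout through; calling src.get() here would ignore it and block indefinitely.
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }
}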

Next you need to define a comparator that correctly sorts the priority futures:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Next let's assume we have a lengthy job like this:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Then, to execute these jobs in priority order, the code looks like this:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }

        // Let the queued jobs finish, then allow the JVM to exit.
        exec.shutdown();
    }
}

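This is a lot of code, but it is close to the only way this can be accomplished.

On my machine the output looks like the following. The first two jobs (39 and 90) start immediately because both worker threads are idle and take them before anything is queued; the remaining jobs wait in the queue and run in ascending priority order: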

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97
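
The code above overrides only newTaskFor(Callable) and casts every callable to LenthyJob, so a task submitted as a Runnable, or any other Callable, ends up failing a cast. The following is a sketch of one possible way around that, not the author's method: it assumes Java 8 or later, and Prioritized, ComparableFutureTask, PrioritizingExecutor and DEFAULT_PRIORITY are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical interface for tasks that carry a priority (lower value runs first).
interface Prioritized {
    int getPriority();
}

// A FutureTask that is Comparable, so the queue needs no separate Comparator.
class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<?>> {
    private final int priority;

    ComparableFutureTask(Callable<T> callable, int priority) {
        super(callable);
        this.priority = priority;
    }

    ComparableFutureTask(Runnable runnable, T result, int priority) {
        super(runnable, result);
        this.priority = priority;
    }

    @Override
    public int compareTo(ComparableFutureTask<?> other) {
        return Integer.compare(priority, other.priority);
    }
}

class PrioritizingExecutor extends ThreadPoolExecutor {
    // Assumed fallback for tasks that do not implement Prioritized.
    static final int DEFAULT_PRIORITY = 50;

    PrioritizingExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
    }

    private static int priorityOf(Object task) {
        return task instanceof Prioritized ? ((Prioritized) task).getPriority() : DEFAULT_PRIORITY;
    }

    // Both overloads are covered, so submit(Callable) and submit(Runnable) always enqueue
    // a ComparableFutureTask. execute(Runnable) bypasses newTaskFor and is not covered here.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable, priorityOf(callable));
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value, priorityOf(runnable));
    }
}

class PrioritizingExecutorDemo {
    static Runnable job(final int priority, final String name, final List<String> log) {
        class Job implements Runnable, Prioritized {
            public int getPriority() { return priority; }
            public void run() { log.add(name); }
        }
        return new Job();
    }

    static List<String> runDemo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        PrioritizingExecutor exec = new PrioritizingExecutor(1);
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only worker so the next three tasks wait in the queue.
        exec.submit(() -> { gate.await(); return null; });
        exec.submit(job(90, "low", log));
        Runnable plain = () -> log.add("default");   // no priority, gets DEFAULT_PRIORITY
        exec.submit(plain);
        exec.submit(job(1, "high", log));
        gate.countDown();
        exec.shutdown();
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Extending FutureTask instead of proxying a RunnableFuture keeps the wrapper short, at the cost of tying the sketch to FutureTask. Tasks of equal priority are not guaranteed to run in submission order.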
Eous answered 16/5, 2013 at 1:23 Comment(5)
While the accepted answer does answer the question, this one provides a working solution. Thanks a lot. - Theona
Thanks for your answer. Would it be possible to use this approach with ExecutorCompletionService? I tried to pass your ExecutorService object to the ExecutorCompletionService constructor, but the result cannot be cast to PriorityFuture in the comparator. - Rina
I tested on my machine. It is not correct. On my machine, I got Executing 72 before Executing 3, which is plainly wrong. - Neddy
The cleanest solution I could find for Callable jobs, thank you. - Pure
Hi, I am getting a class cast error while using the above code, at int p1 = ((PriorityFuture<?>) o1).getPriority(): java.util.concurrent.FutureTask cannot be cast to com.i2c.loyalty.handlers.datasynchandler.utils.InstancePriorityFuture. InstancePriorityFuture implements RunnableFuture. Can someone assist, please? - Recuperative
F
4

You can implement your own ThreadFactory and set it within ThreadPoolExecutor like this:

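// Note: with an unbounded LinkedBlockingQueue the pool never grows past its core size (1 here),
// so numOfWorkerThreads only takes effect with a bounded queue.
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));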

where my OpJobThreadFactory looks like the following:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}
Fachan answered 18/9, 2013 at 10:8 Comment(0)
C
3

You can use a ThreadPoolExecutor with a PriorityBlockingQueue; see "How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks".

Cataphoresis answered 22/4, 2011 at 11:38 Comment(0)
C
3

You can specify a ThreadFactory in the ThreadPoolExecutor constructor (or Executors factory method). This allows you to provide threads of a given thread priority for the executor.

To get different thread priorities for different jobs, you'd need to send them to executors with different thread factories.
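
As a rough sketch of that idea, assuming Java 8 or later: two pools, each built with its own thread factory, so that jobs sent to one pool run on low-priority threads and jobs sent to the other run on high-priority threads. PriorityPools, withPriority and observedPriority are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class PriorityPools {

    // Builds a factory whose threads all get the given priority.
    static ThreadFactory withPriority(final int priority) {
        return new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setPriority(priority);
                return t;
            }
        };
    }

    // Runs a probe task on the pool and reports the priority of the thread that ran it.
    static int observedPriority(ExecutorService pool) throws Exception {
        Callable<Integer> probe = () -> Thread.currentThread().getPriority();
        return pool.submit(probe).get();
    }

    static int[] runDemo() throws Exception {
        ExecutorService background = Executors.newFixedThreadPool(2, withPriority(Thread.MIN_PRIORITY));
        ExecutorService urgent = Executors.newFixedThreadPool(2, withPriority(Thread.MAX_PRIORITY));
        try {
            return new int[] { observedPriority(background), observedPriority(urgent) };
        } finally {
            background.shutdown();
            urgent.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] priorities = runDemo();
        System.out.println("background=" + priorities[0] + ", urgent=" + priorities[1]);
    }
}
```

This only sets the Java-level priority of the worker threads; how much the operating system scheduler honours it is platform dependent.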

Cerallua answered 11/1, 2013 at 21:46 Comment(0)
M
1

Please be aware that setPriority(..) normally does not work under Linux. See the following links for the full details:

Macrogamete answered 25/6, 2012 at 6:55 Comment(2)
Comments are comments; answers are answers. Comments are not answers. Answers are not comments. If it does not answer the question being asked, it is, in fact, a comment. - Undermanned
+1 @Nick - Ha ha, love it! Why use one word when you could use a lengthy, sarky comment. Good point and well (cheekily) made. - Pinkham
T
0

I just want to add my contribution to this discussion. I've implemented this ReorderingThreadPoolExecutor for a very specific purpose: being able to explicitly bring a task to the front of the executor's BlockingQueue (in this case a LinkedBlockingDeque) whenever I want, without having to deal with priorities (which can lead to deadlocks and are, in any case, fixed).

I'm using this to manage (inside an Android app) the case where I have to download many images that are displayed in a long list view. Whenever the user scrolls down quickly, the executor queue gets flooded with image download requests. By moving the latest ones to the front of the queue, I get much better performance when loading the images that are actually on the screen, and delay the download of the ones that will probably be needed later. Note that I use an internal concurrent map key (which can be as simple as the image URL string) to add the tasks to the executor so that I can retrieve them later for reordering.

There would have been many other ways of doing the same thing, and maybe it's overcomplicated, but it works fine, and Facebook does something similar in its Android SDK for its own worker thread queue.

Feel free to have a look at the code and give me suggestions, it's inside an Android project but stripping a few logs and annotations would make the class pure Java 6.
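
The linked code is not reproduced here, so the following is only a rough sketch of the move-to-front idea described above, not the ReorderingThreadPoolExecutor itself. MoveToFrontExecutor, bringToFront and the keyed execute overload are hypothetical names, and the lambdas assume Java 8 or later rather than Java 6.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MoveToFrontExecutor extends ThreadPoolExecutor {
    private final LinkedBlockingDeque<Runnable> deque;
    // Tasks that have been accepted but have not started yet, by caller-chosen key.
    private final Map<String, Runnable> pending = new ConcurrentHashMap<String, Runnable>();

    private MoveToFrontExecutor(int nThreads, LinkedBlockingDeque<Runnable> deque) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, deque);
        this.deque = deque;
    }

    static MoveToFrontExecutor create(int nThreads) {
        return new MoveToFrontExecutor(nThreads, new LinkedBlockingDeque<Runnable>());
    }

    // Queues a task under a key (for example an image URL) so it can be found again later.
    void execute(final String key, final Runnable task) {
        Runnable tracked = new Runnable() {
            @Override
            public void run() {
                pending.remove(key, this);   // no longer reorderable once it starts
                task.run();
            }
        };
        pending.put(key, tracked);
        execute(tracked);
    }

    // Moves a still-queued task to the head of the deque.
    // Returns false if the key is unknown or the task has already been taken by a worker.
    boolean bringToFront(String key) {
        Runnable tracked = pending.get(key);
        if (tracked != null && deque.remove(tracked)) {
            deque.addFirst(tracked);
            return true;
        }
        return false;
    }
}

class MoveToFrontDemo {
    static List<String> runDemo() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        MoveToFrontExecutor exec = MoveToFrontExecutor.create(1);
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the single worker so the next tasks stay queued.
        exec.execute("blocker", () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (String name : new String[] {"a", "b", "c"}) {
            exec.execute(name, () -> log.add(name));
        }

        exec.bringToFront("c");      // "c" jumps ahead of "a" and "b"
        gate.countDown();
        exec.shutdown();
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Workers take from the head of the deque, so a task re-added with addFirst is the next one to run.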

Teacher answered 27/6, 2013 at 13:50 Comment(0)
C
-1

If you simply want to favour one task's thread over another rather than guarantee any order, set the thread priority at the beginning of the call() method of the Callable you pass in:

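import java.util.concurrent.Callable;

class MyCallable implements Callable<String> {
private final int priority;

MyCallable(int priority) {
this.priority = priority;
}

@Override
public String call() {
     // logger is assumed to be an SLF4J-style logger declared elsewhere
     logger.info("running callable with priority {}", priority);
     // Pooled threads are reused, so this priority stays in effect for later
     // tasks on the same thread unless it is reset.
     Thread.currentThread().setPriority(priority);

     // do stuff

     return "something";
}
}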

This still depends on the underlying implementation honouring thread priority, though.

Circumference answered 16/3, 2021 at 20:2 Comment(0)
