How do I use CompletableFuture.supplyAsync together with PriorityBlockingQueue?

I'm trying to add a priority queue to an existing application that uses a ThreadPoolExecutor with a LinkedBlockingQueue via CompletableFuture.supplyAsync. The problem is that I can't come up with a design for assigning task priorities that I can then read in the PriorityBlockingQueue's Comparator. That is because CompletableFuture wraps my task in an instance of a non-public nested class called AsyncSupply, which hides the original task in a private field. The Comparator then gets called with these AsyncSupply objects cast to Runnable, as follows:

public class PriorityComparator<T extends Runnable> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {

        // At runtime, o1 and o2 are AsyncSupply objects,
        // BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TO!
        return 0;
    }
}

I investigated extending CompletableFuture so that I could wrap the task in a different object, but so much of CompletableFuture is encapsulated and uninheritable that extending it doesn't seem like an option. Nor is wrapping it in an adapter, as it implements a very wide interface.

I'm not sure how to approach this problem aside from copying the entire CompletableFuture, and modifying it. Any ideas?

Tzar answered 19/1, 2016 at 0:50 Comment(2)
I don't understand your question. You are calling supplyAsync() and passing an Executor that was configured with a LinkedBlockingQueue? And you want to replace that with a PriorityBlockingQueue? That is, it seems like all of this is external to the supplyAsync() function and CompletableFuture. Where are you having trouble? Perhaps you could show the code that you'd like to write, with the missing bit in the middle highlighted. Tabethatabib
Thanks @Tabethatabib for replying, and for asking a good question. I did leave out the important point about why I can't just use the PriorityBlockingQueue. The reason is that PriorityBlockingQueue will pass two AsyncSupply objects, cast to Runnable, to the Comparator.compare method. AsyncSupply is a non-public nested class of CompletableFuture, so I can't cast back to it, much less access its private field, which references the original unwrapped task that I would like to assign priorities to. So the question, I guess, is: how or where do I assign my priorities? I'll add some code. Tzar

It seems like a limitation in the API that CompletableFuture doesn't provide a straightforward way to use PriorityBlockingQueue. Fortunately, we can hack around it. In Oracle's JDK 1.8, the internal task classes (AsyncSupply, UniApply, and so on) all happen to store the wrapped function in a field named fn, so our priority-aware functions can be extracted with a little reflection:

import java.lang.reflect.Field;
import java.util.Comparator;

public class CFRunnableComparator implements Comparator<Runnable> {

    @Override
    @SuppressWarnings("unchecked")
    public int compare(Runnable r1, Runnable r2) {
        // r1 and r2 might be AsyncSupply, UniApply, etc., but we want to
        // compare the priority-aware functions they wrap.
        return ((Comparable) unwrap(r1)).compareTo(unwrap(r2));
    }

    private Object unwrap(Runnable r) {
        try {
            Field field = r.getClass().getDeclaredField("fn");
            field.setAccessible(true);
            // NB: For performance-intensive contexts, you may want to
            // cache these in a ConcurrentHashMap<Class<?>, Field>.
            return field.get(r);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IllegalArgumentException("Couldn't unwrap " + r, e);
        }
    }
}
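
The caching mentioned in the NB comment above could look roughly like the sketch below. CachedFnUnwrapper is a hypothetical helper name, not part of the JDK, and it keeps the same assumption as the comparator: the wrapper class declares a field named fn. On newer JDKs (16 and later), reflective access to CompletableFuture's internals is likely to also need --add-opens java.base/java.util.concurrent=ALL-UNNAMED.

```java
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: looks up the "fn" field once per wrapper class. */
final class CachedFnUnwrapper {
    private static final ConcurrentHashMap<Class<?>, Field> FIELDS =
            new ConcurrentHashMap<>();

    private CachedFnUnwrapper() {}

    /** Returns the value of the wrapper's "fn" field. */
    static Object unwrap(Runnable r) {
        // Reflection lookup happens only on the first call for each class.
        Field field = FIELDS.computeIfAbsent(r.getClass(), CachedFnUnwrapper::lookup);
        try {
            return field.get(r);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Couldn't unwrap " + r, e);
        }
    }

    private static Field lookup(Class<?> type) {
        try {
            Field field = type.getDeclaredField("fn");
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException e) {
            // Nothing is cached when the lookup fails.
            throw new IllegalArgumentException("No field named fn in " + type, e);
        }
    }
}
```

With this in place, CFRunnableComparator.unwrap could simply delegate to CachedFnUnwrapper.unwrap(r).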

CFRunnableComparator assumes that your Supplier class is Comparable, for example:

public interface WithPriority extends Comparable<WithPriority> {
    int priority();
    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

import java.util.function.Supplier;

public class PrioritySupplier<T> implements Supplier<T>, WithPriority {
    private final int priority;
    private final Supplier<T> supplier;
    public PrioritySupplier(int priority, Supplier<T> supplier) {
        this.priority = priority;
        this.supplier = supplier;
    }
    @Override
    public T get() {
        return supplier.get();
    }
    @Override
    public int priority() {
        return priority;
    }
}

Used as follows:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/,
        new CFRunnableComparator());
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q);
CompletableFuture.supplyAsync(new PrioritySupplier<>(n, () -> {
    ...
}), pool);
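
If depending on a private field name is a concern, a different approach (not the reflective one above) is to skip supplyAsync entirely: hand the executor a Runnable that carries its own priority and completes a CompletableFuture by hand. The following is only a sketch; PriorityTask and PriorityFutures are hypothetical names. It assumes every task given to the pool is a PriorityTask, since a PriorityBlockingQueue without a comparator throws ClassCastException for elements that are not Comparable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical task type: a Runnable that knows its own priority. */
final class PriorityTask<T> implements Runnable, Comparable<PriorityTask<?>> {
    final int priority;
    final CompletableFuture<T> future = new CompletableFuture<>();
    private final Supplier<T> supplier;

    PriorityTask(int priority, Supplier<T> supplier) {
        this.priority = priority;
        this.supplier = supplier;
    }

    @Override
    public void run() {
        try {
            future.complete(supplier.get());
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
    }

    @Override
    public int compareTo(PriorityTask<?> other) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(other.priority, priority);
    }
}

/** Hypothetical stand-in for CompletableFuture.supplyAsync with a priority. */
final class PriorityFutures {
    private PriorityFutures() {}

    static <T> CompletableFuture<T> supplyAsync(
            int priority, Supplier<T> supplier, Executor executor) {
        PriorityTask<T> task = new PriorityTask<>(priority, supplier);
        // execute() (not submit()) queues the task itself, unwrapped.
        executor.execute(task);
        return task.future;
    }
}
```

Because the queue sees PriorityTask objects directly, the pool can be built with a plain new PriorityBlockingQueue<Runnable>() and no custom comparator. Note that a ThreadPoolExecutor only queues tasks once its core threads are busy, so priorities only take effect among waiting tasks.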

If you create similar classes such as PriorityFunction and PriorityBiConsumer, the same reflective comparator lets you call methods like thenApplyAsync and whenCompleteAsync with appropriate priorities as well.
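
As an illustration, a PriorityFunction could be sketched along the same lines as PrioritySupplier. This is an assumed shape rather than code from the answer; WithPriority is repeated here only so the block compiles on its own.

```java
import java.util.function.Function;

// Same interface as in the answer, repeated for self-containment.
interface WithPriority extends Comparable<WithPriority> {
    int priority();

    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

/** Sketch: a Function that carries a priority, mirroring PrioritySupplier. */
final class PriorityFunction<T, R> implements Function<T, R>, WithPriority {
    private final int priority;
    private final Function<T, R> function;

    PriorityFunction(int priority, Function<T, R> function) {
        this.priority = priority;
        this.function = function;
    }

    @Override
    public R apply(T input) {
        return function.apply(input);
    }

    @Override
    public int priority() {
        return priority;
    }
}
```

It would then be passed as, for example, future.thenApplyAsync(new PriorityFunction<String, Integer>(n, String::length), pool), relying on the wrapper class created by thenApplyAsync also exposing the function in a field named fn, as described above for Oracle's 1.8 implementation.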

Angadreme answered 28/1, 2016 at 14:52 Comment(1)
Sorry for the delayed response, and thanks very much. I did end up doing what you said, and pulled out my own implementations for the supplier and comparator. Tzar
