How to wait for a list of `Future`s created using different `ExecutorService`s

5

12

Ok, so I know the first answer / comment here will be "use one ExecutorService and use invokeAll". However, there is a good reason (which I will not bore people with) for us keeping the thread pools separate.

So I have a list of thread pools (ExecutorServices), and what I need to do is invoke a different Callable on each thread pool using submit (no problem there). Now I have this collection of Future instances, each created on a separate ExecutorService, and I want to wait for all of them to complete (and be able to provide a timeout after which any that are not done are cancelled).

Is there an existing class that will do this (wrap a list of Future instances and allow for a wait till all are done)? If not, suggestions on an efficient mechanism would be appreciated.

I was thinking of calling get with a timeout on each Future, but then I would have to calculate the total time already elapsed before each call.

I saw this post Wait Until Any of Future is Done but this extends Future instead of wrapping a list of them.

Flavory answered 1/11, 2012 at 18:55 Comment(9)
Loop over that list with a for-each, and when the last one is done, it's done, as they say. – Pegboard
Updated post to make it clear that the timeout is the issue, and I am hoping not to reinvent the wheel in that regard. – Flavory
Well, if you read those nifty lil' Java docs you will see that you can call awaitTermination and set the timeout; then when you submit your tasks you can send a shutdown, and if the tasks don't complete in time it will shut down after your timeout. Wabamzorz! – Pegboard
Where in his question did he say that the Executor will be shut down? – Feuillant
You could do this if these were Guava ListeningExecutorServices... – Upholsterer
@TylerHeiks Thanks for the insulting comment. If you read my post, I specifically stated that I was using Futures produced from multiple ExecutorServices. – Flavory
@LouisWasserman The Futures are produced using a set of services (not a single executor service). – Flavory
Eh. If you can wrap each individual service in MoreExecutors.listeningDecorator(service), you can turn them all into ListeningExecutorServices. Then you could do Futures.allAsList to create a "joined" future. – Upholsterer
@LouisWasserman You are the MAN! You pointed me in the right direction. Should have known Guava would have what I need. I actually need Futures.successfulAsList. – Flavory
17

Per Louis' comment, what I was looking for was Futures.successfulAsList

This allows me to wait for all to complete and then check for any futures that failed.

Guava RULES!

Flavory answered 2/11, 2012 at 9:55 Comment(0)
2

I don't think the JDK provides a direct API that lets you do that. However, it is straightforward to write a simple method that does. You might want to take a look at the implementation of AbstractExecutorService.invokeAll() to see how this can be done.

Essentially, you would call future.get() on each future, decreasing the wait time by the time it took to wait for the result each time, and before returning from the method cancel all outstanding futures.
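The approach described above can be sketched roughly as follows. This is only a minimal illustration, not a tested library method: the class name `FutureWaiter` and the method `awaitAll` are hypothetical, and the choice to swallow task failures (so the caller inspects each Future afterwards) is an assumption about what is wanted here.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the name and signature are illustrative only.
final class FutureWaiter {
    private FutureWaiter() {}

    /**
     * Waits for every future against one shared deadline, then cancels
     * anything still outstanding. Returns true if all futures were done
     * before the deadline passed.
     */
    static boolean awaitAll(List<? extends Future<?>> futures, long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        boolean allDone = true;
        try {
            for (Future<?> f : futures) {
                // Only the time left until the shared deadline is spent on this future.
                long remaining = deadline - System.nanoTime();
                try {
                    f.get(remaining, TimeUnit.NANOSECONDS);
                } catch (ExecutionException | CancellationException e) {
                    // The task failed or was cancelled, so it is done;
                    // the caller can call get() later to see the cause.
                } catch (TimeoutException e) {
                    allDone = false;
                    break;
                }
            }
        } finally {
            // cancel() has no effect on futures that already completed.
            for (Future<?> f : futures) {
                f.cancel(true);
            }
        }
        return allDone;
    }
}
```

Because the method only touches the Future interface, it does not matter which ExecutorService produced each Future. After it returns, `isCancelled()` tells the caller which tasks ran out of time.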

Balbur answered 1/11, 2012 at 20:1 Comment(2)
+1 Concur. I had thought of that and included it in my post as the obvious mechanism. I was hoping there was a better / already implemented mechanism that someone else knew of. – Flavory
That's probably as good an implementation as you can get. It would have been straightforward for the JDK creators to provide that as a stand-alone (static) method. Another implementation might involve using a completion service and waiting in the order the tasks are completed. However, the code becomes a bit more involved with little additional benefit. – Balbur
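For what it's worth, the completion-service variant mentioned in the comment above might look something like the sketch below. It assumes one task per pool and relies on the ExecutorCompletionService constructor that accepts a caller-supplied completion queue, so several pools can report into one shared queue. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; assumes tasks.get(i) should run on pools.get(i).
final class SharedCompletionQueue {
    private SharedCompletionQueue() {}

    /** Returns the futures that completed before the deadline, in completion order. */
    static <T> List<Future<T>> awaitInCompletionOrder(
            List<ExecutorService> pools, List<Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (pools.size() != tasks.size()) {
            throw new IllegalArgumentException("expected one task per pool");
        }
        // One queue shared by every per-pool completion service.
        BlockingQueue<Future<T>> done = new LinkedBlockingQueue<>();
        List<Future<T>> submitted = new ArrayList<>();
        for (int i = 0; i < pools.size(); i++) {
            CompletionService<T> cs = new ExecutorCompletionService<>(pools.get(i), done);
            submitted.add(cs.submit(tasks.get(i)));
        }

        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<Future<T>> completed = new ArrayList<>();
        try {
            while (completed.size() < submitted.size()) {
                // poll() returns null once the remaining time runs out.
                Future<T> f = done.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (f == null) {
                    break;
                }
                completed.add(f);
            }
        } finally {
            // Cancel stragglers; a no-op for futures that already finished.
            for (Future<T> f : submitted) {
                f.cancel(true);
            }
        }
        return completed;
    }
}
```

As the comment notes, this is more involved than the sequential get loop. Its main advantage is that results can be handled as soon as each one arrives.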
1

Maybe I didn't really get it. However, to me it still sounds as simple as

public <V> List<V> get(List<Future<V>> futures, long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
    List<V> result = new ArrayList<V>();
    long end = System.nanoTime() + unit.toNanos(timeout);
    for (Future<V> f: futures) {
        result.add(f.get(end - System.nanoTime(), TimeUnit.NANOSECONDS));
    }
    return result;
}

Am I wrong with that?

The question you link is much more complex I think, as they only want to wait for the fastest, and of course have no idea which will be the fastest.

Marcille answered 1/11, 2012 at 20:14 Comment(1)
+1 The issue with this is that get can throw an exception. Changing it to return the Futures after they are all done would help. Also, it does not handle the shutdown. This is the obvious implementation; I wanted to see if there was something cleaner / already implemented and tested. – Flavory
1

This could use some cleanup, but it should solve your problem. (Some encapsulation omitted for time and space):

public static <T> LatchWithWrappedCallables<T> wrapCallables(Collection<Callable<T>> callablesToWrap)
{
    CountDownLatch latch = new CountDownLatch(callablesToWrap.size());
    List<Callable<T>> wrapped = new ArrayList<Callable<T>>(callablesToWrap.size());
    for (Callable<T> currCallable : callablesToWrap)
    {
        wrapped.add(new CallableCountdownWrapper<T>(currCallable, latch));
    }

    LatchWithWrappedCallables<T> returnVal = new LatchWithWrappedCallables<T>();
    returnVal.latch = latch;
    returnVal.wrappedCallables = wrapped;
    return returnVal;
}

public static class LatchWithWrappedCallables<T>
{
    public CountDownLatch latch;
    public Collection<Callable<T>> wrappedCallables;
}

public static class CallableCountdownWrapper<T> implements Callable<T>
{
    private final Callable<T> wrapped;

    private final CountDownLatch latch;

    public CallableCountdownWrapper(Callable<T> wrapped, CountDownLatch latch)
    {
        this.wrapped = wrapped;
        this.latch = latch;
    }

    @Override
    public T call() throws Exception
    {
        try
        {
            return wrapped.call();
        }
        finally
        {
            latch.countDown();
        }
    }
}

Then your code would call it like this:

Collection<Callable<String>> callablesToWrap = [Your callables that you need to wait for here];
LatchWithWrappedCallables<String> latchAndCallables = wrapCallables(callablesToWrap);

[Submit the wrapped callables to the executors here]

// await() returns false when the timeout elapses before the latch reaches zero
if (!latchAndCallables.latch.await(timeToWaitInSec, TimeUnit.SECONDS))
{
    [Handling for timeout here]
}
Sonasonant answered 1/11, 2012 at 20:45 Comment(4)
And on second thought, I think that I would probably roll the await directly into the LatchWithWrappedCallables class to make it even easier. (And yes, "LatchWithWrappedCallables" is a terrible name!) – Sonasonant
I will give this some deeper thought tomorrow to see if it fits my needs. Thanks. – Flavory
Nice. I need to look at the ListenableFuture thing too. – Flavory
Turns out Futures.successfulAsList is what I was looking for. – Flavory
0

I doubt you need a list of Futures. Such a solution wastes memory and is unnecessarily complex. What you have is a fairly typical multiple-producer, single-consumer situation. Such scenarios are generally solved with blocking queues. The fact that you have multiple executors doesn't change much.

All you need is a blocking deque, a double-ended queue; Java offers LinkedBlockingDeque. LinkedBlockingDeque is a thread-safe collection designed for concurrent use.

Create one LinkedBlockingDeque and pass a reference to it to all your producers and consumers. Producers push messages on one end, and the consumer(s) pull messages from the other end. Use a simple executor.execute(() -> { ...; add processed item to queue}) and not executor.submit().

You can look at LinkedBlockingDeque as a variant of Future, but for multiple values, multiple get()s.
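A rough sketch of this queue-based idea is shown below, assuming one task per pool. The names `QueueCollector` and `collect` are hypothetical. Note two limitations that follow from not holding Futures: tasks still running at the deadline are not cancelled, and a task that throws never adds a result, so the consumer simply waits out the remaining time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; assumes tasks.get(i) should run on pools.get(i).
final class QueueCollector {
    private QueueCollector() {}

    /** Collects whatever results arrive before the deadline, in arrival order. */
    static <T> List<T> collect(
            List<ExecutorService> pools, List<Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (pools.size() != tasks.size()) {
            throw new IllegalArgumentException("expected one task per pool");
        }
        BlockingDeque<T> results = new LinkedBlockingDeque<>();
        for (int i = 0; i < pools.size(); i++) {
            Callable<T> task = tasks.get(i);
            // execute(), not submit(): the producer pushes its own result.
            pools.get(i).execute(() -> {
                try {
                    results.addLast(task.call()); // the deque rejects null results
                } catch (Exception e) {
                    // In this sketch a failed task contributes no result.
                }
            });
        }

        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<T> collected = new ArrayList<>();
        while (collected.size() < tasks.size()) {
            // pollFirst() returns null once the remaining time runs out.
            T item = results.pollFirst(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (item == null) {
                break;
            }
            collected.add(item);
        }
        return collected;
    }
}
```

If failures need to be reported, one option is to enqueue a small wrapper object holding either the value or the exception, so every task puts exactly one item on the deque.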

Gilmore answered 21/5 at 15:32 Comment(0)
