Waiting on a list of Future

15

223

I have a method which returns a List of futures

List<Future<O>> futures = getFutures();

Now I want to wait until either all futures are done processing successfully or any of the tasks whose output is returned by a future throws an exception. Even if one task throws an exception, there is no point in waiting for the other futures.

Simple approach would be to

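// named waitForAll because Object.wait() is final and cannot be redefined
void waitForAll(List<Future<O>> futures) {

   for (Future<O> f : futures) {
     try {
       f.get();
     } catch (InterruptedException | ExecutionException e) {
       // this future threw an exception, which means someone could not do its task
       return;
     }
   }
}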

But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

How to solve this? Will count down latch help in any way? I'm unable to use Future isDone because the java doc says

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
Aardvark answered 13/10, 2013 at 17:42 Comment(9)
who generates those futures? What type are they of? Interface java.util.concurrent.Future does not provide the functionality you want, the only way is to use your own Futures with callbacks.Factoring
You could make an instance of ExecutorService for every "batch" of tasks, submit them to it, then immediately shut down the service and use awaitTermination() on it, I suppose.Collinsworth
You could use a CountDownLatch if you wrapped the body of all your futures in a try..finally to make sure the latch gets decremented as well.Collinsworth
docs.oracle.com/javase/7/docs/api/java/util/concurrent/… does exactly what you need.Brut
@AlexeiKaigorodov Yes, my futures are of type java.util.concurrent.Future. I am using Future with Callable. I get a Future when I submit a task to an ExecutorService.Aardvark
@Collinsworth If I wrap the code that produces every single future inside try/catch/finally it will work, but I won't be able to distinguish between a future that finished successfully and one that failed with an exception.Aardvark
@Aardvark I said try..finally, not try..catch. The exception will be rethrown and Future.get() would throw. It'd just make sure the latch gets counted down. That said, the suggestions to use CompletionService are clearly superior.Collinsworth
Correction: the exception will still be thrown not rethrown.Collinsworth
Possible duplicate of Waiting on multiple threads to complete in JavaDossier
170

You can use a CompletionService to receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

I think you can further improve to cancel any still executing tasks if one of them throws an error.
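A minimal sketch of that improvement, assuming the tasks are available as a list of Callables. The class name FailFastWait and the method awaitAllOrFail are made up for illustration. It keeps the futures returned by submit() so that the remaining tasks can be cancelled once one of them fails:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the JDK.
class FailFastWait {

    // Returns all results (in completion order), or throws the first failure
    // after cancelling the tasks that are still running.
    static <T> List<T> awaitAllOrFail(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(completionService.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (int i = 0; i < futures.size(); i++) {
                // take() hands back futures as they complete, so a failure surfaces
                // as soon as it happens, not when its turn in the list comes up
                results.add(completionService.take().get());
            }
            return results;
        } finally {
            // no effect on tasks that already completed; interrupts the ones still running
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(10_000); return "slow"; });
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        try {
            awaitAllOrFail(executor, tasks);
        } catch (ExecutionException e) {
            System.out.println("Failed fast: " + e.getCause().getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```

Cancelling with cancel(true) only interrupts the worker thread; a task that ignores interruption will keep running in the background.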

Leafage answered 13/10, 2013 at 18:0 Comment(7)
Your code has the same issue that I mentioned in my post. If the fourth future throws an exception, the code will still wait for futures 1, 2, 3 to complete. Or will completionService.take() return the future which completes first?Aardvark
What about timeouts? Can I tell the completion service to wait for X seconds at most?Aardvark
Should not have. It does not iterate over the futures, but as soon as one is ready it is processed/verified if not thrown exception.Leafage
To timeout waiting for a future to appear on the queue there is a poll(seconds) method on the CompletionService.Leafage
Here is the working example on github:github.com/princegoyal1987/FutureDemoVolant
@Volant Your code doesn't address the asked question and looks like copied from javarevisited.blogspot.com/2015/01/…. Can you please polish the code to avoid an empty click?Flute
If you are using a global thread pool then this will not work. I dunno if it's a good idea to use local (non singleton) thread pools.Hanoi
163

If you are using Java 8, you can do this more easily with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
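
Because allOf only completes once every input is done, the method above reports a failure late. One possible fail-fast variant is sketched below; the names FailFastAll and allOrFirstFailure are made up for illustration. Each input forwards its own failure to the result future right away, and allOf is used only for the success path:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the JDK.
class FailFastAll {

    // Like all(...) above, but completes exceptionally as soon as any input fails.
    static <T> CompletableFuture<List<T>> allOrFirstFailure(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();

        // Propagate the first failure immediately; later calls are ignored because
        // a CompletableFuture can only be completed once.
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                }
            });
        }

        // Success path: when every input is done, collect the values in list order.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenAccept(ignored -> result.complete(
                        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())));

        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<List<String>> all = allOrFirstFailure(Arrays.asList(pending, failed));

        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(all.isCompletedExceptionally()); // true, although 'pending' is not done
    }
}
```

This only stops the waiting. The other futures keep running unless the caller cancels them.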
Xerarch answered 28/3, 2016 at 11:55 Comment(9)
Hi @Andrejs, could you please explain what this snippet of code does. I see this suggested in multiple places but am confused as to what is actually happening. How are exceptions handled if one of the threads fail?Delta
@Delta From the javadoc: If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture also does so, with a CompletionException holding this exception as its cause.Xerarch
Right so I was following up on that, is there any way to use this snippet but obtain the values for all the other threads which did complete successfully? Should I just iterate over the CompletableFutures list and call get ignoring the CompletableFuture<List<T>> since the sequence function takes care of ensuring all the threads are complete either with result or exception?Delta
This is solving a different problem. If you have Future instances, you can't apply this method. It's not easy to convert Future into CompletableFuture.Gazehound
it will not work if we have exception in some task.Incestuous
The biggest question here Why do they not put all this logic into CompletableFuture.allOf and make it return T instead of void? Why I have to copy&paste this into dozenb projects? Scala have Future.sequence why not java? :'(Tantalizing
@Xerarch what is the point of CompletableFuture.allOf here? Couldn't we just use CompletableFuture.supplyAsync to wrap the join stream? Is it just a matter of not using an extra thread?Oatmeal
@Oatmeal That would work though an extra thread will be blocked until all the futures complete. Might take a while.Xerarch
It seems you don't get a failure reported until all the futures are done. This means you can't use it to detect one future failing early and then cancelling all the other futures.Legge
91

Use a CompletableFuture in Java 8

    // Kick off multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
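
If the wait should be bounded, one option is to call get with a timeout on the future returned by allOf and cancel whatever is still pending. The sketch below uses made-up names (AllOfWithTimeout, awaitAll) and plain CompletableFutures in place of the lookup service:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the JDK.
class AllOfWithTimeout {

    // Waits up to the given time for all futures; returns true if all completed normally.
    // On timeout or failure, cancels whatever is still pending.
    static boolean awaitAll(long timeout, TimeUnit unit, CompletableFuture<?>... futures)
            throws InterruptedException {
        try {
            CompletableFuture.allOf(futures).get(timeout, unit);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            for (CompletableFuture<?> f : futures) {
                f.cancel(true); // no effect on futures that are already complete
            }
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> stuck = new CompletableFuture<>(); // never completes
        System.out.println(awaitAll(200, TimeUnit.MILLISECONDS, done, stuck)); // false
        System.out.println(stuck.isCancelled()); // true
    }
}
```

Note that CompletableFuture.cancel does not interrupt the thread computing the value; it only marks the future as cancelled.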
Bobbitt answered 24/6, 2018 at 4:3 Comment(3)
This should be the accepted answer. Also it's part of the official Spring documentation: spring.io/guides/gs/async-methodNoticeable
how could you cancel any future that takes longer than 2 seconds? using .get would "stack up" like this: #17434811Either
@Noticeable question does not specify spring, nor the answer shows how to build such completable future. Executor interface specifies a regular Future<T> in submit method.Luetic
19

You can use an ExecutorCompletionService. The documentation even has an example for your exact use-case:

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

The important thing to notice here is that ecs.take() will get the first completed task, not just the first submitted one. Thus you should get them in the order of finishing the execution (or throwing an exception).
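
To see the completion-order behaviour in isolation, here is a small sketch (the class CompletionOrderDemo is made up for illustration). The task submitted first is held back by a latch, so take() has to deliver the second task first:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class.
class CompletionOrderDemo {

    // Submits a blocked task first and a quick task second, then returns the
    // results in the order take() delivers them.
    static String[] takeInCompletionOrder() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> ecs = new ExecutorCompletionService<>(executor);
            CountDownLatch release = new CountDownLatch(1);

            ecs.submit(() -> { release.await(); return "submitted first"; });
            ecs.submit(() -> "submitted second");

            String first = ecs.take().get();  // the quick task; the other is still blocked
            release.countDown();              // now let the blocked task finish
            String second = ecs.take().get();
            return new String[] {first, second};
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] order = takeInCompletionOrder();
        System.out.println(order[0]); // submitted second
        System.out.println(order[1]); // submitted first
    }
}
```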

Yseult answered 13/10, 2013 at 18:45 Comment(0)
3

If you are using Java 8 and don't want to manipulate CompletableFutures, I have written a tool to retrieve results for a List<Future<T>> using streams. The key point is that you cannot simply map(Future::get), because get() throws checked exceptions.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }
}

This needs an AggregateException that works like C#'s

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

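    @Override
    public synchronized Throwable getCause()
    {
        // report the first underlying failure rather than this exception itself
        return causes.isEmpty() ? null : causes.get(0);
    }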

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

This component acts exactly like C#'s Task.WaitAll. I am working on a variant that does the same as CompletableFuture.allOf (equivalent to Task.WhenAll).

The reason I did this is that I am using Spring's ListenableFuture and don't want to port to CompletableFuture, even though it is the more standard way.

Panicstricken answered 7/3, 2017 at 14:7 Comment(2)
Upvote for seeing the need for a equivalent AggregateException.Kieger
An example of using this facility would be nice.Orcus
3

If you want to combine a List of CompletableFutures, you can do this:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() takes varargs,
// so convert the List to an array to pass it in
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

For more details on Future & CompletableFuture, useful links:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/

Chadwick answered 24/6, 2019 at 12:37 Comment(1)
Can use new CompletableFuture[0], .toArray doesn't reuse the passed arrayLinell
1

I've got a utility class that contains these:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Once you have that, using a static import, you can simply wait for all futures like this:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

you can also collect all their results like this:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Just revisiting my old post and noticing that you had another concern:

But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

In this case, the simple solution is to do this in parallel:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

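This way the first exception, although it will not stop the other futures, breaks out of the forEach statement, as in the serial example. Because the waits happen in parallel, you don't have to wait for the first 3 to complete. Note that a parallel stream runs on the common ForkJoinPool, whose parallelism is limited, so with many futures some of the get() calls can still queue behind others.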

Abdicate answered 8/1, 2019 at 18:49 Comment(0)
1
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Stack2 {   
    public static void waitFor(List<Future<?>> futures) {
        List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures); // futures that have not completed yet
        while (!futureCopies.isEmpty()) { // worst case: every task succeeds, so this loop waits for all of them
            Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
            while (futureCopiesIterator.hasNext()) {
                Future<?> future = futureCopiesIterator.next();
                if (future.isDone()) {//already done
                    futureCopiesIterator.remove();
                    try {
                        future.get();// no longer waiting
                    } catch (InterruptedException e) {
                        //ignore
                        //only happen when current Thread interrupted
                    } catch (ExecutionException e) {
                        Throwable throwable = e.getCause();// real cause of exception
                        futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
                        return;
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Runnable runnable1 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnable2 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                }
            }
        };


        Runnable fail = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(1000);
                    throw new RuntimeException("bla bla bla");
                } catch (InterruptedException e) {
                }
            }
        };

        List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
                .map(executorService::submit)
                .collect(Collectors.toList());

        long start = System.nanoTime();
        waitFor(futures);
        double elapsed = (System.nanoTime() - start) / 1e9;
        System.out.println(elapsed + " seconds");
        executorService.shutdown(); // otherwise the pool threads keep the JVM alive

    }
}
Thundercloud answered 25/4, 2020 at 16:39 Comment(0)
0

Maybe this would help (nothing replaces raw threads, yeah!). I suggest waiting on each Future in a separate thread (they run in parallel); whenever one of them gets an error, it just signals the manager (the Handler class).

class Handler {
    //...
    private Thread thisThread;
    private volatile boolean failed = false;
    private Thread[] trds;

    public void waitFor() {
        thisThread = Thread.currentThread();
        List<Future<Object>> futures = getFutures();
        trds = new Thread[futures.size()];
        for (int i = 0; i < trds.length; i++) {
            trds[i] = new Thread(new RunTask(futures.get(i), this));
        }
        synchronized (this) { // cancelOther() cannot run until every thread has been started
            for (Thread tx : trds) {
                tx.start();
            }
        }
        for (Thread tx : trds) {
            try {
                tx.join();
            } catch (InterruptedException e) {
                System.out.println("Job failed!");
                break;
            }
        }
        if (!failed) {
            System.out.println("Job Done");
        }
    }

    private List<Future<Object>> getFutures() {
        return null; // placeholder: supply the real futures here
    }

    public synchronized void cancelOther() {
        if (failed) {
            return;
        }
        failed = true;
        for (Thread tx : trds) {
            tx.interrupt(); // Thread.stop() is deprecated and unsafe; Future.get() responds to interruption
        }
        thisThread.interrupt();
    }
    //...
}

class RunTask implements Runnable {
    private final Future<?> f;
    private final Handler h;

    public RunTask(Future<?> f, Handler h) {
        this.f = f;
        this.h = h;
    }

    public void run() {
        try {
            f.get(); // blocks until the task finishes, fails, or this thread is interrupted
        } catch (InterruptedException e) {
            System.out.println("Oops, some other guy has stopped working...");
        } catch (Exception e) {
            System.out.println("Error, stopping other guys...");
            h.cancelOther();
        }
    }
}

I have to say the code above may contain errors (I didn't check it), but I hope it explains the solution. Please give it a try.

Olethea answered 13/10, 2013 at 19:25 Comment(0)
0

The CompletionService will take your Callables with the .submit() method and you can retrieve the computed futures with the .take() method.

One thing you must not forget is to terminate the ExecutorService by calling the .shutdown() method. Also you can only call this method when you have saved a reference to the executor service so make sure to keep one.
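
A common way to make sure the shutdown always happens is a try/finally around the work, as in this minimal sketch (the class ShutdownDemo and its trivial task are made up for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class.
class ShutdownDemo {

    // Runs one task on a private pool and always shuts the pool down afterwards.
    static int runAndShutDown() throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = service.submit(() -> 6 * 7);
            return future.get();
        } finally {
            service.shutdown(); // stop accepting new tasks; queued tasks still run
            if (!service.awaitTermination(5, TimeUnit.SECONDS)) {
                service.shutdownNow(); // interrupt tasks that did not finish in time
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndShutDown()); // 42
    }
}
```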

Example code - For a fixed number of work items to be worked on in parallel:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Example code - For a dynamic number of work items to be worked on in parallel:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 tasks
    for (int i = 0; i < 8; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(false); // setResult takes a primitive boolean, so null would not compile
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable<CallableImplementor>{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}
Repentant answered 2/11, 2015 at 21:40 Comment(0)
0
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
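        try {
            CompletableFuture.allOf(futures).join();
        } catch (CompletionException e) {
            // at least one supplier failed; failed entries are mapped to null below
        }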
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    }
Damoiselle answered 25/7, 2018 at 14:56 Comment(0)
0

This is what I use to wait for a certain time on a list of futures. I think it's cleaner.

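CountDownLatch countDownLatch = new CountDownLatch(somethings.size());
// Some parallel work
for (Something tp : somethings) {
    completionService.submit(() -> {
        try {
            work(tp);
        } catch (ConnectException e) {
            // log or record the failure
        } finally {
            countDownLatch.countDown(); // count down on success and on failure
        }
        return null; // CompletionService.submit expects a Callable
    });
}
try {
    if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)) {
        // timed out before all tasks finished
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
}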
Secrest answered 29/5, 2022 at 6:56 Comment(0)
0

A Guava-based solution can be implemented using Futures.FutureCombiner.

Here is the code example given in the javadoc:

 final ListenableFuture<Instant> loginDateFuture =
     loginService.findLastLoginDate(username);
 final ListenableFuture<List<String>> recentCommandsFuture =
     recentCommandsService.findRecentCommands(username);
 ListenableFuture<UsageHistory> usageFuture =
     Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
         .call(
             () ->
                 new UsageHistory(
                     username,
                     Futures.getDone(loginDateFuture),
                     Futures.getDone(recentCommandsFuture)),
             executor);

For more info, see the ListenableFutureExplained section of the user's guide.

If you're curious about how it works under the hood, I suggest looking at this part of the source code: AggregateFuture.java#L127-L186

Pincus answered 13/10, 2022 at 13:36 Comment(0)
0

For anyone using Vavr's Future, you can either wait for all of them like this:

static <T> Optional<Future<T>> waitForAll(Collection<Future<T>> futures) {
  return futures.stream()
      .reduce((last, next) -> last.flatMap(ignored -> next));
}

Or, if you have a default value in case there are no futures in the collection:

static <T> Future<T> waitForAll(Collection<Future<T>> futures, T defaultValue) {
  return futures.stream()
      .reduce(Future.successful(defaultValue), (last, next) -> last.flatMap(ignored -> next));
}

This will wait for all futures, however, regardless of whether one has failed.


To return as soon as any fails, change your accumulator function to:

(last, next) -> Future.firstCompletedOf(List.of(last, next))
    .flatMap(v -> last.flatMap(ignored -> next));

As we only have two items in our merge function, we can wait for either of them to complete (Vavr's firstCompletedOf). If it failed, the flatMap is skipped and the failed future is returned. If it was successful (whichever one it was), we fall into the first flatMap, where we also wait for the other one to finish.

This works no matter how long the collection of futures is, because the accumulator essentially pairs them all up:

accumulate(accumulate(accumulate(1, 2), 3), 4)

where accumulate does the "wait for both unless one fails".

Warning: This will not stop the execution on the other threads.

Peripeteia answered 4/4, 2023 at 10:9 Comment(2)
Isn't stopping the execution on the other threads OP's question?Eudemonics
They only write that they don't want to wait; the threads may keep running in the background without any interrupts. Interrupting threads is a different issue.Peripeteia
0

Using an AutoCloseable implementation, it's trivial to wait for a list of Futures to finish:

try (FutureQueue queue = new FutureQueue())
{
    queue.add(createFuture(...));
    queue.add(createFuture(...));
    queue.add(createFuture(...));
}

Simple helper class:

public class FutureQueue extends ArrayList<Future> implements AutoCloseable
{
    @Override
    public void close() throws Exception
    {
        for (Future f : this)
        {
            f.get();
        }
    }
}
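
A self-contained sketch of how such a helper might be used is shown below. The createFuture(...) calls above are elided in the original, so the tasks here are made up, and WaitingFutureQueue is a hypothetical typed variant of the helper rather than the class above. Note that close() waits in insertion order, so a failure in a later future is only seen after the earlier ones finish:

```java
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Same idea as FutureQueue above, with a wildcard type instead of the raw Future.
class WaitingFutureQueue extends ArrayList<Future<?>> implements AutoCloseable {
    @Override
    public void close() throws InterruptedException, ExecutionException {
        for (Future<?> f : this) {
            f.get(); // blocks in insertion order; a failed task surfaces as ExecutionException
        }
    }
}

// Hypothetical demo class.
class FutureQueueDemo {

    // Submits 'count' made-up tasks and returns how many had run once the block exits.
    static int runTasks(int count) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AtomicInteger finished = new AtomicInteger();
        try (WaitingFutureQueue queue = new WaitingFutureQueue()) {
            for (int i = 0; i < count; i++) {
                queue.add(executor.submit(() -> { finished.incrementAndGet(); }));
            }
        } finally {
            executor.shutdown();
        }
        return finished.get(); // close() has already waited for every task
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTasks(5)); // 5
    }
}
```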
Ballade answered 12/2, 2024 at 13:43 Comment(0)
