How to await a list of ListenableFuture with a timeout

I'm working on a problem where I have a List<ListenableFuture<T>>. I would like to aggregate the results of all of these futures into a List<T> with a timeout. The naive approach would be something like:

List<T> blockForResponses(List<ListenableFuture<T>> futures, long timeoutMillis) {
    return futures.stream()
        // Checked exceptions thrown by get() are ignored here for brevity.
        .map(future -> future.get(timeoutMillis, TimeUnit.MILLISECONDS))
        .collect(Collectors.toList());
}

This doesn't work because the timeout is applied to each future separately, and I want it to be the timeout for the entire list. Manually keeping track of how much time has passed also doesn't seem to work, because if the first one times out, there won't be any time left to try the others.

The solution I'm looking for would enforce a timeout on all of the futures and return when the timeout had elapsed or all of the futures in the list were complete. Then I could inspect each future in the list myself to aggregate the results and check which ones timed out.
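For reference, the "track the elapsed time manually" idea can be sketched with a single shared deadline. This is only an illustration under stated assumptions: the class and method names (DeadlineGather, gatherWithDeadline) are made up, and it uses the plain java.util.concurrent.Future interface (which ListenableFuture extends) with CompletableFuture as a stand-in so that it runs without Guava. It relies on get returning immediately for a future that is already done, even when the remaining time is zero, which holds for the standard JDK implementations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineGather {
    /** Waits at most timeoutMillis in total and returns the results that arrived in time. */
    public static <T> List<T> gatherWithDeadline(List<? extends Future<T>> futures, long timeoutMillis) {
        // One deadline shared by every future in the list.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // Time left until the shared deadline, never negative.
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            try {
                results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                // This future was not finished by the deadline: skip it and check the rest.
            } catch (ExecutionException | CancellationException e) {
                // This future failed or was cancelled: skip it.
            } catch (InterruptedException e) {
                // Restore the interrupt flag and return what has been gathered so far.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> neverCompletes = new CompletableFuture<>();
        List<CompletableFuture<Integer>> futures = List.of(
                CompletableFuture.completedFuture(1),
                neverCompletes,
                CompletableFuture.completedFuture(3));
        // The second future uses up the whole budget; the third is still collected.
        System.out.println(gatherWithDeadline(futures, 100)); // prints [1, 3]
    }
}
```

Once the deadline has passed, the remaining futures are effectively polled: those already complete still contribute their results, and the rest are skipped.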

Macswan answered 20/12, 2018 at 21:44 Comment(9)
Futures.allAsList(futures).get(timeoutMillis, TimeUnit.MILLISECONDS)? Or successfulAsList, depending on your intended semantics? - Pessimism
@Louis won't those result in the future failing if the timeout is exceeded? Then I won't be able to get the successful results. - Macswan
In other words, the get call on the Future returned by allAsList will throw a TimeoutException. - Macswan
I think successfulAsList would work, then. - Winner
@Roy I think that will have the same problem. successfulAsList returns a ListenableFuture which, when get is called with a timeout, throws an exception when the timeout expires. - Macswan
I believe Daniel is correct. You might get away with successfulAsList after using withTimeout on all the futures? - Pessimism
@Louis but then do I need to spin up a new Executor to do this? - Macswan
Not necessarily, but you'd need a ScheduledExecutorService. - Pessimism
@Louis I haven't been able to figure this out from what's available online. Does the timeout passed in withTimeout start when it's called or when get is called? If the former, that seems like a good answer that I'd love for you to write up so I could accept it. - Macswan

This problem turned out to be simpler than I thought. I was able to use the Futures.allAsList method and then catch the TimeoutException:

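import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;

List<T> blockForResponses(List<ListenableFuture<T>> futures, long timeoutMillis)
        throws InterruptedException, ExecutionException {
    ListenableFuture<List<T>> futureOfList = Futures.allAsList(futures);
    List<T> responses;
    try {
        // One timeout for the whole list; throws ExecutionException if any input future failed.
        responses = futureOfList.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // Timed out: keep only the futures that have already completed.
        responses = new ArrayList<>();
        for (ListenableFuture<T> future : futures) {
            if (future.isDone()) {
                // Returns immediately because the future is done.
                responses.add(Uninterruptibles.getUninterruptibly(future));
            }
        }
    }
    return responses;
}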
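One caveat with the fallback loop: isDone() also returns true for futures that failed or were cancelled, and retrieving their result throws instead of returning a value. A possible way to collect only the successful results is sketched below. It is an illustration under stated assumptions, not part of the original answer: the names (CompletedResults, successfulSoFar) are hypothetical, and it is written against the plain java.util.concurrent.Future interface with CompletableFuture as a stand-in so that it runs without Guava. A List<ListenableFuture<T>> could be passed in the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class CompletedResults {
    /** Returns the results of the futures that are done and completed normally. */
    public static <T> List<T> successfulSoFar(List<? extends Future<T>> futures) {
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            if (!future.isDone()) {
                continue; // still running: treat as timed out
            }
            try {
                results.add(future.get()); // the future is done, so this does not wait
            } catch (ExecutionException | CancellationException e) {
                // Done, but failed or cancelled: no result to collect.
            } catch (InterruptedException e) {
                // Restore the interrupt flag and return what has been gathered so far.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true);
        CompletableFuture<String> pending = new CompletableFuture<>();

        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.completedFuture("a"),
                failed,
                pending,
                cancelled,
                CompletableFuture.completedFuture("b"));
        System.out.println(successfulSoFar(futures)); // prints [a, b]
    }
}
```

Whether failed futures should be skipped silently, logged, or rethrown depends on the semantics you want; the sketch simply drops them.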
Macswan answered 21/12, 2018 at 15:5 Comment(0)

So I fiddled around a bit (I hadn't used Guava's ListenableFuture until this evening), but I think this may work for you:

package basic;

import static com.google.common.util.concurrent.Futures.catching;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static com.google.common.util.concurrent.Futures.withTimeout;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class FuturesExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ListeningScheduledExecutorService scheduledExecutorService = MoreExecutors
                .listeningDecorator(Executors.newScheduledThreadPool(20));
        List<ListenableFuture<Integer>> list = new LinkedList<>();
        for (int i = 1; i <= 4; i++) {
            list.add(catching(
                    withTimeout(scheduledExecutorService.submit(getCallback(i * 1000, i)), 3, TimeUnit.SECONDS,
                            scheduledExecutorService),
                    TimeoutException.class, exception -> 0, scheduledExecutorService));
        }
        ListenableFuture<List<Integer>> result = successfulAsList(list);
        Optional<Integer> sum = result.get().stream().reduce(Integer::sum);
        System.out.println(sum.orElse(-1));
        scheduledExecutorService.shutdownNow();
    }

    private static Callable<Integer> getCallback(int timeout, int value) {
        return () -> {
            Thread.sleep(timeout);
            return value;
        };
    }
}

Edit: the code is a bit cleaner when using static imports for the Futures methods.

Winner answered 20/12, 2018 at 23:57 Comment(0)
