Transform Java Future into a CompletableFuture
Asked Answered
T

8

137

Java 8 introduces CompletableFuture, a new implementation of Future that is composable (includes a bunch of thenXxx methods). I'd like to use this exclusively, but many of the libraries I want to use return only non-composable Future instances.

Is there a way to wrap up a returned Future instances inside of a CompleteableFuture so that I can compose it?

Tanked answered 25/4, 2014 at 19:38 Comment(0)
A
62

There is a way, but you won't like it. The following method transforms a Future<T> into a CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Obviously, the problem with this approach is, that for each Future, a thread will be blocked to wait for the result of the Future--contradicting the idea of futures. In some cases, it might be possible to do better. However, in general, there is no solution without actively wait for the result of the Future.

Around answered 25/4, 2014 at 20:20 Comment(8)
Ha, that's exactly what I wrote before thinking that there must be a better way. But, I guess notTanked
Hmmm... doesn't this solution eat one of the threads of the "common pool", just for waiting? Those "common pool" threads should never block... hmmmm...Varanasi
@Peti: You are right. However, the point is, if you are most likely doing something wrong, regardless whether you are using the common pool or an unbounded thread pool.Around
It might not be perfect, but using CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor()) would at least not block the common pool threads.Overuse
Sorry for my ignorance, but I'm not familiar with the "common pool". Why is this approach bad? Are you saying that future.get should never be called?Clad
Please, just never do thatSelfpronouncing
The common pool is a thread pool with a small amount of threads (usually 1 per processor core) and it's one pool shared with the whole JVM, and it's meant for non-blocking operations. CompletableFuture.supplyAsync() runs in the common pool unless a different ExecutorService is passed as a parameter and future.get() is a blocking operation. So you should never do this.Knowable
@Overuse I think employing FJP.ManagedBlocker will result in better utilization than CompletableFuture.supplyAsync(supplier, t -> newSingleThreadExecutor()). I've updated the code in the answer to reflect that.Spontaneous
I
75

If the library you want to use also offers a callback style method in addition to the Future style, you can provide it a handler that completes the CompletableFuture without any extra thread blocking. Like so:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

Without the callback the only other way I see solving this is to use a polling loop that puts all your Future.isDone() checks on a single thread and then invoking complete whenever a Future is gettable.

Ical answered 11/2, 2015 at 12:46 Comment(1)
I am using Apache Http async library which accepts FutureCallback. It made my life easy :)Bille
D
70

If your Future is the result of a call to an ExecutorService method (e.g. submit()), the easiest would be to use the CompletableFuture.runAsync(Runnable, Executor) method instead.

From

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

to

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

The CompletableFuture is then created "natively".

EDIT: Pursuing comments by @SamMefford corrected by @MartinAndersson, if you want to pass a Callable, you need to call supplyAsync(), converting the Callable<T> into a Supplier<T>, e.g. with:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);

Because T Callable.call() throws Exception; throws an exception and T Supplier.get(); doesn't, you have to catch the exception so prototypes are compatible.

A note on exception handling

The get() method doesn't specify a throws, which means it should not throw a checked exception. However, unchecked exception can be used. The code in CompletableFuture shows that CompletionException is used and is unchecked (i.e. is a RuntimeException), hence the catch/throw wrapping any exception into a CompletionException.

Also, as @WeGa indicated, you can use the handle() method to deal with exceptions potentially being thrown by the result:

CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((ex,res) -> {
        if (ex != null) {
            // An exception occurred ...
        } else {
            // No exception was thrown, 'res' is valid and can be handled here
        }
    });
Dane answered 9/4, 2020 at 8:57 Comment(3)
Or, if you're using Callable<T> rather than Runnable, instead try supplyAsync: CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);Middlemost
supplyAsync receives a Supplier. The code will not compile if you attempt to pass in a Callable.Lavonlavona
You can also implement exception handling with java.util.concurrent.CompletableFuture#handle, chained right after CompletableFuture: CompletableFuture.supplyAsync(...).handle((response, throwable) -> ...)Kith
A
62

There is a way, but you won't like it. The following method transforms a Future<T> into a CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Obviously, the problem with this approach is, that for each Future, a thread will be blocked to wait for the result of the Future--contradicting the idea of futures. In some cases, it might be possible to do better. However, in general, there is no solution without actively wait for the result of the Future.

Around answered 25/4, 2014 at 20:20 Comment(8)
Ha, that's exactly what I wrote before thinking that there must be a better way. But, I guess notTanked
Hmmm... doesn't this solution eat one of the threads of the "common pool", just for waiting? Those "common pool" threads should never block... hmmmm...Varanasi
@Peti: You are right. However, the point is, if you are most likely doing something wrong, regardless whether you are using the common pool or an unbounded thread pool.Around
It might not be perfect, but using CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor()) would at least not block the common pool threads.Overuse
Sorry for my ignorance, but I'm not familiar with the "common pool". Why is this approach bad? Are you saying that future.get should never be called?Clad
Please, just never do thatSelfpronouncing
The common pool is a thread pool with a small amount of threads (usually 1 per processor core) and it's one pool shared with the whole JVM, and it's meant for non-blocking operations. CompletableFuture.supplyAsync() runs in the common pool unless a different ExecutorService is passed as a parameter and future.get() is a blocking operation. So you should never do this.Knowable
@Overuse I think employing FJP.ManagedBlocker will result in better utilization than CompletableFuture.supplyAsync(supplier, t -> newSingleThreadExecutor()). I've updated the code in the answer to reflect that.Spontaneous
W
15

I published a little futurity project that tries to do better than the straightforward way in the answer.

The main idea is to use only one thread (and of course with not just a spin loop) to check all Futures states inside, which helps to avoid blocking a thread from a pool for each Future -> CompletableFuture transformation.

Usage example:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Willetta answered 31/8, 2016 at 0:39 Comment(2)
This looks interesting. Is it using a timer thread ? How come this is not the accepted answer ?Dartmoor
@Dartmoor Yeah, it basically uses one timer thread to wait on all submitted futures.Willetta
S
9

Suggestion:

https://gabfssilva.github.io/old-java-future-to-completable-future/

But, basically:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

And, the CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

Example:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Spongin answered 26/4, 2018 at 3:55 Comment(1)
this is quite simple to reason about & elegant & fits most use cases. I would make the CompletablePromiseContext not-static and take param for the check interval (which is set to 1 ms here) then overload the CompletablePromise<V> constructor to be able to provide your own CompletablePromiseContext with a possibly different (longer) check interval for long-running Future<V> where you don't have to absolutely be able to run callback (or compose) immediately upon finishing, and you can also have an instance of CompletablePromiseContext to watch a set of Future (in case you have many)Erectile
C
6

Let me suggest another (hopefully, better) option: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent

Briefly, the idea is the following:

  1. Introduce CompletableTask<V> interface -- the union of the CompletionStage<V> + RunnableFuture<V>
  2. Warp ExecutorService to return CompletableTask from submit(...) methods (instead of Future<V>)
  3. Done, we have runnable AND composable Futures.

Implementation uses an alternative CompletionStage implementation (pay attention, CompletionStage rather than CompletableFuture):

Usage:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Camerlengo answered 19/5, 2016 at 13:18 Comment(1)
Small update: code is moved to separate project, github.com/vsilaev/tascalate-concurrent, and now it's possible to use out-of-the-way box Executor-s from java.util.concurrent.Camerlengo
B
4
public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
    return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
        try {
            return CompletableFuture.completedFuture(f.get());
        } catch (InterruptedException e) {
            return CompletableFuture.failedFuture(e);
        } catch (ExecutionException e) {
            return CompletableFuture.failedFuture(e.getCause());
        }
    });
}
Brandabrandais answered 24/10, 2021 at 23:16 Comment(0)
M
0

The main idea goes like this:

Future<?> future = null;
return CompletableFuture.supplyAsync(future::get);

However, you will receive some warnings from your compiler.

So, here is the first option.

Future<?> future = null;
return CompletableFuture.supplyAsync(
        ()->{
            try {
                return future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

Second Option, hide the try...catch via casting the functional interface.

    @FunctionalInterface
    public interface MySupplier<T> extends Supplier<T> {
        @Override
        default T get() {
            try {
                return getInternal();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        T getInternal() throws Exception;
    }


    public static void main(String[] args) {
        Future<?> future = null;
        return CompletableFuture.supplyAsync((MySupplier<?>) future::get);

    }


Third Option, find out some 3rd party lib which has provided such a functional interface.

See Also: Java 8 Lambda function that throws exception?

Marable answered 30/7, 2022 at 15:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.