Asynchronously populating a Java Map and returning it as a future

I've got a map of objects which are expensive to create, and so I want to create the objects and fill the map in parallel with other processes in my application. Only when the main thread actually needs to access the map should the application wait for the asynchronous task(s) populating the map to finish. How can I most elegantly accomplish this?

Current approach

Currently, I am able to create each individual object in the map asynchronously using CompletableFuture.runAsync(Runnable, Executor), as in the example code below, but I'm unsure how to build a Future/CompletableFuture-type mechanism for returning the Map itself once it is ready:

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }

    public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // TODO: This blocks until the training is done; Instead, return a
        // future to the caller somehow
        CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
        return result;
    }
}

However, with the code above, a call to AsynchronousMapPopulator.apply(Map<String,Integer>) blocks until the method returns the fully populated ConcurrentMap<String,Integer>. How can I turn this into something like a Future<Map<String,Integer>> so that I can use the result at a later time, as in the following?

Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();
Jab answered 7/11, 2017 at 11:28 Comment(5)
I may have read things wrong here, but in case I did not: shouldn't you just submit this task of computing the expensive Map to a pool and return a Future from that? – Oteliaotero
Each individual map value is very expensive to compute, so I need that to be parallelized as well. – Jab
If you don't want to wait for the result, don't call .join() – Rebellion
@Rebellion that doesn't give me the map. – Jab
return CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).thenApply(x -> result); – Rebellion

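As @Holger states in his comment, you must avoid calling .join(), which blocks the calling thread, and rely on thenApply() instead. thenApply() returns a new future that completes with the map once all of the jobs have finished, e.g. like this: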

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }

    public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // using thenApply instead of join here:
        return CompletableFuture.allOf(
                incrementingJobs.build().toArray(
                    CompletableFuture[]::new
                )
            ).thenApply(x -> result);
    }
}
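
For completeness, here is a minimal, self-contained sketch of how the caller's side might look. The class name AsyncMapDemo and the static helper populate are hypothetical names introduced only for this illustration; the helper follows the same allOf(...).thenApply(...) pattern as the apply method above, but returns a CompletableFuture so the caller can use join() without handling checked exceptions. The "+ 1" stands in for the expensive computation, as in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of the original question or answer.
class AsyncMapDemo {

    // Starts one asynchronous job per entry and returns immediately.
    // The returned future completes with the map once every job is done.
    static CompletableFuture<Map<String, Integer>> populate(
            final Map<String, Integer> input, final Executor executor) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>();
        final CompletableFuture<?>[] jobs = input.entrySet().stream()
                .map(entry -> CompletableFuture.runAsync(
                        // Placeholder for the expensive per-value computation.
                        () -> result.put(entry.getKey(), entry.getValue() + 1),
                        executor))
                .toArray(CompletableFuture[]::new);
        // No join() here: thenApply only registers what happens on completion.
        return CompletableFuture.allOf(jobs).thenApply(ignored -> result);
    }

    public static void main(final String[] args) {
        final Map<String, Integer> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 41);

        final CompletableFuture<Map<String, Integer>> future =
                populate(input, ForkJoinPool.commonPool());

        // ... the main thread is free to do other work here ...

        // Block only at the point where the map is actually needed.
        final Map<String, Integer> completed = future.join();
        System.out.println(completed.get("a") + " " + completed.get("b"));
    }
}
```

If any of the per-entry jobs throws, the future returned by allOf completes exceptionally, so the join() (or get()) call in the caller is where that failure would surface.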
Yezd answered 12/3, 2018 at 16:30 Comment(0)