How to use ExecutorService to poll until a result arrives
Asked Answered
M

3

20

I have a scenario where I have to poll a remote server checking if a task has completed. Once it has, I make a different call to retrieve the result.

I originally figured I should use a SingleThreadScheduledExecutor with scheduleWithFixedDelay for polling:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

But since I can only provide a Runnable to scheduleWithFixedDelay which can't return anything, I don't understand when the future will be complete, if ever. What does calling future.get() even mean? What result am I waiting for?

The first time I detect the remote task has completed, I want to execute a different remote call and set its result as the value of the future. I figured I could use CompletableFuture for this, that I would forward to my poll method, which would in turn forward it to my retrieveTask method that would eventually complete it:

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

But this has a ton of issues. For one, CompletableFuture doesn't even seem to be intended for this kind of use. Instead I should be doing CompletableFuture.supplyAsync(() -> poll(jobId)) I think, but how would I then properly shutdown the executor and cancel the future it returned when my CompletableFuture is canceled/complete? It feels like polling should be implemented in some completely different way.

Metametabel answered 25/10, 2016 at 23:48 Comment(7)
You can also submit Callables (that return results): docs.oracle.com/javase/7/docs/api/java/util/concurrent/…Bloodworth
@Bloodworth Only for one-off tasks, not with scheduleWithFixedDelay or scheduleAtFixedRate, so polling is outMetametabel
@Bloodworth i don't think scheduleWithFixedDelay ever receive a Callable.Gregggreggory
Op, i think you are doing the right thing. CompletableFuture is in fact a promise in an async programming framework. what you should expose, however, a normal future that is not completable. and all your subsequent code should subscribe to that future. i don't see any problem. what confuses you?Gregggreggory
@HuStmpHrrr All examples seem to be doing supplyAsync instead of explicitly creating a CompletableFuture. But more importantly, in my case I need to shutdown the executor when the future is complete. Should I subclass CompletableFuture and override complete, completeExceptionally and cancel to do this? Should I worry about canceling the ScheduledFuture I got from executor?Metametabel
@Metametabel you should cancel the scheduledFuture. let me answer.Gregggreggory
@kaqqao, if you don't get an answer, you can have a look at JBJF Daemons Framework on SourceForge, part of the JBJF Project. I wrote the Daemons Framework more as a pure Daemon/Service though. It doesn’t use ExecutorService, but instead, just straight Runnable/Thread for the Producers and Consumers. But you may be able to fit it to your needs…A Producer will implement the polling. But you’ll need to code it a little differently to turn around and fetch your result from the completed task.Zelig
G
19

I think CompletableFutures are a fine way to do this:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(final String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}
Gravamen answered 26/10, 2016 at 0:23 Comment(6)
Thank you! I finally realize how multiple steps are supposed to be combined. I was missing this the entire time. Will go try it out now.Metametabel
What is a proper way to check inside whenComplete if the future was cancelled? I can only come up with thrown instanceof CancelationExceptionMetametabel
I don't recall off the top of my head; the CancellationException may be wrapped in an ExecutionException. (So you'd need to check thrown.getCause() instanceof CancellationException.) In my example code, however, there is nothing that would cause the pollForCompletion CompletableFuture to become canceled. Can you update your question with your new code?Gravamen
I posted my new code and my new problem as a separate question as this one I'd say you fully answered already. I'm accepting your answer as you were the first to post. Please check the new question if you're still intererested in helping me out.Metametabel
The usage of whenComplete seems incorrect because the completable future might throw an exception or timeout etc. Therefore handle seems like a more appropriate method to cancel the schedulerAlbrecht
@GeorgePligoropoulos I believe whenComplete works ok in this case. The whenComplete action is executed whether the original stage ends with a result or with an exception, and exceptions are re-thrown. I think the problem is that we aren't catching exceptions inside the ScheduledFuture job, in order to invoke completeExceptionally with the caught exception.Cantu
I
3

I created a generic utility for this inspired by this answer using Supplier<Optional<T>> whereby each poll can return Optional.empty() until the value is ready. I also implemented a timeout so that a TimeoutException is thrown if the max time is exceeded.

Usage:

ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
   .supplier(supplier)
   .executorService(scheduledExecutor)
   .timeUnit(TimeUnit.SECONDS)
   .initialDelay(5)
   .period(5)
   .timeout(60 * 5)
   .build();

ScheduledCompletableFuture.java

public class ScheduledCompletableFuture {
    public static class ScheduledCompletableFutureBuilder<T> {
        private Supplier<Optional<T>> supplier;
        private ScheduledExecutorService executorService;
        private Long initialDelay;
        private Long period;
        private Long timeout;
        private TimeUnit timeUnit;

        public ScheduledCompletableFutureBuilder() {
        }

        public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
            this.supplier = supplier;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
            this.initialDelay = initialDelay;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> period(long period) {
            this.period = period;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
            this.timeout = timeout;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            return this;
        }

        public CompletableFuture<T> build() {
            // take a copy of instance variables so that the Builder can be re-used
            Supplier<Optional<T>> supplier = this.supplier;
            ScheduledExecutorService executorService = this.executorService;
            Long initialDelay = this.initialDelay;
            Long period = this.period;
            Long timeout = this.timeout;
            TimeUnit timeUnit = this.timeUnit;

            CompletableFuture<T> completableFuture = new CompletableFuture<>();
            long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            Runnable command = () -> {
                Optional<T> optional = supplier.get();
                if (optional.isPresent()) {
                    completableFuture.complete(optional.get());
                } else if (System.currentTimeMillis() > endMillis) {
                    String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
                    completableFuture.completeExceptionally(new TimeoutException(msg));
                }
            };
            ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
            return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
        }
    }

    public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
        return new ScheduledCompletableFutureBuilder<>();
    }
}
Interlocutrix answered 25/3, 2021 at 11:48 Comment(0)
G
2

it seems to me you are more worried by some stylistic problems than any others. in java 8, CompletableFuture has 2 roles: one is the traditional future, which gives an asynchronous source for task execution and status query; the other is what we usually call a promise. a promise, if you don't know yet, can be considered a builder of future and its completion source. so in this case, intuitively a promise is required, which is the exact case you are using here. the examples you are worrying about is something that introduces you the first usage, but not the promise way.

accepting this, it should be easier for you to start dealing with your actual problem. i think the promise is supposed to have 2 roles, one is to notify your task completion of polling, and the other is to cancel your scheduled task on completion. here should be the final solution:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}
Gregggreggory answered 26/10, 2016 at 0:24 Comment(2)
Aah, I now realize how one is supposed to combine steps with Futures. That was eluding me so far. Thanks, this is useful. I'll now go try it.Metametabel
Still having issues with this code, but it's a different question from this one. Please check out the new question if you're still interested in helping me out. Thanks again.Metametabel

© 2022 - 2024 — McMap. All rights reserved.