I want to create a DAG out of tasks in Java, where the tasks may depend upon the output of other tasks. If there is no directed path between two tasks, they may run in parallel. Tasks may be canceled. If any task throws an exception, all tasks are canceled.
I wanted to use CompletableFuture for this, but despite implementing the Future interface (including Future.cancel(boolean)), CompletableFuture does not support cancelation of a running task: the mayInterruptIfRunning argument of CompletableFuture.cancel(true) is simply ignored. (Does anybody know why?)
Therefore, I am resorting to building my own DAG of tasks using Future. It's a lot of boilerplate, and complicated to get right. Is there any better method than this?
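The cancelation difference can be checked with a small experiment. The following is only a minimal sketch: the class name CancelDemo and the method interruptedAfterCancel are hypothetical, and the 500 ms sleep stands in for any blocking task. It starts a blocking task either through CompletableFuture.runAsync or through ExecutorService.submit, calls cancel(true), and reports whether the worker thread ever saw an interrupt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelDemo {
    /**
     * Hypothetical helper: runs a blocking task, cancels it with cancel(true),
     * and returns true if the worker thread received an InterruptedException.
     */
    static boolean interruptedAfterCancel(boolean useCompletableFuture)
            throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] sawInterrupt = {false};

        Runnable blockingTask = () -> {
            started.countDown();
            try {
                Thread.sleep(500); // stands in for blocking I/O or waitFor()
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;
            } finally {
                finished.countDown();
            }
        };

        try {
            Future<?> future;
            if (useCompletableFuture) {
                future = CompletableFuture.runAsync(blockingTask, pool);
            } else {
                future = pool.submit(blockingTask);
            }
            started.await();     // make sure the task is actually running
            future.cancel(true); // request cancelation with interruption
            finished.await();    // wait until the task body has exited
            return sawInterrupt[0];
        } finally {
            pool.shutdown();
        }
    }
}
```

With the standard library implementations, interruptedAfterCancel(true) should return false (the CompletableFuture is marked as canceled, but the task runs to the end of its sleep), while interruptedAfterCancel(false) should return true (FutureTask interrupts the worker thread).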
Here is an example:
- I want to call Process process = Runtime.getRuntime().exec(cmd) to start a command-line process, creating a Future<Process>. Then I want to launch (fan out to) three subtasks:
  - One task that consumes input from process.getInputStream()
  - One task that consumes input from process.getErrorStream()
  - One task that calls process.waitFor(), and then waits for the result.
- Then I want to wait for all three of the launched subtasks to complete (i.e. fan-in / a completion barrier). This should be collected in a final Future<Integer> exitCode that collects the exit code returned by the process.waitFor() task. The two input consumer tasks simply return Void, so their output can be ignored, but the completion barrier should still wait for their completion.
I want a failure in any of the launched subtasks to cause all subtasks to be canceled, and the underlying process destroyed.
Note that Process process = Runtime.getRuntime().exec(cmd) in the first step can throw an exception, which should cause the failure to cascade all the way to exitCode.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// The members below are nested inside an enclosing class (not shown).

// Assumption: the executor is a shared cached thread pool. Every task below
// blocks on another task, so a small fixed-size pool could deadlock.
private static final ExecutorService executor = Executors.newCachedThreadPool();

@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
    public void accept(T val) throws IOException;
}

public static Future<Integer> exec(
        ConsumerThrowingIOException<InputStream> stdoutConsumer,
        ConsumerThrowingIOException<InputStream> stderrConsumer,
        String... cmd) {
    // Root of the DAG: start the process
    Future<Process> processFuture = executor.submit(
            () -> Runtime.getRuntime().exec(cmd));
    AtomicReference<Future<Void>> stdoutProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Void>> stderrProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Integer>> exitCodeFuture = //
            new AtomicReference<>();

    // Destroys the process and cancels every subtask launched so far
    Runnable cancel = () -> {
        try {
            processFuture.get().destroy();
        } catch (Exception e) {
            // Ignore (exitCodeFuture.get() will still detect exceptions)
        }
        if (stdoutProcessorFuture.get() != null) {
            stdoutProcessorFuture.get().cancel(true);
        }
        if (stderrProcessorFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
        if (exitCodeFuture.get() != null) {
            // Cancel the exit code task itself (not the stderr task a second time)
            exitCodeFuture.get().cancel(true);
        }
    };

    // Fan-out 1: consume stdout
    if (stdoutConsumer != null) {
        stdoutProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream inputStream = processFuture.get()
                        .getInputStream();
                stdoutConsumer.accept(inputStream != null
                        ? inputStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    // Fan-out 2: consume stderr
    if (stderrConsumer != null) {
        stderrProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream errorStream = processFuture.get()
                        .getErrorStream();
                stderrConsumer.accept(errorStream != null
                        ? errorStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    // Fan-out 3: wait for the process to exit
    exitCodeFuture.set(executor.submit(() -> {
        try {
            return processFuture.get().waitFor();
        } catch (Exception e) {
            cancel.run();
            throw e;
        }
    }));

    // Async completion barrier -- wait for process to exit,
    // and for output processors to complete
    return executor.submit(() -> {
        Exception exception = null;
        int exitCode = 1;
        try {
            exitCode = exitCodeFuture.get().get();
        } catch (InterruptedException | CancellationException
                | ExecutionException e) {
            cancel.run();
            exception = e;
        }
        if (stderrProcessorFuture.get() != null) {
            try {
                stderrProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (stdoutProcessorFuture.get() != null) {
            try {
                stdoutProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return exitCode;
        }
    });
}
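One way the "cancel everything on the first failure" part of the listing above might be factored out is with the standard ExecutorCompletionService, which hands back futures in completion order. This is only a sketch under stated assumptions, not a drop-in replacement for exec(): the class FailFast and the method invokeAllOrCancel are hypothetical names, all tasks are assumed to share one result type, and tasks are assumed to respond to interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class FailFast {
    /**
     * Hypothetical helper: runs all tasks in parallel and returns their results
     * in submission order. On the first failure, or if the caller is interrupted,
     * every task that has not finished yet is canceled with interruption.
     */
    static <T> List<T> invokeAllOrCancel(ExecutorService executor,
            List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> completions = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            // Fan-out: submit every task
            for (Callable<T> task : tasks) {
                futures.add(completions.submit(task));
            }
            // Fan-in: take futures in completion order, so that a failure is
            // noticed as soon as it happens rather than in submission order
            for (int i = 0; i < tasks.size(); i++) {
                completions.take().get(); // throws ExecutionException on failure
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // already complete, returns at once
            }
            return results;
        } finally {
            // Has no effect on tasks that already completed
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }
}
```

In the process example, the three subtasks would be the Callables passed in, and the caller would still have to destroy the Process itself when an exception propagates, since canceling a Future does not do that.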
Note: I realize that Runtime.getRuntime().exec(cmd) should be non-blocking, so doesn't require its own Future, but I wrote the code using one anyway, to make the point about DAG construction.
… flatMap? – Sizing
… CompletableFuture.cancel(true) do? Yes, of course, the threads need to react to interruption to make this work, but for the example I gave, all three sub-tasks are blocking, therefore would need to catch InterruptedException internally anyway, and rethrow it as a CompletionException. And I don't think fan-out is just a flatMap, because for fan-out to work, the three sub-tasks need to be able to be launched from within the parent task -- or on the completion of the parent task -- which adds complexity. – Ootid
… Future rather than CompletableFuture -- thanks for the heads-up. – Ootid
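For comparison with the fan-out discussion in the comments, launching subtasks "on the completion of the parent task" can be sketched with the standard CompletableFuture dependency methods. This is an illustration only: the class FanOutFanIn and the method fanOutFanIn are hypothetical names, the parent yields a String instead of a Process, and nothing here interrupts a sibling that is already running, so it does not solve the cancelation problem in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class FanOutFanIn {
    /**
     * Hypothetical example: one parent task, three children that start once
     * the parent completes, and a barrier that yields the third child's result.
     */
    static CompletableFuture<Integer> fanOutFanIn(Supplier<String> parentTask) {
        CompletableFuture<String> parent = CompletableFuture.supplyAsync(parentTask);

        // Fan-out: each child is scheduled when the parent completes normally.
        // If the parent fails, each child completes exceptionally without running.
        CompletableFuture<Void> childA = parent.thenAcceptAsync(s -> { /* consume s */ });
        CompletableFuture<Void> childB = parent.thenAcceptAsync(s -> { /* consume s */ });
        CompletableFuture<Integer> childC = parent.thenApplyAsync(String::length);

        // Fan-in: allOf completes when all children have completed; childC is
        // already done at that point, so join() does not block
        return CompletableFuture.allOf(childA, childB, childC)
                .thenApply(ignored -> childC.join());
    }
}
```

A failure in the parent or in any child makes the returned future complete exceptionally, which covers the cascade to the final result but not the cancelation of siblings that are still running.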