Constructing a DAG of cancelable Java tasks
Asked Answered
O

1

0

I want to create a DAG out of tasks in Java, where the tasks may depend upon the output of other tasks. If there is no directed path between two tasks, they may run in parallel. Tasks may be canceled. If any task throws an exception, all tasks are canceled.

I wanted to use CompleteableFuture for this, but despite implementing the Future interface (including Future.cancel(boolean), CompletableFuture does not support cancelation -- CompletableFuture.cancel(true) is simply ignored. (Does anybody know why?)

Therefore, I am resorting to building my own DAG of tasks using Future. It's a lot of boilerplate, and complicated to get right. Is there any better method than this?

Here is an example:

  1. I want to call Process process = Runtime.getRuntime().exec(cmd) to start a commandline process, creating a Future<Process>. Then I want to launch (fan out to) three subtasks:
    • One task that consumes input from process.getInputStream()
    • One task that consumes input from process.getErrorStream()
    • One task that calls process.waitFor(), and then waits for the result.
  2. Then I want to wait for all three of the launched sub-tasks to complete (i.e. fan-in / a completion barrier). This should be collected in a final Future<Integer> exitCode that collects the exit code returned by the process.waitFor() task. The two input consumer tasks simply return Void, so their output can be ignored, but the completion barrier should still wait for their completion.

I want a failure in any of the launched subtasks to cause all subtasks to be canceled, and the underlying process destroyed.

Note that Process process = Runtime.getRuntime().exec(cmd) in the first step can throw an exception, which should cause the failure to cascade all the way to exitCode.

@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
    public void accept(T val) throws IOException;
}

public static Future<Integer> exec(
        ConsumerThrowingIOException<InputStream> stdoutConsumer,
        ConsumerThrowingIOException<InputStream> stderrConsumer,
        String... cmd) {

    Future<Process> processFuture = executor.submit(
            () -> Runtime.getRuntime().exec(cmd));

    AtomicReference<Future<Void>> stdoutProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Void>> stderrProcessorFuture = //
            new AtomicReference<>();
    AtomicReference<Future<Integer>> exitCodeFuture = //
            new AtomicReference<>();

    Runnable cancel = () -> {
        try {
            processFuture.get().destroy();
        } catch (Exception e) {
            // Ignore (exitCodeFuture.get() will still detect exceptions)
        }
        if (stdoutProcessorFuture.get() != null) {
            stdoutProcessorFuture.get().cancel(true);
        }
        if (stderrProcessorFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
        if (exitCodeFuture.get() != null) {
            stderrProcessorFuture.get().cancel(true);
        }
    };

    if (stdoutConsumer != null) {
        stdoutProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream inputStream = processFuture.get()
                        .getInputStream();
                stdoutConsumer.accept(inputStream != null
                        ? inputStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    if (stderrConsumer != null) {
        stderrProcessorFuture.set(executor.submit(() -> {
            try {
                InputStream errorStream = processFuture.get()
                        .getErrorStream();
                stderrConsumer.accept(errorStream != null
                        ? errorStream
                        : new ByteArrayInputStream(new byte[0]));
                return null;
            } catch (Exception e) {
                cancel.run();
                throw e;
            }
        }));
    }

    exitCodeFuture.set(executor.submit(() -> {
        try {
            return processFuture.get().waitFor();
        } catch (Exception e) {
            cancel.run();
            throw e;
        }
    }));

    // Async completion barrier -- wait for process to exit,
    // and for output processors to complete
    return executor.submit(() -> {
        Exception exception = null;
        int exitCode = 1;
        try {
            exitCode = exitCodeFuture.get().get();
        } catch (InterruptedException | CancellationException
                | ExecutionException e) {
            cancel.run();
            exception = e;
        }
        if (stderrProcessorFuture.get() != null) {
            try {
                stderrProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (stdoutProcessorFuture.get() != null) {
            try {
                stdoutProcessorFuture.get().get();
            } catch (InterruptedException | CancellationException
                    | ExecutionException e) {
                cancel.run();
                if (exception == null) {
                    exception = e;
                } else if (e instanceof ExecutionException) {
                    exception.addSuppressed(e);
                }
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return exitCode;
        }
    });
}

Note: I realize that Runtime.getRuntime().exec(cmd) should be non-blocking, so doesn't require its own Future, but I wrote the code using one anyway, to make the point about DAG construction.

Ootid answered 30/11, 2020 at 12:29 Comment(4)
CompletableFuture does not support cancelation, unless you shutdown the underlying pool and your threads react to interruptions. And fan-out is just a flatMap?Sizing
@Sizing then what does CompletableFuture.cancel(true) do? Yes, of course, the threads need to react to interruption to make this work, but for the example I gave, all three sub-tasks are blocking, therefore would need to catch InterruptedException internally anyway, and rethrow it as a CompletionException. And I don't think fan-out is just a flatMap, because for fan-out to work, the three sub-tasks need to be able to be launched from within the parent task -- or on the completion of the parent task -- which adds complexity.Ootid
cancel will not interrupt, read the documentation. It literally says thatSizing
@Sizing I updated the question to use Future rather than CompletableFuture -- thanks for the heads-up.Ootid
V
-1

No way. Process has no asynchronous interface (except for Process.onExit()). So you have to use threads to wait for process creation and while reading from InputStreams. Other components of your DAG can be async tasks (CompletableFutures).

This is not a big problem. The only advantage of async tasks over threads is less memory consumption. Your Process consumes a lot if memery anyway, so there is not much sense to save memory here.

Varini answered 30/11, 2020 at 14:1 Comment(3)
I think you missed the whole point of my question. Process launches another thread that runs in parallel, although waitFor() will block on that thread's exit. The whole reason I'm trying to use CompletableFuture is to make the exit code from waitFor(), and stdout/stderr from the process, available asynchronously.Ootid
stdout/stderr have only blocking interface, they are not available asynchronously. Of course, you always can convert synchronous interface to asynchronoud (but not vice versa), but it would be pure waste of resources.Varini
waitFor already returns exit code, no intermediate CompletableFuture required.Varini

© 2022 - 2024 — McMap. All rights reserved.