Java 21 structured concurrency, need predictable subtask exception ordering

I'm rather new to parallel code, and I tried to convert some code based on executors to structured concurrency, but I lost an important property that I must somehow keep.

Given the following code using structured concurrency with Java 21 preview:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

    scope.join().throwIfFailed(); // [1]

    var data1 = d1Subtask.get(); // [2]
    var data2 = d2Subtask.get();

    return new Response(data1, data2);
}

At [1], whichever exception comes first out of the two subtasks is thrown, and I don't want that. I need to run both tasks in parallel, but I need d1Subtask's outcome to be considered first in case it fails. In other words:

  • if d1Subtask fails, I need to throw its exception (d2Subtask might still be running, have succeeded or have failed, and none of it matters: an exception from d1Subtask makes the second task irrelevant);
  • if d1Subtask succeeds and d2Subtask fails, I need to throw the exception from d2Subtask;
  • if both succeed, combine the results of both.

If I change [1] to just scope.join(); then [2] can fail (with an IllegalStateException) when the scope was shut down before d1Subtask completed. There is d1Subtask.state(), but polling it until it leaves the State.UNAVAILABLE state seems against the idea of structured concurrency.

This ordering can be achieved with Executors, and also with a plain StructuredTaskScope, but that means potentially running d2Subtask to completion even in cases where the scope could have been shut down and that task aborted.
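
For reference, this is roughly the executor-based version whose ordering I want to keep (a minimal sketch; the virtual-thread executor is just an illustration):

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Data1> f1 = executor.submit(() -> getData1(input));
    Future<Data2> f2 = executor.submit(() -> getData2(input));

    var data1 = f1.get(); // if getData1 failed, its ExecutionException is thrown here,
                          // no matter whether getData2 already failed or is still running
    var data2 = f2.get(); // only reached once getData1 succeeded

    return new Response(data1, data2);
}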

Given that, is it possible to modify the code above to wait for the result of d1Subtask first in a clean, readable way? I imagined that something like scope.join(d1Subtask) or d1Subtask.join() would be the way to do it, or maybe a different policy, if such an API existed.


Edit: clearer explanation of the desired logic with each possible outcome.

Blackett answered 28/2 at 9:06 Comment(4)
If you don’t want the behavior of the ShutdownOnFailure policy, don’t use ShutdownOnFailure.Alroy
@Alroy Yes, this is the expected behavior. Since the only other built-in policy is ShutdownOnSuccess (which is inadequate for me), what are my alternatives? Use ShutdownOnFailure and check states, implement my own policy, or something else I missed?Blackett
You don’t have to decide between ShutdownOnSuccess and ShutdownOnFailure. You can simply use new StructuredTaskScope<>() without a special policy, then join() will always wait for all tasks so you can easily check whether d1Subtask failed before checking whether d2Subtask failed. You can also create a subclass implementing your own policy.Alroy
I actually hadn't realized that... I always see samples with ShutdownOn so I didn't think that StructuredTaskScope was usable by itself. Kind of obvious in retrospect; after all, the most basic behavior is "run all tasks to completion". Thanks.Blackett

You can use StructuredTaskScope directly, without ShutdownOnFailure, to wait for all jobs to complete; then you can check the results and failures in the intended order, e.g.

static Response simpleApproach() throws ExecutionException, InterruptedException {
    try(var scope = new StructuredTaskScope<>()) {
        Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

        scope.join();

        var data1 = get(d1Subtask);
        var data2 = get(d2Subtask);

        return new Response(data1, data2);
    }
}

static <T> T get(Subtask<T> task) throws ExecutionException {
    if(task.state() == State.FAILED)
        throw new ExecutionException(task.exception());
    return task.get();
}

This is the simplest approach. It ensures that if both jobs failed, the exception of “data1” is propagated to the caller. The only disadvantage is that if “data1” failed before “data2”’s completion, it will wait for “data2”, without an attempt to interrupt it. This, however, may be acceptable as we’re usually not trying (too hard) to optimize the exceptional case.


But you can also implement your own policy. Here’s an example of a policy with a “primary job”. When another job fails, it will still wait for the primary job’s completion, preferring the primary job’s exception if it fails too. But when the primary job fails, it will shut down immediately, trying to interrupt all other jobs rather than waiting for their completion:

static Response customPolicy() throws ExecutionException, InterruptedException {
    try(var scope = new ShutdownOnPrimaryFailure<>()) {
        Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

        scope.join().throwIfFailed();

        var data1 = d1Subtask.get();
        var data2 = d2Subtask.get();

        return new Response(data1, data2);
    }
}
class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                // the primary subtask failed: its exception takes priority over any
                // previously recorded one, and all other subtasks are cancelled
                failure.set(subtask.exception());
                shutdown();
            }
            // a non-primary failure is only recorded if no failure was recorded before
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline)
        throws InterruptedException, TimeoutException {

        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}

For completeness, I provide code for testing all scenarios at the end of this answer. It checks all combinations of success and failure.

With the implemented approaches, it will print

  *** Original
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F   Data2 Fail    I  F   -             I  F

  *** Simple
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  F
FAIL_SLOW:  Data1 Fail    F  F   Data1 Fail    F  F   -             F  F

  *** Custom Policy
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F   Data1 Fail    F  F   -             F  F

Abbrev. status: Finished, Interrupted, or Running

The problematic case was D1 failing slow while D2 fails fast, in the middle of the 3rd row. The ShutdownOnFailure then aborted D1 (D1 status Interrupted) and propagated D2’s failure. The simple approach clearly fixes this but loses the ability to fail fast when D1 fails fast (the last cell of the 2nd row, D2 status now Finished). The custom policy solves the original issue while retaining the fail-fast support.

// Requires Java 21 with --enable-preview (StructuredTaskScope and ScopedValue are preview APIs).
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Subtask.State;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StructuredExample {
    public static void main(String[] args) {
        record Approach(String name, Callable<?> method) {}
        List<Approach> approaches = List.of(
            new Approach("Original", StructuredExample::originalApproach),
            new Approach("Simple", StructuredExample::simpleApproach),
            new Approach("Custom Policy", StructuredExample::customPolicy));

        for(var approach: approaches) {
            System.out.println("  *** " + approach.name());
            System.out.printf("%-12s", "D1 \u2193  D2 \u2192");
            for(Mode d2Mode: Mode.values()) System.out.printf("%-12s D1 D2   ", d2Mode);
            System.out.println();
            for(Mode d1Mode: Mode.values()) {
                System.out.printf("%-12s", d1Mode + ":");
                for(Mode d2Mode: Mode.values()) {
                    String result = "-";
                    if(d2Mode == Mode.SUCCESS || d1Mode != d2Mode) try {
                        ScopedValue.where(data1Mode, d1Mode)
                            .where(data2Mode, d2Mode)
                            .call(() -> approach.method().call());
                        result = "Success";
                    }
                    catch(ExecutionException ex) { result = ex.getCause().getMessage(); }
                    catch(Exception ex) { result = ex.getMessage(); }
                    System.out.printf("%-12s%3s%3s   ", result, d1Running.name().charAt(0), d2Running.name().charAt(0));
                }
                System.out.println();
            }
            System.out.println();
        }
    }

    // mock for the getData1 and getData2 operations, producing success or failure and recording running state

    enum Mode { SUCCESS, FAIL_FAST, FAIL_SLOW }
    enum StateDebug { RUNNING, FINISHED, INTERRUPTED; }

    static final ScopedValue<Mode> data1Mode = ScopedValue.newInstance();
    static final ScopedValue<Mode> data2Mode = ScopedValue.newInstance();

    static volatile StateDebug d1Running, d2Running;

    static Data1 getData1(Object input) throws Exception {
        return getDataImpl("Data1", data1Mode, Data1::new, s -> d1Running = s);
    }

    static Data2 getData2(Object input) throws Exception {
        return getDataImpl("Data2", data2Mode, Data2::new, s -> d2Running = s);
    }

    static <T> T getDataImpl(String which, ScopedValue<Mode> mode, Supplier<T> s, Consumer<StateDebug> c) throws Exception {
        c.accept(StateDebug.RUNNING);
        boolean interrupted = false;
        try {
            Thread.sleep(500);
            switch(mode.get()) {
                case SUCCESS: return s.get();
                case FAIL_SLOW: Thread.sleep(500);
            }
            throw new Exception(which + " Fail");
        }
        catch(InterruptedException ex) {
            interrupted = true;
            c.accept(StateDebug.INTERRUPTED);
            throw ex;
        }
        finally {
            if(!interrupted) c.accept(StateDebug.FINISHED);
        }
    }

    // dummy data and types

    record Data1() {}
    record Data2() {}

    record Response(Data1 data1, Data2 data2)  {}

    static Object input;

    // the implementations

    static Response originalApproach() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
    
            scope.join().throwIfFailed(); // [1]
    
            var data1 = d1Subtask.get(); // [2]
            var data2 = d2Subtask.get();
    
            return new Response(data1, data2);
        }
    }

    static Response simpleApproach() throws ExecutionException, InterruptedException {
        try(var scope = new StructuredTaskScope<>()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

            scope.join();

            var data1 = get(d1Subtask);
            var data2 = get(d2Subtask);

            return new Response(data1, data2);
        }
    }

    static <T> T get(Subtask<T> task) throws ExecutionException {
        if(task.state() == State.FAILED)
            throw new ExecutionException(task.exception());
        return task.get();
    }

    static Response customPolicy() throws ExecutionException, InterruptedException {
        try(var scope = new ShutdownOnPrimaryFailure<>()) {
            Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

            scope.join().throwIfFailed();

            var data1 = d1Subtask.get();
            var data2 = d2Subtask.get();

            return new Response(data1, data2);
        }
    }
}

class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                // the primary subtask failed: its exception takes priority over any
                // previously recorded one, and all other subtasks are cancelled
                failure.set(subtask.exception());
                shutdown();
            }
            // a non-primary failure is only recorded if no failure was recorded before
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException {
        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}
Alroy answered 29/2 at 10:55 Comment(3)
Impressive answer! Sorry for the confusion in my initial description, but you got my scenario right here. A custom scope definitely looks like the way to go for complex cases. I also posted another answer here with nested scopes after taking a look at your code.Blackett
Perfect implementation of the idea of subclassing to define custom on-fail/on-success behavior! 1) Is the check on the FAILED state necessary in the get(Subtask<T> task) method? Doesn't throwIfFailed guarantee that the point of calling get(Subtask<T> task) can only be reached when both tasks completed successfully? 2) Is forking the primary task before the other forks, enforced by the ensureOwnerAndJoined call in the forkPrimary method, necessary? Doesn't the logic implemented in the handleComplete override correctly handle the case where forkPrimary follows the other forks, and not vice versa?Duress
@Duress 1) If you are referring to the simpleApproach(), it does not use throwIfFailed() because this method does not exist on the base class StructuredTaskScope. I added the get(Subtask<T> task) method specifically to have a method behaving like Future.get while Subtask behaves differently. 2) It’s not strictly necessary for a single round, but the design of the class allows multiple rounds, i.e. after join(), you can start a new round with one primary task and an arbitrary number of other subtasks. I wanted to stay close to the other policies, which also support multiple rounds.Alroy

What the ShutdownOnFailure subclass of StructuredTaskScope essentially does is attempt to stop the other running subtasks (or prevent them from running if they haven't started yet) as soon as at least one of the subtasks throws an exception, i.e. fails. If, as the question specifies,

d2Subtask might be running, be successful or failed and none of it matters,

and there is no need to stop/interrupt d2Subtask, then the usage of ShutdownOnFailure is not justified. Instead, the selective exception handling requested by the question,

if d1Subtask fails ... throw its exception; if d2Subtask fails, throw the exception from d2Subtask

can be achieved with the base StructuredTaskScope and an analysis, after join, of both subtasks' states and exceptions, if any.

try (var scope = new StructuredTaskScope<>()) {
    Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input1)); 
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input2));
    scope.join();
        
    // exception() returns a Throwable, so the enclosing method must declare it
    // (or the exception could be wrapped, e.g. in an ExecutionException, as in the other answers)
    if (d1Subtask.state() == State.FAILED) {
        throw d1Subtask.exception();
    }
    if (d2Subtask.state() == State.FAILED) {
        throw d2Subtask.exception();
    }
    if (d1Subtask.state() != State.SUCCESS || d2Subtask.state() != State.SUCCESS) {
        throw new InternalError();
    }
        
    var data1 = d1Subtask.get(); // [2]
    var data2 = d2Subtask.get();

    return new Response(data1, data2);
}

Note that with the above code the lines

var data1 = d1Subtask.get(); // [2]
var data2 = d2Subtask.get();

are guaranteed to execute without throwing an exception.

There is an important difference between this and the accepted solution: the former does not shut down the scope and allows d2Subtask to complete even if d1Subtask fails, while the latter shuts down the scope in that case (which only means that the scope will attempt to stop/interrupt the d2Subtask thread and give up if the thread is unresponsive). The OP didn't specify whether termination of d2Subtask is required when d1Subtask fails, just said: "none of it matters". If this means that d2Subtask can be left alone altogether, then the above solution completely satisfies the OP's requirements; if it means that d2Subtask must be terminated immediately, then only the accepted answer, which overrides handleComplete, can be satisfactory.

Duress answered 28/2 at 16:18 Comment(4)
The contract of join() is “Wait for all subtasks started in this task scope to finish or the task scope to shut down.” So when using ShutdownOnFailure, which will shut down on the first failure, it is not guaranteed that all subtasks finished. You can avoid this by using a direct instance of StructuredTaskScope instead of the specialized subclasses, but then there is no throwIfFailed() method.Alroy
@Holger, edited correspondingly. Please see if this coincides with your vision.Duress
The program logic is correct but might not fit the OP’s requirement. When throwIfFailed() throws, it might be an exception thrown by d2Subtask while d1Subtask has not completed. As far as I understood, the OP wants to wait for the completion of d1Subtask in that case, to check whether it failed too, ensuring that d1Subtask’s exception is thrown when both failed.Alroy
@Holger, yes, you are right. Now that the OP has clarified what exactly he wants to achieve, the solution seems to be even easier; I updated accordingly.Duress

Thanks to the feedback from @Holger, I thought of another approach using nested scopes, which I include here for completeness' sake:

static Response nestedScopes() throws ExecutionException, InterruptedException {
    try(var scope1 = new StructuredTaskScope.ShutdownOnFailure()) {
        Subtask<Data1> d1Subtask = scope1.fork(() -> getData1(input));

        try(var scope2 = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<Data2> d2Subtask = scope2.fork(() -> getData2(input));

            scope1.join().throwIfFailed();
            scope2.join().throwIfFailed();

            var data1 = d1Subtask.get();
            var data2 = d2Subtask.get();

            return new Response(data1, data2);
        }
    }
}

Not sure if this is an advisable approach. The result is the same as @Holger's custom scope, and it's less code, but it would probably become hard to follow as the complexity, the number of subtasks, and their relationships grow; in that case a custom scope is probably still the better approach.
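
Just to illustrate that scaling concern, extending the same pattern to a hypothetical third subtask would already look like this (Data3, getData3 and Response3 are made-up names, not part of my actual code):

static Response3 nestedScopesThree() throws ExecutionException, InterruptedException {
    try (var scope1 = new StructuredTaskScope.ShutdownOnFailure()) {
        Subtask<Data1> d1Subtask = scope1.fork(() -> getData1(input));

        try (var scope2 = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<Data2> d2Subtask = scope2.fork(() -> getData2(input));

            try (var scope3 = new StructuredTaskScope.ShutdownOnFailure()) {
                Subtask<Data3> d3Subtask = scope3.fork(() -> getData3(input));

                // priority order: d1's failure wins over d2's, which wins over d3's
                scope1.join().throwIfFailed();
                scope2.join().throwIfFailed();
                scope3.join().throwIfFailed();

                return new Response3(d1Subtask.get(), d2Subtask.get(), d3Subtask.get());
            }
        }
    }
}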

Blackett answered 29/2 at 14:30 Comment(5)
That seems to work, but isn't it equivalent to @Holger's simple approach? This seems to be an unnecessary complication. ShutdownOnFailure will shut down the other tasks if one fails, but what is the use of it if there is only one task in a scope?Duress
@Duress No, his Simple approach will cause an exception from d2Subtask to be thrown if that finishes first. My approach above should be equivalent to his Custom Policy, which always prioritizes the result of d1Subtask first even if d2Subtask fails first, and that matches my original requirement.Blackett
Honestly, I don't see how his simple approach will cause an exception from d2Subtask to be thrown if that finishes first, given that it checks d1Subtask first. But @Alroy knows his approaches better. No, it does not look to me like your approach is equivalent to his custom one, as yours does not attempt to shut down the second subtask if the first fails, which is the main point of that custom thing. This is discussed in my answer.Duress
My bad. His Simple approach will be equivalent because of the get(Subtask) method that I missed... yet Simple is still slightly less optimized, as it waits for the completion of d2Subtask regardless of the first task, as pointed out by @Holger. This caveat is addressed in his Custom Policy and should be addressed in my code above too, if I tested properly. And about not attempting to shut down, it actually does: remember that scope objects are AutoCloseable. This is in the docs of close: "This method first shuts down the task scope (as if by invoking the shutdown method)."Blackett
Agreed with your analysis of Simple vs Custom. As for shutdown, the close method does not do anything in any of the approaches, including all three of @Holger's, yours and mine (other than switching the scope's state to CLOSED), because all threads have finished by the time this method is called. In all 5 cases the entire scope will eventually be shut down anyway, but Custom shuts down d2Subtask as early as possible, when d1Subtask fails.Duress
