Execute a few threads in parallel and a few in serial by CompletableFuture
D

6

5

I need to perform some tasks. Some of the tasks are independent and some depend on the successful execution of other tasks. Independent tasks can run in parallel for better performance. I call these tasks services. The column link tells which services will be executed in series and which in parallel. The column order describes the execution order that a set of defined services will follow. In the example below, services A and B should run in parallel. If they have executed successfully, service C will execute. Please note that service C does not directly depend on the output of the previous services, but it must run after their successful execution because it requires some data that they produce. After successful execution of service C, the next service D will execute, and this cycle continues until all services in the list have been consumed.

Tasks       service     link      order
Service A   01          03        1
Service B   02          03        2
Service C   03          04        3
Service D   04          05        4
Service E   05          07        5
Service F   06          07        6
Service G   07          (null)    7

Following is my code.

    public void executeTransactionFlow(DataVo dataVo) throws Exception {

    List<Callable<Boolean>> threadList = new ArrayList<>();
    List<String> serviceIds = new ArrayList<>();
    List<Future<Boolean>> futureList;
    String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;

    // Iterating through service flows map
    for (Map<String, String> map : serviceFlowsMap) {
        joinTo = map.get("link");
        serviceId = map.get("service");

        // A simple flag to differentiate which services should execute parallel and which in serial.
        if (null == prevJoinTo) {
            prevJoinTo = joinTo;
        }

        // Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
        if (null != joinTo && joinTo.equals(prevJoinTo)) {
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }

        /*
         * 1. Run the threads in the list
         * 2. Empty the thread list
         * 3. Empty serviceIds list
         * 4. Set prevJoinTo
         */
        else {
            if (threadList.size() > 0) {
                prevJoinTo = joinTo;

                try {

                    // If list contain only 1 service then call, otherwise invokeAll
                    futureList = MyExecutor.executeServices(threadList, dataVo);

                    // During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
                    if (dataVo.isTimedout()) {
                        throw new Exception("Transaction thread is Interrupted or Timed-out");
                    }

                    // Validate service response codes and get decision in case of any failure
                    validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);

                    // If validatedRespCode is anything other than 200, do not process further
                    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
                        break;
                    }
                }
                catch (Exception e) {
                    throw new Exception(e.getMessage(), e);
                }
                finally {
                    // clear thread list and serviceIds list. It will be populated for next parallel set of threads
                    threadList.clear();
                    serviceIds.clear();
                }
            }

            // Start preparing new thread list
            // Adding current service_id into threadList after executing previous services in parallel.
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }
    }

    // Run remaining services
    if (!threadList.isEmpty()) {

        try {
            futureList = MyExecutor.executeServices(threadList, dataVo);
            validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
        }
        catch (Throwable e) {
            throw new Exception(e.getMessage(), e);
        }
    }

    // Check validation response code
    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
        MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
    }

}


/**
 * This method iterates through the thread list and checks for exceptions and service responses.
 * If service response is not success or if any exception has occurred then exception is thrown
 */
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
    String finalResponse = "200", serviceResponse = null;

    /*
     * future list will be null if single service is executed (no other parallel transactions). The reason is that we do
     * not use invokeAll() on single service.
     */

    if (null != futureList && futureList.size() > 0) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                throw new Exception(e.getMessage(), e);
            }
        }
    }

    // Iterate through serviceIds and check responses.
    for (String serviceId : serviceIds) {
        serviceResponse = dataVo.getServiceResponse(serviceId);

        /*
         * if one of following response is found then consider it exception
         */
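        // Exact match against the failure codes (needs java.util.Arrays); String.contains() would also match fragments such as "0"
        if (null != serviceResponse && Arrays.asList("400", "401", "402", "403", "404", "500", "501").contains(serviceResponse)) {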
            throw new Exception("One of the service has been declined");
        }
    }

    return finalResponse;
}

If CompletableFuture can be beneficial here, then how can I use that efficiently?

Also, future.get() is a blocking call. If I have 10 services executing in parallel, the loop blocks on the future it is currently waiting for, even when other services have already finished. How can I avoid this blocking?

I have added more details to the problem statement, i.e. the order column. The services need to follow the defined order. The order of services A and B is 1 and 2 respectively, but they will still execute in parallel because both have the value 03 in link. I think the dependency-graph approach suggested by @Thomas in the comments is not required now.

Derek answered 13/1, 2020 at 11:47 Comment(2)
You probably want to look into task dependency graphs, i.e. you'd define which tasks' completion another task depends on and either submit all of them and let the system work out the ordering or have the system spawn dependent tasks when appropriate (i.e. you only submit the "root" tasks). - Overseer
This is not a CompletableFuture but a plain Future; to see the difference: #39472561 - Fiscus
C
3

Awesome question. Although it is technically possible to do this purely with ExecutorService and Future, in my opinion reactive programming is a better fit than relying only on Future, CompletableFuture, CompletionService and the like. The main reason is that such code can quickly become difficult to read.
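For reference, the plain-JDK route could look roughly like the sketch below. It uses ExecutorCompletionService, whose take() hands back futures in completion order, so a slow task does not hold up the handling of tasks that have already finished. The class name, task names and sleep times are made up for illustration, and the sketch assumes a non-empty task list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    /* Hypothetical task: sleeps for the given time and returns its name. */
    static Callable<String> task(String name, long sleepMillis) {
        return () -> {
            Thread.sleep(sleepMillis);
            return name;
        };
    }

    /* Runs all tasks in parallel and returns their results in completion order. */
    static List<String> runAll(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            for (Callable<String> t : tasks) cs.submit(t);

            List<String> finished = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                /* take() blocks only until the next task finishes, whichever one it is. */
                finished.add(cs.take().get());
            }
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(task("slow", 600));
        tasks.add(task("fast", 50));
        System.out.println(runAll(tasks));
    }
}
```

Although "slow" is submitted first, its result is collected last, because the loop waits on whichever task completes next rather than on a specific future.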

Here is how I did it using RxJava 2.2.16 and ExecutorService:

  1. Use ExecutorService.submit() to execute every action that either has no dependencies or whose dependencies have all completed.
  2. To know that an action has completed, use a BehaviorSubject from RxJava. When an action completes, trigger step (1) for each action that depends on it.
  3. Shut down the ExecutorService when all actions have completed. For this, use another BehaviorSubject.

I am sorry, I have written the entire logic in my own way because of the new approach, but it still follows your main requirement. It is best to look first at the Action model class and the createActions() method in AppRxJava; from there, you should be able to follow the code. To simulate time-consuming work, I have used the well-known Thread.sleep() technique.

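import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.reactivex.subjects.Subject;

public class AppRxJava{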
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    private ExecutorService SVC = Executors.newCachedThreadPool();
    private List<Action> allActions;

    public static void main( String[] args ){
        new AppRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) {
                SVC.shutdown();
                try {
                    SVC.awaitTermination( 2, TimeUnit.SECONDS );
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start all actions that are present in the passed list. */
    private void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    private void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

    private boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    private static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    private List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    private List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }


}

And the Action model class. This represents one action and a list of actions that it is dependent upon. (A slight difference from your original representation. But either way is OK, if you handle it accordingly, I think.)

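import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class Action{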
    Callable<Boolean> action;
    String name;
    List<Action> dependencies = new ArrayList<>();
    AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
    public static final Object LOCK = new Object();

    private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
        super();
        this.action = action;
        this.name = name;
        if( dependencies != null ) this.dependencies = dependencies;
    }

    public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
        return new Action( action, name, dependencies );
    }

    public Callable<Boolean> getAction(){
        return action;
    }

    public String getName(){
        return name;
    }

    public List<Action> getDependencies(){
        return dependencies;
    }

    public boolean isCompleted(){
        return this.status.get() == 2;
    }

    public boolean isPending(){
        return this.status.get() == 0;
    }

    public boolean isScheduled(){
        return this.status.get() == 1;
    }

    public void setStatus( int status ){
        this.status.getAndSet( status );
    }

    @Override
    public int hashCode(){
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals( Object obj ){
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Action other = (Action) obj;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals( other.name )) return false; // case-sensitive, to stay consistent with hashCode()
        return true;
    }

}
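The table from the question can be turned into this dependency form mechanically. Here is a rough sketch, assuming that link names the service that has to run next, so every service that links to X becomes a dependency of X. LinkTableDemo and toDependencies are names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LinkTableDemo {

    /* Maps every service id to the ids of the services it has to wait for. */
    static Map<String, List<String>> toDependencies(String[][] rows) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        for (String[] row : rows) deps.put(row[0], new ArrayList<>());   // row = { serviceId, link }
        for (String[] row : rows) {
            String link = row[1];
            /* "link" names the service that runs after this one, so this one is its dependency. */
            if (link != null && deps.containsKey(link)) deps.get(link).add(row[0]);
        }
        return deps;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"01", "03"}, {"02", "03"}, {"03", "04"}, {"04", "05"},
            {"05", "07"}, {"06", "07"}, {"07", null}
        };
        // prints {01=[], 02=[], 03=[01, 02], 04=[03], 05=[04], 06=[], 07=[05, 06]}
        System.out.println(toDependencies(rows));
    }
}
```

Note that service 06 ends up with no dependencies under this reading, so it could start immediately instead of waiting for 04. If the order column has to be respected strictly, that extra constraint would need to be added as a dependency.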
Conchiolin answered 13/1, 2020 at 14:44 Comment(3)
Thank you for suggesting the reactive way, although I have never tried this approach. Once I evaluate and understand the solution you provided, I will be able to mark it. - Derek
Can you give more details on this Callable class? It doesn't appear to be java.util.concurrent.Callable. Also, what is DataVo, and what is the interface for your services? Are they different? - Sinewy
@Sinewy If you are referring to the solution I have given: I am using java.util.concurrent.Callable. Also, as I mentioned in my answer, I had to write new code for the same requirement because the approach had to be very different. Hence, it doesn't use the DataVo of the original post. (Pardon me if I got your question wrong.) - Conchiolin
S
1

thenCombine can be used to express dependencies between CompletionStages, allowing you to perform a task after both have completed. You can then perform the subsequent actions with thenApply:

CompletionStage<ServiceAResponse> serviceAResponse = callServiceA();
CompletionStage<ServiceBResponse> serviceBResponse = callServiceB();


CompletionStage<ServiceEResponse> result = serviceAResponse.thenCombine(serviceBResponse, (aResponse, bResponse) -> serviceC.call(aResponse, bResponse))
                                         .thenApply(cResponse -> serviceD.call(cResponse))
                                         .thenApply(dResponse -> serviceE.call(dResponse));



public CompletionStage<ServiceAResponse> callServiceA() {
    return CompletableFuture.supplyAsync(() -> serviceA.call());
}

public CompletionStage<ServiceBResponse> callServiceB() {
    return CompletableFuture.supplyAsync(() -> serviceB.call());
}
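A self-contained version of the same chain that can be run as-is might look like this. The five service methods are hypothetical stand-ins that only concatenate strings; real services would return their own response types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ThenCombineDemo {

    /* Hypothetical stand-ins for the real service calls. */
    static String serviceA() { return "A"; }
    static String serviceB() { return "B"; }
    static String serviceC(String a, String b) { return a + b + "C"; }
    static String serviceD(String c) { return c + "D"; }
    static String serviceE(String d) { return d + "E"; }

    static CompletionStage<String> flow() {
        /* A and B are started right away and run in parallel on the common pool. */
        CompletableFuture<String> a = CompletableFuture.supplyAsync(ThenCombineDemo::serviceA);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(ThenCombineDemo::serviceB);

        /* C runs once both A and B have completed; D and E follow in series. */
        return a.thenCombine(b, ThenCombineDemo::serviceC)
                .thenApply(ThenCombineDemo::serviceD)
                .thenApply(ThenCombineDemo::serviceE);
    }

    public static void main(String[] args) {
        /* join() blocks the caller until the whole chain has finished. */
        System.out.println(flow().toCompletableFuture().join());   // prints ABCDE
    }
}
```

If A or B throws, the later stages are skipped and the exception surfaces from join(), which matches the requirement that C only runs after A and B have succeeded.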
Sinewy answered 13/1, 2020 at 15:36 Comment(1)
The number of services and their execution order are totally dynamic. In one flow, there are a few serial services, then a few parallel ones, then serial again, then parallel, and so on. - Derek
C
1

Just couldn't take my mind off the basic question of doing it with pure Java. So, here is a modified version of my earlier answer. This answer contains both styles - RxJava and ExecutorService. It contains 3 classes:

  1. DependentSeriesOfActionsBase: A base class that contains some reusable methods and common fields. This is just for convenience and easy understanding of the code.
  2. DependentSeriesOfActionsCoreJava: This is the ExecutorService based implementation. I am using Future.get() to wait for the results of an action, with the difference that the waiting itself is happening asynchronously. Take a look at startAction().
  3. DependentSeriesOfActionsRxJava: The earlier RxJava based implementation.

Code: DependentSeriesOfActionsBase

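import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

abstract class DependentSeriesOfActionsBase{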
    protected List<Action> allActions;
    protected ExecutorService SVC = Executors.newCachedThreadPool();

    protected boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    protected static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    protected List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    protected List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    protected Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }

    /* Attempts to start all actions that are present in the passed list. */
    protected void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    protected abstract void startAction( Action action, List<Action> actions );


    protected void shutdown(){
        SVC.shutdown();
        try {
            SVC.awaitTermination( 2, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

Code: DependentSeriesOfActionsCoreJava

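import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class DependentSeriesOfActionsCoreJava extends DependentSeriesOfActionsBase{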
    public static void main( String[] args ){
        new DependentSeriesOfActionsCoreJava().start();
    }

    private void start() {
        this.allActions = createActions();
        startAllActions( this.allActions );
    }

    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 

                    /* Submit the action to the ExecutorService and get the handle to the Future. */
                    final Future<?> fut = SVC.submit( () -> a.action.call() );

                    /* Submit the Future.get() action to the ExecutorService and execute the dependencies when it returns. */
                    SVC.submit( () -> {
                        Object returnVal = null;
                        /* Wait */
                        try {
                            fut.get(); 
                            a.setStatus( 2 );

                            /* If all actions are completed, shut down the ExecutorService. */
                            if( allActionsCompleted() ) shutdown();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }

                        startAllActions( getDeps( a, this.allActions ) );
                    } );

                }
            }
        }
    }
}

Code: DependentSeriesOfActionsRxJava

import java.util.List;

import io.reactivex.subjects.Subject;

public class DependentSeriesOfActionsRxJava extends DependentSeriesOfActionsBase{
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    public static void main( String[] args ){
        new DependentSeriesOfActionsRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) shutdown();
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

}
Conchiolin answered 14/1, 2020 at 4:50 Comment(0)
L
1

So one simple thing you can do is wrap your services in a way that makes management far easier by

  • providing a name
  • maintaining the list of preconditions to run it and
  • telling you when the service run itself is finished (so the other services' preconditions can be checked)

This is what such a class could look like:

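import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class Service implements Runnable {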

    private final Runnable wrappedRunnable;
    private final String name;
    private final List<String> preconditions = new CopyOnWriteArrayList<>();
    private final Consumer<String> finishedNotification;

    Service(Runnable r, String name, Consumer<String> finishedNotification, String... preconditions) {
        this.wrappedRunnable = r;
        this.name = name;
        this.finishedNotification = finishedNotification;
        this.preconditions.addAll(Arrays.asList(preconditions));
    }

    @Override
    public void run() {
        wrappedRunnable.run();
        finishedNotification.accept(name);
    }

    void preconditionFulfilled(String precondition) {
        preconditions.remove(precondition);
    }
    boolean arePreconditionsFulfilled() {
        return preconditions.isEmpty();
    }
}

The Runnable argument is the actual service call you want to wrap, the name is the management information in your table ("Service A", or "01" or whatever you want), and the preconditions are the names of the other services that need to have finished running before this is executed.

Now you still need some manager to maintain the list of all services - this is also what would need to be notified upon service call completion. This can actually be pretty straightforward by maintaining a simple list of the services.

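import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallManager {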
    List<Service> services = new ArrayList<>();
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    void addService(Runnable r, String serviceName, String... preconditions) {
        services.add(new Service(r, serviceName, this::preconditionFulfilled, preconditions));
    }

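    // synchronized so that it cannot race with preconditionFulfilled(), which modifies the same list from worker threads
    synchronized void run() {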
        for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
            Service service = serviceIterator.next();
            if (service.arePreconditionsFulfilled()) {
                executorService.submit(service);
                serviceIterator.remove();
            }
        }
        if (services.isEmpty()) {
            executorService.shutdown();
        }
    }

    private synchronized void preconditionFulfilled(String name) {
        System.out.printf("service %s finished%n", name);
        for (Iterator<Service> serviceIterator = services.iterator(); serviceIterator.hasNext(); ) {
            Service service = serviceIterator.next();
            service.preconditionFulfilled(name);
            if (service.arePreconditionsFulfilled()) {
                executorService.submit(service);
                serviceIterator.remove();
            }
        }
        if (services.isEmpty()) {
            executorService.shutdown();
        }
    }
}

So first you need to add all services you want to run to this manager, then call run() on it to bootstrap the whole execution chain. This is what this might look like for your example:

import java.util.Random;

class Bootstrap {
    private static final Random RANDOM = new Random();
    public static void main(String[] args) {
        CallManager manager = new CallManager();
        manager.addService(simpleRunnable("A"), "A");
        manager.addService(simpleRunnable("B"), "B");
        manager.addService(simpleRunnable("C"), "C", "A", "B");
        manager.addService(simpleRunnable("D"), "D", "C");
        manager.addService(simpleRunnable("E"), "E", "D");
        manager.addService(simpleRunnable("F"), "F");
        manager.addService(simpleRunnable("G"), "G", "E", "F");

        manager.run();
    }

    // create some simple pseudo service
    private static Runnable simpleRunnable(String s) {
        return () -> {
            System.out.printf("running service %s%n", s);
            try {
                Thread.sleep(RANDOM.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}

You can see this running here.
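One thing the manager above does not check is whether the preconditions can ever be fulfilled. With a cycle, or a precondition that names an unknown service, the remaining services would simply never be submitted. A possible pre-flight check is sketched below using Kahn's algorithm; PreconditionCheck and isRunnable are hypothetical names, and the sketch assumes that a service does not list the same precondition twice.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreconditionCheck {

    /* Returns true if every service can eventually run: no cycle and no unknown precondition.
       Input: service name -> names of its preconditions. */
    static boolean isRunnable(Map<String, List<String>> preconditions) {
        Map<String, Integer> open = new HashMap<>();   // unfulfilled preconditions per service
        for (Map.Entry<String, List<String>> e : preconditions.entrySet()) {
            for (String p : e.getValue()) {
                if (!preconditions.containsKey(p)) return false;
            }
            open.put(e.getKey(), e.getValue().size());
        }

        Deque<String> ready = new ArrayDeque<>();
        open.forEach((name, count) -> { if (count == 0) ready.add(name); });

        int finished = 0;
        while (!ready.isEmpty()) {
            String done = ready.poll();
            finished++;
            /* Simulate the "finished" notification for every service waiting on this one. */
            for (Map.Entry<String, List<String>> e : preconditions.entrySet()) {
                if (e.getValue().contains(done) && open.merge(e.getKey(), -1, Integer::sum) == 0) {
                    ready.add(e.getKey());
                }
            }
        }
        return finished == preconditions.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> ok = new HashMap<>();
        ok.put("A", Collections.emptyList());
        ok.put("B", Collections.emptyList());
        ok.put("C", Arrays.asList("A", "B"));
        System.out.println(isRunnable(ok));       // prints true

        Map<String, List<String>> cyclic = new HashMap<>();
        cyclic.put("1", Arrays.asList("2", "3"));
        cyclic.put("2", Arrays.asList("4"));
        cyclic.put("3", Collections.emptyList());
        cyclic.put("4", Arrays.asList("1"));
        System.out.println(isRunnable(cyclic));   // prints false
    }
}
```

Running such a check before manager.run() would let you reject an impossible configuration up front instead of ending up with an executor that never shuts down.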

Lalo answered 14/1, 2020 at 11:21 Comment(0)
L
0

I haven't gone through the whole of your code, since it looks like it is mostly parallelism management, but you can implement the general concept via

CompletableFuture.allOf(CompletableFuture.runAsync(serviceA::run),
                        CompletableFuture.runAsync(serviceB::run))
                 .thenRun(serviceC::run)
                 .thenRun(serviceD::run)
                 .thenRun(serviceE::run);

This creates the dependencies in your table (assuming all services implement Runnable); it will not wait for the whole thing to finish execution (which you can do by appending .join() or .get()).
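If the flow is only known at runtime, the same idea can be applied in a loop. The following is a sketch under a few assumptions: consecutive rows that share a link value form one parallel stage (as in the question's own loop), each service is reduced to "append my id to a log", and DynamicFlowDemo, toStages and run are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DynamicFlowDemo {

    /* Groups consecutive services that share the same link value into one parallel stage. */
    static List<List<String>> toStages(String[][] rows) {
        List<List<String>> stages = new ArrayList<>();
        String prevLink = null;
        for (String[] row : rows) {            // row = { serviceId, link }
            String link = row[1];
            if (stages.isEmpty() || link == null || !link.equals(prevLink)) {
                stages.add(new ArrayList<>());
            }
            stages.get(stages.size() - 1).add(row[0]);
            prevLink = link;
        }
        return stages;
    }

    /* Chains the stages: services inside a stage run in parallel, stages run in series. */
    static CompletableFuture<Void> run(List<List<String>> stages, List<String> log) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (List<String> stage : stages) {
            chain = chain.thenCompose(ignored -> {
                /* The stand-in "service" just records its id; a real one would do its work here. */
                CompletableFuture<?>[] parallel = stage.stream()
                        .map(id -> CompletableFuture.runAsync(() -> log.add(id)))
                        .toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(parallel);
            });
        }
        return chain;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"01", "03"}, {"02", "03"}, {"03", "04"}, {"04", "05"},
            {"05", "07"}, {"06", "07"}, {"07", null}
        };
        List<List<String>> stages = toStages(rows);
        System.out.println(stages);   // prints [[01, 02], [03], [04], [05, 06], [07]]

        List<String> log = Collections.synchronizedList(new ArrayList<>());
        run(stages, log).join();
        System.out.println(log);
    }
}
```

Because every stage is attached with thenCompose, a stage only starts after allOf of the previous stage has completed normally. If any service throws, the remaining stages are skipped and join() rethrows the failure.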

Lalo answered 13/1, 2020 at 15:30 Comment(3)
There can be hundreds of services and their execution order can be changed on demand. - Derek
@Derek In that case, the actual complexity of your problem is checking whether the given service dependencies can be fulfilled at all, e.g. 1 depends on 2 and 3, 2 depends on 4, and 4 depends on 1. That would mean that building a dependency graph, as Thomas said, will probably be worth the effort. Actually running the services will be the least of your problems. - Lalo
Yes, a dependency graph is a good idea. But at the moment, my code above achieves this in a traditional way. - Derek
R
0

You did not say whether task C, which runs after tasks A and B, needs the results of A and B. Suppose it does (if not, the solution would be even shorter). Then a solution using the DF4J library could look something like this:

class A extends AsyncProc {
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       ...
       res.onSuccess(value);
   }
}
class B extends AsyncProc {
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       ...
       res.onSuccess(value);
   }
}
class C extends AsyncProc {
   InpScalar<T> param1 = new InpScalar<>(this);
   InpScalar<T> param2 = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param1.current() ...param2.current()...
       res.onSuccess(value);
   }
}
class D extends AsyncProc {
   InpScalar<T> param = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param.current()
       res.onSuccess(value);
   }
}
class E extends AsyncProc {
   InpScalar<T> param = new InpScalar<>(this);
   ScalarResult<T> res = new ScalarResult<>();
   @Override
   protected void runAction() {
       value = ... param.current()
       res.onSuccess(value);
   }
}

Thus we have declared asynchronous procedures with varying numbers of parameters. Then we create instances and connect them in a dataflow graph:

 A a = new A(); a.start();
 B b = new B(); b.start();
 C c = new C(); c.start();
 D d = new D(); d.start();
 E e = new E(); e.start();
 a.res.subscribe(c.param1);
 b.res.subscribe(c.param2);
 c.res.subscribe(d.param);
 d.res.subscribe(e.param);

Finally, wait for the result of the last async proc synchronously:

 T result = e.res.get();
Registrar answered 14/1, 2020 at 0:11 Comment(1)
Task C must execute after the successful execution of tasks A and B. During its execution, task C needs some data that has been populated by tasks A and B. - Derek
