I need to perform some tasks. Some of the tasks are independent and some are dependent on successful execution of other tasks. Independent tasks can be run in parallel for better performance. I call these tasks as services.
The column link
tells which services will be execute in series and which in parallel. The column order
describes the execution order that will be followed by a set of defined services. For below example, service A and B should run in parallel. If they have executed successfully then service C will execute. Please note service C is not directly dependent on output of its previous services but it must run after successful execution of its previous services because service C will require some data during its execution produced by its previous services. After successful execution of service C, the next service D will execute and so on this cycle will be continued until all services in the list have been consumed.
Tasks service link order
Service A 01 03 1
Service B 02 03 2
Service C 03 04 3
Service D 04 05 4
Service E 05 07 5
Service F 06 07 6
Service G 07 (null) 7
Following is my code.
public void executeTransactionFlow(DataVo dataVo) throws Exception {
List<Callable<Boolean>> threadList = new ArrayList<>();
List<String> serviceIds = new ArrayList<>();
List<Future<Boolean>> futureList;
String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;
// Iterating through service flows map
for (Map<String, String> map : serviceFlowsMap) {
joinTo = map.get("link");
serviceId = map.get("service");
// A simple flag to differentiate which services should execute parallel and which in serial.
if (null == prevJoinTo) {
prevJoinTo = joinTo;
}
// Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
if (null != joinTo && joinTo.equals(prevJoinTo)) {
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
/*
* 1. Run the threads in the list
* 2. Empty the thread list
* 3. Empty serviceIds list
* 4. Set prevJoinTo
*/
else {
if (threadList.size() > 0) {
prevJoinTo = joinTo;
try {
// If list contain only 1 service then call, otherwise invokeAll
futureList = MyExecutor.executeServices(threadList, dataVo);
// During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
if (dataVo.isTimedout()) {
throw new Exception("Transaction thread is Interrupted or Timed-out");
}
// Validate service response codes and get decision in case of any failure
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
// If validationRespCode is non 00 then do not process further
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
break;
}
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
finally {
// clear thread list and serviceIds list. It will be populated for next parallel set of threads
threadList.clear();
serviceIds.clear();
}
}
// Start preparing new thread list
// Adding current service_id into threadList after executing previous services in parallel.
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
}
// Run remaining services
if (!threadList.isEmpty()) {
try {
futureList = MyExecutor.executeServices(threadList, dataVo);
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
}
catch (Throwable e) {
throw new Exception(e.getMessage(), e);
}
}
// Check validation response code
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
}
}
/**
* This method iterates through the thread list and checks for exceptions and service responses.
* If service response is not success or if any exception has occurred then exception is thrown
*/
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
String finalResponse = "200", serviceResponse = null;
/*
* future list will be null if single service is executed (no other parallel transactions). The reason is that we do
* not use invokeAll() on single service.
*/
if (null != futureList && futureList.size() > 0) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
}
}
// Iterate through serviceIds and check responses.
for (String serviceId : serviceIds) {
serviceResponse = dataVo.getServiceResponse(serviceId);
/*
* if one of following response is found then consider it exception
*/
if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
throw new Exception("One of the service has been declined");
}
}
return finalResponse;
}
If CompletableFuture
can be beneficial here, then how can I use that efficiently?
And future.get()
is a blocking call. In case I have 10 services that execute in parallel, then this future.get()
will be blocking others even if they have executed prior to the current for which we are waiting. How to avoid this blocking?
I have added more details of the problem statement i.e the addition of order column. The services need to follow the defined order. The order of service A and B is 1 and 2 respectively but still they will execute in parallel because both have 03
value in link
. I think dependency graphs based approach will not be required now as suggested by @Thomas in comments.