Asynchronous sequential calls based on condition checks in reactor

I am trying to make asynchronous, non-blocking calls using Reactor. For each request, I may have to call two services in sequence (getAccountDataFromAAA and getAccountDataFromBBB in the code below).

Here is my ItemRequest object:

public class ItemRequest {
    private Account account;
    private Result firstServiceResult;
    private Result secondServiceResult;
    private PostingParameterCode postingParameterCode; //enum 
    //...
    //...
    //getters and setters
}

So, my request input will contain multiple itemRequests and for each itemRequest, I am doing asynchronous calls as:

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest); 
    //here, it will enter into a sequential call for each itemRequest
}

This is my first service call interface:

public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);

This is my second service call interface:

public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);

This method makes up to two calls in sequence, depending on the conditions:

public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<ItemRequest> firstCallResult = Mono.empty();
    Mono<ItemRequest> secondCallResult = Mono.empty();

    if(isFirstServiceCallRequired(itemRequest)){
        firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
        //basically, firstService call will update the accountKey information and
        //will also set the result status to OK which is required to decide
        //whether to make secondService call.
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    //Now, before calling the second service, I need to check the following:

    if(null != itemRequest.getFirstServiceResult() &&
        itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
        itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){
        secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    }

    //attaching the firstCallResult and secondCallResult to produce a single Mono
    return firstCallResult.then(secondCallResult);
}

This works fine when the first call is not required. But when the first call is required, the following condition check does not pass, because the first call's result has not been set on the item request yet:

if(null != itemRequest.getFirstServiceResult() &&
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) { ... }
//this check does not pass because the first service call has not actually executed at this point

Both cases work fine if I add a blocking call:

if(isFirstServiceCallRequired(itemRequest)){
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    firstCallResult.block(); //adding this line makes both cases work
}

But I don't think I get Reactor's benefits this way. I was thinking of logic like this:

Mono<ItemRequest> result = firstService.call(...)
    .doOnNext(/*do something */)
    .then( ... secondService.call())

But I couldn't figure out how to do it. The condition check is important since I don't always want to execute the second service. Is there a way to chain the second service call after the first one, get the Mono result, and still keep those condition checks?

Apologies for the long question. Any suggestions/help would be greatly appreciated.

Drunken answered 26/10, 2018 at 19:11 Comment(0)

After offering bounty points on this question, I was really excited and expecting some answers. Anyway, I was able to improve my initial solution and keep those condition checks too.

I did the following: I changed the return type of both service calls from Mono<ItemRequest> to Mono<Void>, since they basically only update the data in the ItemRequest objects.

Handling the parallel call here (each parallel call has a sequential call):

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<Void> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
    //here, it will enter into a sequential call for each itemRequest
}

and these are my firstServiceCall and secondServiceCall interface changes:

public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);

public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);

and I chained the secondServiceCall with firstServiceCall to get the mono result and have those condition checks too as:

public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<Void> callSequence = Mono.empty();

    if(isFirstServiceCallRequired(itemRequest)){
        callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    return callSequence.thenEmpty(Mono.defer(() -> {
        //note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to
        //for each subscriber downstream.
        //The lambda runs only after callSequence completes, so the first call's result is set by then.
        //Only if the firstServiceCall result is successful and the other condition checks pass
        //do I call secondServiceCall:
        if(shouldCallSecondService(itemRequest)){
            return this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
        } else {
            return Mono.empty();
        }
    }));
}
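To illustrate why deferring the check matters, here is a small JDK-only sketch that needs no Reactor on the classpath. It is an analogy, not Reactor code: a Supplier stands in for a lazy Mono, and the Item class, firstCall, eagerCheck and deferredCheck are hypothetical names made up for this example. The eager variant reads the condition while the pipeline is being built, as in the question; the deferred variant reads it only after the first call has run, which is roughly what Mono.defer achieves above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DeferredConditionSketch {
    // Hypothetical stand-in for ItemRequest: a single mutable status flag.
    static class Item {
        boolean firstResultOk = false;
    }

    // Lazy "first call": nothing happens until get() is invoked,
    // much like a Mono does nothing until something subscribes to it.
    static Supplier<Void> firstCall(Item item, List<String> log) {
        return () -> {
            item.firstResultOk = true;
            log.add("first");
            return null;
        };
    }

    // Eager check: the flag is read before the first call has run,
    // so the second call is never attached.
    static List<String> eagerCheck() {
        Item item = new Item();
        List<String> log = new ArrayList<>();
        Supplier<Void> first = firstCall(item, log);
        Runnable second;
        if (item.firstResultOk) {
            second = () -> log.add("second");
        } else {
            second = () -> { };
        }
        first.get();
        second.run();
        return log;
    }

    // Deferred check: the flag is read only after the first call has run.
    static List<String> deferredCheck() {
        Item item = new Item();
        List<String> log = new ArrayList<>();
        Supplier<Void> first = firstCall(item, log);
        Supplier<Runnable> second = () -> {
            if (item.firstResultOk) {
                return () -> log.add("second");
            }
            return () -> { };
        };
        first.get();
        second.get().run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(eagerCheck());    // prints [first]
        System.out.println(deferredCheck()); // prints [first, second]
    }
}
```

In the eager variant the second call is skipped even though the first call succeeds, which matches the behaviour described in the question.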
Drunken answered 2/11, 2018 at 19:39 Comment(0)

Here is some news: Reactor is not a silver bullet! :)

Whenever you need the response of one call to decide whether to make another, those two calls can never be fully parallelized. You could always use your last suggestion and chain the second call after the first. However, this doesn't mean that using Reactor gives you no benefits!

Some of the benefits you get:

  • With a Netty-based stack under the hood instead of a Servlet container, threads are not blocked on I/O operations. This can lead to better allocation of resources, making your system more resilient.
  • You can do other work while waiting for a response. If you have tasks whose order doesn't matter (e.g. auditing, logging), you can always run them there.
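The point about dependent calls can be sketched with the JDK's CompletableFuture (an analogy only, not Reactor code; firstService, secondService and processAll are hypothetical names for this example). Within one item the second call has to wait for the first, but the chains for different items are independent and may run concurrently:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PerItemChainSketch {
    // Hypothetical first service: tags the item asynchronously.
    static CompletableFuture<String> firstService(String item) {
        return CompletableFuture.supplyAsync(() -> item + ":A");
    }

    // Hypothetical second service: needs the output of the first one.
    static CompletableFuture<String> secondService(String afterFirst) {
        return CompletableFuture.supplyAsync(() -> afterFirst + ":B");
    }

    static List<String> processAll(List<String> items) {
        // Start one chain per item; thenCompose keeps each chain sequential.
        List<CompletableFuture<String>> chains = items.stream()
            .map(item -> firstService(item).thenCompose(PerItemChainSketch::secondService))
            .collect(Collectors.toList());
        // Wait for every chain and collect the results in input order.
        return chains.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("x", "y"))); // prints [x:A:B, y:A:B]
    }
}
```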

I hope this answers your question :)

Huei answered 1/11, 2018 at 11:55 Comment(1)
Thanks Sofo for your opinion and for highlighting the benefits as well. :) (Drunken)
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
  Mono<ItemRequest> firstCallResult;

  if (isFirstServiceCallRequired(itemRequest)) {
    //basically, firstService call will update the accountKey information and
    //will also set the result status to OK which is required to decide
    //whether to make secondService call.
    //This assumes the returned Mono emits the updated ItemRequest.
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
  } else {
    //Account key is already present, so just update the result status which I need later.
    firstCallResult = Mono.defer(() -> {
      Result result = new Result();
      result.setStatus(Result.Status.OK);
      result.setMessageText("First call not required as account info is set for item request");
      itemRequest.setFirstServiceResult(result);
      return Mono.just(itemRequest);
    });
  }

  //flatMap runs only after the first Mono has emitted, so the first result is available here.
  return firstCallResult.flatMap(itReq -> {
    //Now, before calling the second service, I need to check the following:
    if (null != itReq.getFirstServiceResult() &&
        itReq.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
        itReq.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
      return this.secondServiceCallImpl.getAccountDataFromBBB(itReq);
    } else {
      //flatMap must return a Mono, so wrap the unchanged request.
      return Mono.just(itReq);
    }
  });
}

The following simple example may help you understand flatMap:

import reactor.core.publisher.Mono;

public class FlatMapExample {

  public static void main(String[] args) {
    //Service B is called only when service A does not answer "200".
    callExternalServiceA().flatMap(response -> {
      if (response.equals("200")) {
        return Mono.just(response);
      } else {
        return callExternalServiceB();
      }
    }).block();
  }

  public static Mono<String> callExternalServiceA() {
    return Mono.defer(() -> {
      System.out.println("Call external service A");
      return Mono.just("400");
    });
  }

  public static Mono<String> callExternalServiceB() {
    return Mono.defer(() -> {
      System.out.println("Call external service B");
      return Mono.just("200");
    });
  }
}
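For readers who want to try the same shape without Reactor on the classpath, here is a rough JDK-only sketch using CompletableFuture, whose thenCompose plays a role similar to flatMap. This is an analogy and not Reactor code: CompletableFuture starts work eagerly, whereas a Mono stays idle until it is subscribed to. The names serviceA, serviceB and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ConditionalComposeSketch {
    // Hypothetical service A: records the call and answers "400".
    static CompletableFuture<String> serviceA(List<String> calls) {
        calls.add("A");
        return CompletableFuture.completedFuture("400");
    }

    // Hypothetical service B: records the call and answers "200".
    static CompletableFuture<String> serviceB(List<String> calls) {
        calls.add("B");
        return CompletableFuture.completedFuture("200");
    }

    // Calls A first and falls back to B only when A did not answer "200".
    static String run(List<String> calls) {
        return serviceA(calls).thenCompose(response -> {
            if (response.equals("200")) {
                return CompletableFuture.completedFuture(response);
            }
            return serviceB(calls);
        }).join();
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        System.out.println(run(calls)); // prints 200
        System.out.println(calls);      // prints [A, B]
    }
}
```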
Mim answered 5/11, 2018 at 13:16 Comment(0)
