Make API multiple times with AsyncRestTemplate and wait for all to complete
Asked Answered
A

3

10

I have to make Rest API invocation using RestTemplate multiple time with different parameters. API is same but it is the parameter that is getting changed. Number of times is also variable. I want to use AsyncRestTemplate but my main Thread should wait until all API calls have been successfully completed. I also want to work with responses that each API call returned. Currently I am using RestTemplate. In basic form it is as following.

List<String> listOfResponses = new ArrayList<String>();
for (Integer studentId : studentIdsList) {
    String respBody;
    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
    respBody = requestEntity.getBody();
    listOfResponses.add(respBody);          
}

How can I implement AsyncRestTemplate in this situation?

Abradant answered 8/6, 2017 at 2:22 Comment(3)
Note that the code you provide in the question does not compile and does not seem to be correct (studentId is unused, respBody = request body). A good question should include an minimal reproducible example as well as what you have tried.Palua
May be I'm missing something here. Have you thought about updating your web service to accept collection of ids as a argument ?Arroba
The web service I consume is out of my scopeAbradant
P
12

The main idea when using AsyncRestTemplate (or any asynchronous API, in fact), is to send all you requests in a first time, keeping the corresponding futures, then process all responses in a second time. You can simply do this with 2 loops:

List<ListenableFuture<ResponseEntity<String>>> responseFutures = new ArrayList<>();
for (Integer studentId : studentIdsList) {
    // FIXME studentId is not used
    ListenableFuture<ResponseEntity<String>> responseEntityFuture = restTemplate.exchange(url, method, requestEntity, String.class);
    responseFutures.add(responseEntityFuture);
}
// now all requests were send, so we can process the responses
List<String> listOfResponses = new ArrayList<>();
for (ListenableFuture<ResponseEntity<String>> future: responseFutures) {
    try {
        String respBody = future.get().getBody();
        listOfResponses.add(respBody);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
}

Note: if you need to pair the responses with the original requests, you can replace the list of futures with a map or a list of request+response objects.

I also noted that studentId is not used in your question.

Palua answered 12/6, 2017 at 8:55 Comment(4)
but how to make sure all requests have been processed? when get() called on each future, may be request is still being processed?Abradant
get() is blocking, it will only return after the request completes – while the other requests are still being processed as well. So if the second loop completes normally, all requests will have been processed.Palua
is there a way to do this in unblocking way?Abradant
Your question said “my main Thread should wait until all API calls have been successfully completed” so this basically means blocking on each until it is completed. The important point is that the requests are performed in parallel so the time it will take is roughly the duration of the slowest request.Palua
M
6

You could use Java 8 Stream API, if that's feasible for you:

List<String> listOfResponses = studentIdsList.stream()
    .parrallel()
    .map({studentId ->
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, studentId, String.class);
        return responseEntity.getBody();
    })
    .collect(Collectors.toList());

This code will basically perform 2 things:

  1. Performs requests in parallel;
  2. Collect the results of the requests into a List.

UPDATE: Agree with @Didier L - this solution may not work properly when you need to do a lot of requests. Here is an updated version:

List<String> listOfResponses  = studentIdsList.stream()
                .map(studentId -> asyncRestTemplate.exchange(url, method, studentId, String.class)
                .collect(Collectors.toList()).stream()
                .map(this::retrieveResult)
                .collect(Collectors.toList());

    /**
     * Retrieves results of each request by blocking the main thread. Note that the actual request was performed on the previous step when
     * calling asyncRestTemplate.exchange(url, method, studentId, String.class)
     */
    private String retrieveResult(ListenableFuture<ResponseEntity<String>> listenableFuture) {
        try {
            return listenableFuture.get().getBody();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
Meddle answered 12/6, 2017 at 10:58 Comment(6)
By default, parallel streams are configured for CPU-intensive tasks. If you use a parallel stream for I/O intensive tasks, you'd have to configure a custom fork-join pool to run it. However this won't scale very well compared to an asynchronous API, since your I/O tasks will consume many threads.Palua
@DidierL thanks for pointing this out. I have updated my answer.Meddle
Unfortunately your second solution wouldn't work as expected because sequential stream process elements one by one: this will create each future and immediately call get() on it, before creating the next one. You should collect all created futures first before trying to retrieve their results.Palua
@DidierL That's exactly what it does. First 'map' operation initiates requests and collects all the futures. The second 'map' operation retrieves results from these futures one-by-one. You can think of asyncRestTemplate.exchange(..) as of Thread Pool which submits a task for a request to the external resource and returns a Future as its result. BTW, just checked it on my machine and it works just fine.Meddle
It will work, but it is sequential, not parallel. The first map() will not collect all the futures – you'd need a collect() call for that. Instead, the first request will be sent (first map()), then wait for the response (second map()), then second request will be sent (first map()) etc. See How to properly submit and get several Futures in the same Java stream?.Palua
Yeah, you are right again. I thought collection on the last step would trigger a chain of operations one be one (map for all elements, then again map for all elements). But now I realise that it wold be silly if to speak about performance :) Thanks for your comment, I updated my answer.Meddle
V
2

Here is another solution I would like to suggest which uses Spring's RestTemplate rather than AsyncRestTemplate. It is also using Java 8 CompletableFuture.

public void sendRequestsAsync(List<Integer> studentList) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(studentList.size()); //List to hold all the completable futures
    List<String> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Integer studentId : studentList) { //Iterate student list
        CompletableFuture<Void> requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> restTemplate.exchange("URL/" + studentId, HttpMethod.GET, null, String.class),
                        yourOwnExecutor
                )//Supply the task you wanna run, in your case http request
                .thenApply((responseEntity) -> {
                    responses.add(responseEntity.getBody());
                    return responseEntity;
                })//now you can add response body to responses
                .thenAccept((responseEntity) -> {
                    doSomeFinalStuffWithResponse(responseEntity);
                })//here you can do more stuff with responseEntity (if you need to)
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });//do something here if an exception occurs in the execution;

        completableFutures.add(requestCompletableFuture);
    }

    try {
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).get(); //Now block till all of them are executed by building another completablefuture with others.
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

I like this solution more because I can chain as much business logic I want and don't have to depend on Spring's internals for Async Sending. Obviously you can clean up the code more, I haven't paid much attention into that for now.

Victualer answered 12/6, 2017 at 22:23 Comment(2)
In fact you can easily convert ListenableFuture into a CompletableFuture and back. Also the AsyncRestTemplate is part of the public API as much as RestTemplate, it is not a "Spring internal". If you want to do asynchronous stuff it is probably better to rely on what is provided by the API as much as possible.Palua
Yeah I agree my bad on saying its spring internal. What I was trying to convey was we can get it done via Java's own api rather for async stuff rather than relying on async template and listenablefuture.Victualer

© 2022 - 2024 — McMap. All rights reserved.