How to make async HTTP calls with Apache Beam (Java)?
The input PCollection contains HTTP requests and is a bounded dataset. I want to make asynchronous HTTP calls (Java) in a ParDo, parse each response, and put the results into the output PCollection. My code is below. It fails with the exception shown below.

I couldn't figure out the reason and would appreciate some guidance.

java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]

Code:

public class ProcessRequestsFn extends DoFn<PreparedRequest,EnrichedPoint> {
    private static AsyncHttpClient _HttpClientAsync;
    private static ExecutorService _ExecutorService;

static{

    AsyncHttpClientConfig cg = config()
            .setKeepAlive(true)
            .setDisableHttpsEndpointIdentificationAlgorithm(true)
            .setUseInsecureTrustManager(true)
            .addRequestFilter(new RateLimitedThrottleRequestFilter(100,1000))
            .build();

    _HttpClientAsync = asyncHttpClient(cg);

    _ExecutorService = Executors.newCachedThreadPool();

}


@DoFn.ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    _HttpClientAsync.prepareGet((request.getRequest()))
            .execute()
            .toCompletableFuture()
            .thenApply(response -> { if(response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK){
                                                return response.getResponseBody();
                                            } return null; } )
            .thenApply(responseBody->
                    {
                        List<EnrichedPoint> resList = new ArrayList<>();
                        /*some process logic here*/
                        System.out.printf("%d enriched points back\n", resList.size());
                        return resList;

                    })
            .thenAccept(resList -> {
                for (EnrichedPoint enrichedPoint : resList) {
                    c.output(enrichedPoint);
                }
            })
            .exceptionally(ex->{
                System.out.println(ex);
                return null;
            });

  }
}
Scraperboard answered 17/4, 2018 at 18:20 Comment(2)
Somehow my gut tells me that doing async work in Beam is hard, if not impossible, because Beam doesn't know when the call is going to return. But specifically in a batch process with bounded data, is it possible? Scraperboard
The end goal is to speed up the HTTP calls as much as possible. Scraperboard
1

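The issue you're hitting is that you're calling output outside the context of a processElement or finishBundle call. The CompletableFuture callback runs after processElement has already returned, by which point the bundle may have been committed.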

You'll want to gather all your outputs in memory, emit the ones that are ready during later processElement calls, and at the end block in finishBundle until all your calls finish, then emit the rest.
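A minimal sketch of that buffering idea, written as plain Java so it runs without Beam on the classpath. Everything named here is hypothetical: BufferedAsyncCalls is not a Beam class, asyncCall stands in for the HTTP client call plus response parsing, and the Consumer passed as out stands in for c.output(...). In a real DoFn, processElement and finishBundle would be the methods annotated with @ProcessElement and @FinishBundle.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Plain-Java model of the pattern (hypothetical names, not a Beam API).
 * Results are emitted only from the thread that calls processElement or
 * finishBundle, never from an async callback.
 */
class BufferedAsyncCalls<I, O> {
    // Assumption: wraps the async HTTP call and the response parsing.
    private final Function<I, CompletableFuture<List<O>>> asyncCall;
    // Touched only by the thread calling processElement/finishBundle.
    private final List<CompletableFuture<List<O>>> inFlight = new ArrayList<>();

    BufferedAsyncCalls(Function<I, CompletableFuture<List<O>>> asyncCall) {
        this.asyncCall = asyncCall;
    }

    /** Starts a call, then emits the results of calls that have already completed. */
    void processElement(I element, Consumer<O> out) {
        inFlight.add(asyncCall.apply(element));
        Iterator<CompletableFuture<List<O>>> it = inFlight.iterator();
        while (it.hasNext()) {
            CompletableFuture<List<O>> future = it.next();
            if (future.isDone()) {
                it.remove();
                // join() does not block here; it rethrows if the call failed.
                future.join().forEach(out);
            }
        }
    }

    /** Blocks until every remaining call has finished and emits its results. */
    void finishBundle(Consumer<O> out) {
        for (CompletableFuture<List<O>> future : inFlight) {
            future.join().forEach(out);
        }
        inFlight.clear();
    }
}
```

When porting this into a DoFn, keep in mind that Beam's FinishBundleContext.output takes a timestamp and a window in addition to the value, so the buffer would also need to remember those for each pending element. A failed call surfaces here as a CompletionException from join(); whether to fail the bundle or log and skip, as the exceptionally handler in the question does, is a design choice this sketch leaves open.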

Poised answered 18/9, 2018 at 22:7 Comment(0)
6

The Scio library implements a DoFn that deals with asynchronous operations. Its BaseAsyncDoFn might provide the handling you need. Since you're dealing with CompletableFuture, also take a look at JavaAsyncDoFn.

Please note that you don't necessarily need to use the Scio library; you can borrow the main idea of BaseAsyncDoFn, since it's independent of the rest of Scio.

Handed answered 3/7, 2018 at 12:39 Comment(0)