How to make an async HTTP call with Apache Beam (Java)?

Posted 2019-01-29 05:07

Question:

The input PCollection is a bounded dataset of HTTP requests. I want to make an async HTTP call (Java) in a ParDo, parse the response, and put the results into the output PCollection. My code is below; it throws the following exception.

I couldn't figure out the reason and would appreciate some guidance.

java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]

Code:

public class ProcessRequestsFn extends DoFn<PreparedRequest, EnrichedPoint> {
    private static AsyncHttpClient _HttpClientAsync;
    private static ExecutorService _ExecutorService;

    static {
        AsyncHttpClientConfig cg = config()
                .setKeepAlive(true)
                .setDisableHttpsEndpointIdentificationAlgorithm(true)
                .setUseInsecureTrustManager(true)
                .addRequestFilter(new RateLimitedThrottleRequestFilter(100, 1000))
                .build();

        _HttpClientAsync = asyncHttpClient(cg);
        _ExecutorService = Executors.newCachedThreadPool();
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext c) {
        PreparedRequest request = c.element();

        if (request == null)
            return;

        _HttpClientAsync.prepareGet((request.getRequest()))
                .execute()
                .toCompletableFuture()
                .thenApply(response -> {
                    if (response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK) {
                        return response.getResponseBody();
                    }
                    return null;
                })
                .thenApply(responseBody -> {
                    List<EnrichedPoint> resList = new ArrayList<>();
                    /*some process logic here*/
                    System.out.printf("%d enriched points back\n", resList.size());
                    return resList;
                })
                .thenAccept(resList -> {
                    // Runs in the future's callback, not in processElement itself.
                    for (EnrichedPoint enrichedPoint : resList) {
                        c.output(enrichedPoint);
                    }
                })
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });
    }
}

Answer 1:

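The issue you're hitting is that you're outputting outside the context of a processElement or finishBundle call. The future's callback can run after processElement has returned and the bundle has been committed, which is what the "Can't add element ... to committed bundle" message is reporting.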

You'll want to gather all your outputs in memory, output them eagerly during later processElement calls, and, in finishBundle, block until all your calls finish and output whatever remains.
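
As a rough illustration of that pattern, here is a minimal sketch with the Beam specifics stripped out so that it runs on the plain JDK. The class name AsyncBundleBuffer and its process/finish methods are hypothetical, not part of Beam or of the asker's code. Async callbacks only append to a thread-safe queue, and elements are emitted only on the thread that calls process or finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical helper (not a Beam class): emits async results only on the caller's thread. */
class AsyncBundleBuffer<T> {
    // Calls started in the current bundle; touched only by the caller's thread.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    // Results written by callback threads, waiting to be emitted by the caller's thread.
    private final Queue<T> ready = new ConcurrentLinkedQueue<>();

    /** Intended to be called from processElement: register the call, emit what is already done. */
    void process(CompletableFuture<List<T>> call, Consumer<T> output) {
        // The callback never touches the output; it only buffers the results.
        inFlight.add(call.thenAccept(ready::addAll));
        drain(output);
    }

    /** Intended to be called from finishBundle: wait for every call, then emit the rest. */
    void finish(Consumer<T> output) {
        // join() rethrows a failed call as CompletionException, so a failure is not silently dropped.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
        drain(output);
    }

    // Emits everything buffered so far, on the calling thread.
    private void drain(Consumer<T> output) {
        T item;
        while ((item = ready.poll()) != null) {
            output.accept(item);
        }
    }
}
```

In a real DoFn, process would presumably be called from the @ProcessElement method with c::output as the consumer, and finish from the @FinishBundle method. Note that Beam's FinishBundleContext.output also takes a timestamp and a window, so the buffered values would need to carry those along. This sketch never prunes completed futures during a bundle, which may matter for very large bundles.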



Answer 2:

The Scio library implements a DoFn that deals with asynchronous operations. Its BaseAsyncDoFn might provide the handling you need. Since you're dealing with CompletableFuture, also take a look at JavaAsyncDoFn.

Please note that you don't necessarily need to use the Scio library; you can borrow the main idea of BaseAsyncDoFn, since it's independent of the rest of Scio.