Pipeline Submission from App Engine

Posted 2019-08-10 20:08

Question:

I have a requirement to send Datastore entities to a BigQuery table while transforming the data at the same time. My design so far is as follows:
My App Engine Java application publishes data to a topic in the Pub/Sub service; that part works. A Dataflow pipeline then subscribes to the topic, reads the messages, applies the transform and writes the result to BigQuery. I have some sample code running to test this.
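
The flow described above can be modelled in plain Java to show where the transform sits between publishing and writing. This is a stdlib-only, in-memory sketch, not Dataflow or Pub/Sub code: a BlockingQueue stands in for the Pub/Sub topic, a List of maps stands in for the BigQuery table, and the class name, the comma-separated "kind,id,name" message format and the upper-casing transform are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PubSubToBigQuerySketch {
    // Stand-in for the Pub/Sub topic: the App Engine side adds, the pipeline side polls.
    static final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stand-in for the BigQuery table: each row maps column name -> value.
    static final List<Map<String, Object>> table = new ArrayList<>();

    // App Engine side: publish a serialized entity (hypothetical "kind,id,name" format).
    static void publish(String kind, long id, String name) {
        topic.add(kind + "," + id + "," + name);
    }

    // Pipeline side: the transform applied to each message before it is written.
    static Map<String, Object> transform(String message) {
        String[] parts = message.split(",", 3);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("kind", parts[0]);
        row.put("id", Long.parseLong(parts[1]));
        row.put("name", parts[2].toUpperCase());
        return row;
    }

    // Read every message currently on the topic, transform it and append it to the table.
    static void drain() {
        String message;
        while ((message = topic.poll()) != null) {
            table.add(transform(message));
        }
    }

    public static void main(String[] args) {
        publish("Customer", 1L, "alice");
        publish("Customer", 2L, "bob");
        drain();
        // prints [{kind=Customer, id=1, name=ALICE}, {kind=Customer, id=2, name=BOB}]
        System.out.println(table);
    }
}
```

In the real design, publish would correspond to the App Engine app's Pub/Sub call, and drain to the Dataflow pipeline reading from the subscription and writing to BigQuery continuously rather than on demand.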

I have a crude pipeline that I can run from my local development machine; it all works as demo code. I run it locally with mvn appengine:devserver.

The question now is: how do you deploy the Dataflow pipeline from Google App Engine? My development machine does not have access to the production environment, so I cannot get my pipeline running on the Dataflow service from there. I have tried to submit it from Google App Engine, but received out-of-memory errors. This seems related to some authentication problem. From other posts here on StackOverflow, it seems that this kind of "deploy" from App Engine is not "officially" supported.

How would one do this in a production environment, then?

Environment dependencies so far:
Maven 3.3.0
Google App Engine 1.9.28
Google API Client 1.20.0
Java 1.7.0_79
Workstation: Windows 7
Google Development Environment: Gold Package

This is my sample code to get the pipeline running:

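// Dataflow SDK 1.x classes used by this snippet
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

// Configure the job to run on the Dataflow service instead of locally.
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setRunner(DataflowPipelineRunner.class);
options.setStagingLocation("gs://pipeline_bucket2"); // the SDK uploads the application's jars here
options.setProject("projectname");
options.setJobName("starterpipeline");
options.setUpdate(true);

Pipeline p = Pipeline.create(options);

// Upper-case each input string, then log it (LOG is a logger declared elsewhere in the class).
p.apply(Create.of("Hello", "World")).apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
    }
})).apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
    @Override
    public void processElement(ProcessContext c) {
        LOG.info(c.element());
    }
}));

// Submitting the job stages the application's artifacts to the staging location first.
p.run();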

This is the stack trace I get when I try to run the code above:

Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
    at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
    at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
    at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    at java.util.concurrent.FutureTask.run(FutureTask.java:260)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

Answer 1:

Dataflow uses a 64 MB buffer when communicating with Google Cloud Storage to upload your application's artifacts. The OOM can occur if the instance you're using doesn't have enough memory, for example an App Engine instance with 128 MB of memory.
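
To make the memory arithmetic concrete, here is a stdlib-only Java sketch of a pre-flight check that compares a 64 MB buffer against the JVM's remaining heap. The class and method names are hypothetical, the "80 MB already in use" figure is made up, and the heap actually available on a 128 MB instance is somewhat less than 128 MB; the check only reports headroom and does not change how the SDK allocates its buffer.

```java
public class UploadBufferCheck {
    // Buffer size the answer attributes to the Dataflow SDK's GCS uploads.
    static final long UPLOAD_BUFFER_BYTES = 64L * 1024 * 1024;

    // True if bufferBytes still fits in the heap that is not yet in use.
    static boolean fits(long bufferBytes, long maxHeapBytes, long usedHeapBytes) {
        return maxHeapBytes - usedHeapBytes >= bufferBytes;
    }

    // Same check against the running JVM's current heap figures.
    static boolean currentJvmFits(long bufferBytes) {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return fits(bufferBytes, rt.maxMemory(), used);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // Hypothetical 128 MB heap with 80 MB in use: only 48 MB remain.
        System.out.println(fits(UPLOAD_BUFFER_BYTES, 128 * mb, 80 * mb)); // false
        // Hypothetical 512 MB heap with the same 80 MB in use.
        System.out.println(fits(UPLOAD_BUFFER_BYTES, 512 * mb, 80 * mb)); // true
        System.out.println("This JVM: " + currentJvmFits(UPLOAD_BUFFER_BYTES));
    }
}
```

The stack trace is consistent with this: ByteArrayOutputStream grows by copying into a new, larger array with Arrays.copyOf, so the old and new arrays briefly coexist and the peak usage while the buffer fills is higher than the final buffer size.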

Also note that the first time your Dataflow pipeline is executed after you update the module, or after App Engine performs an internal update, the Dataflow SDK needs to upload all the application artifacts that changed to Google Cloud Storage. Depending on the application size, this can take longer than 60 seconds, the request limit for a frontend instance, and cause deadline exceeded errors.
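
The deadline concern can be estimated with simple arithmetic: sum the sizes of the artifacts that still need uploading and divide by the upload throughput. The stdlib-only Java sketch below does that. The artifact names and sizes, the 2 MB/s throughput and the "nothing staged yet" worst case are all hypothetical assumptions; this is not how the SDK itself decides what to upload.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class StagingDeadlineEstimate {
    // Frontend request limit mentioned in the answer.
    static final long FRONTEND_DEADLINE_MILLIS = 60000L;

    // Total bytes of local artifacts that are not yet in the staging location.
    static long bytesToUpload(Map<String, Long> localArtifacts, Set<String> alreadyStaged) {
        long total = 0;
        for (Map.Entry<String, Long> e : localArtifacts.entrySet()) {
            if (!alreadyStaged.contains(e.getKey())) {
                total += e.getValue();
            }
        }
        return total;
    }

    // Upload time in milliseconds at an assumed sustained throughput.
    static long estimatedUploadMillis(long bytes, long bytesPerSecond) {
        return bytes * 1000L / bytesPerSecond;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long throughput = 2 * mb; // hypothetical 2 MB/s from the instance to GCS

        // Hypothetical artifacts; names and sizes are made up.
        Map<String, Long> local = new LinkedHashMap<>();
        local.put("app.jar", 20 * mb);
        local.put("lib-a.jar", 100 * mb);
        local.put("lib-b.jar", 30 * mb);

        // Routine redeploy: only app.jar changed, the libraries are already staged.
        Set<String> staged = new HashSet<>();
        staged.add("lib-a.jar");
        staged.add("lib-b.jar");
        long partial = estimatedUploadMillis(bytesToUpload(local, staged), throughput);
        System.out.println(partial + " ms, over deadline: " + (partial > FRONTEND_DEADLINE_MILLIS));

        // Assumed worst case: every artifact has to be uploaded again.
        long full = estimatedUploadMillis(bytesToUpload(local, new HashSet<String>()), throughput);
        System.out.println(full + " ms, over deadline: " + (full > FRONTEND_DEADLINE_MILLIS));
    }
}
```

With these made-up numbers the routine redeploy needs about 10 seconds, while re-uploading all 150 MB needs about 75 seconds, which would exceed the 60-second frontend limit.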