I have a requirement to send Datastore entities to a BigQuery table while transforming the data along the way.
My design so far is as follows:
An App Engine Java application publishes data to a topic in the Pub/Sub service (I have that working).
A Dataflow pipeline then subscribes to the topic and reads the messages. The transform is applied and the result is written to BigQuery. I have some sample code running to test this.
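To make the transform step concrete, here is a minimal sketch in plain Java of the kind of per-message function the pipeline would apply between reading from Pub/Sub and writing to BigQuery. Everything in it is an assumption for illustration: the class name `EntityRowTransform`, the `key=value;key=value` payload format, and the normalisation rules are hypothetical and are not taken from my real application or from the Dataflow SDK.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical transform step: turns one decoded message payload into a flat row. */
final class EntityRowTransform {

    /**
     * Parses a payload of the assumed form "key1=value1;key2=value2" and
     * returns a row whose column names are trimmed and lower-cased.
     */
    static Map<String, String> toRow(String payload) {
        Map<String, String> row = new LinkedHashMap<String, String>();
        if (payload == null || payload.trim().isEmpty()) {
            return row; // nothing to transform
        }
        for (String pair : payload.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip fragments with no key instead of failing the message
            }
            String column = pair.substring(0, eq).trim().toLowerCase(Locale.ROOT);
            if (column.isEmpty()) {
                continue; // key was only whitespace
            }
            row.put(column, pair.substring(eq + 1).trim());
        }
        return row;
    }

    public static void main(String[] args) {
        // prints {kind=Customer, name=Alice, age=42}
        System.out.println(toRow("Kind=Customer; Name=Alice ;Age=42"));
    }
}
```

Keeping the transform as a pure function like this means it can be unit tested without any pipeline runner, and then called from inside whatever processing step the pipeline uses.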
I have a crude pipeline working on my local development machine, all as demo code. I run it locally via:
    mvn appengine:devserver
The question now is: how do you deploy a Dataflow pipeline from Google App Engine? The development machine does not have access to the production environment, so I cannot submit my pipeline to the Google Cloud Dataflow service from there. I have tried to submit it from Google App Engine instead, but received out-of-memory errors. This seems to be related to some authentication problem. From other posts here on Stack Overflow, it seems that this kind of "deploy" from App Engine is not "officially" supported.
How would one do this in a production environment, then?
Environment dependencies so far:
maven 3.3.0
Google AppEngine 1.9.28
Google API client 1.20.0
Java 1.7.0_79
Workstation - Windows 7
Google Development Environment : Gold Package
This is my sample code to get the pipeline running:
    // Run on the Cloud Dataflow service, staging files in a Cloud Storage bucket.
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setNumWorkers(2);
    options.setRunner(DataflowPipelineRunner.class);
    options.setStagingLocation("gs://pipeline_bucket2");
    options.setProject("projectname");
    options.setJobName("starterpipeline");
    options.setUpdate(true);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("Hello", "World"))
     // Upper-case each input string.
     .apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
         @Override
         public void processElement(ProcessContext c) {
             c.output(c.element().toUpperCase());
         }
     }))
     // Log each result; LOG is a logger declared elsewhere in the class.
     .apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
         @Override
         public void processElement(ProcessContext c) {
             LOG.info(c.element());
         }
     }));
    p.run();
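For reference, the data logic of this sample is trivial: it upper-cases two strings and logs them. A plain-Java sketch of the same two steps, with no Dataflow dependency, is below. The class name `StarterPipelineLocalCheck` and its method names are hypothetical and only mirror the step names used in the sample; this is not how the Dataflow runner executes the steps, just a way to show what they compute.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the sample's two processing steps, using plain collections. */
final class StarterPipelineLocalCheck {

    /** Mirrors the "StringExtract" step: upper-cases every element. */
    static List<String> stringExtract(List<String> input) {
        List<String> output = new ArrayList<String>();
        for (String element : input) {
            output.add(element.toUpperCase());
        }
        return output;
    }

    /** Mirrors the "StringLogger" step: writes every element out, produces nothing. */
    static void stringLogger(List<String> input) {
        for (String element : input) {
            System.out.println(element);
        }
    }

    public static void main(String[] args) {
        // Same two inputs as Create.of("Hello", "World") in the sample.
        stringLogger(stringExtract(Arrays.asList("Hello", "World")));
    }
}
```

Running `main` prints `HELLO` and `WORLD`, which is what I expect the logger step to emit once the job actually runs on the service.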
This is the stack trace I get when the sample pipeline code is run from App Engine:
Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)