Running Google Dataflow pipeline from a Google App

2019-02-18 13:22发布

问题:

I am creating a dataflow job using DataflowPipelineRunner. I tried the following scenarios.

  1. Without specifying any machineType
  2. With g1 small machine
  3. with n1-highmem-2

In all the above scenarios, Input is a file from GCS which is very small file(KB size) and output is Big Query table.

I got Out of memory error in all the scenarios

The size of my compiled code is 94mb. I am trying only word count example and it did not read any input(It fails before the job starts). Please help me understand why i am getting this error.

Note: I am using appengine to start the job.

Note: The same code works with beta versoin 0.4.150414

EDIT 1

As per the suggestions in the answer tried the following,

  1. Switched from Automatic scaling to Basic Scaling.
  2. Used machine type B2 which provides 256MB memory

After these configuration, Java Heap Memory problem is solved. But it is trying to upload a jar into stagging location which is more than 10Mb, hence it fails.

It logs the following exception

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

I tried directly uploading the jar file - appengine-api-1.0-sdk-1.9.20.jar, still it tries to upload this jar appengine-api-L4wtoWwoElWmstI1Ia93cg.jar. which i dont know what jar it is. Any idea on what this jar is appreciated.

Please help me to fix this issue.

回答1:

The short answer is that if you use AppEngine on a Managed VM you will not encounter the AppEngine sandbox limits (OOM when using a F1 or B1 instance class, execution time limit issues, whitelisted JRE classes). If you really want to run within the App Engine sandbox, then your use of the Dataflow SDK most conform to the limits of the AppEngine sandbox. Below I explain common issues and what people have done to conform to the AppEngine sandbox limits.

The Dataflow SDK requires an AppEngine instance class which has enough memory to execute the users application to construct the pipeline, stage any resources, and send the job description to the Dataflow service. Typically we have seen users require using an instance class with more than 128mb of memory to not see OOM errors.

Generally constructing a pipeline and submitting it to the Dataflow service typically takes less than a couple of seconds if the required resources for your application are already staged. Uploading your JARs and any other resources to GCS can take longer than 60 seconds. This can be solved manually by pre-staging your JARs to GCS beforehand (the Dataflow SDK will skip staging them again if it detects they are already there) or using a task queue to get a 10 minute limit (note that for large applications, 10 mins may not be enough to stage all your resources).

Finally, within the AppEngine sandbox environment, you and all your dependencies are limited to using only whitelisted classes within the JRE or you'll get an exception like:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

EDIT 1

We perform a hash of the contents of the jars on the classpath and upload them to GCS with a modified filename. AppEngine runs a sandboxed environment with its own JARs, appengine-api-L4wtoWwoElWmstI1Ia93cg.jar refers to appengine-api.jar which is a jar that the sandboxed environment adds. You can see from our PackageUtil#getUniqueContentName(...) that we just append -$HASH before .jar.

We are working to solve why you are seeing the RequestPayloadToLarge excepton and it is currently recommended that you set the filesToStage option and filter out the jars not required to execute your Dataflow to get around the issue that you face. You can see how we build the files to stage with DataflowPipelineRunner#detectClassPathResourcesToStage(...).



回答2:

I had the same problem with the 10MB limit. What I did was filtering out the JAR files bigger than that limit (instead of specific files), and then set the renaming files in the DataflowPipelineOptions with setFilesToStage.

So I just copied the method detectClassPathResourcesToStage from the Dataflow SDK and changed it sightly:

private static final long FILE_BYTES_THRESHOLD = 10 * 1024 * 1024; // 10 MB

protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
        String message = String.format("Unable to use ClassLoader to detect classpath elements. "
                + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
        throw new IllegalArgumentException(message);
    }

    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
        try {
            File file = new File(url.toURI());
            if (file.length() < FILE_BYTES_THRESHOLD) {
                files.add(file.getAbsolutePath());
            }
        } catch (IllegalArgumentException | URISyntaxException e) {
            String message = String.format("Unable to convert url (%s) to file.", url);
            throw new IllegalArgumentException(message, e);
        }
    }
    return files;
}

And then when I'm creating the DataflowPipelineOptions:

DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
...
dataflowOptions.setFilesToStage(detectClassPathResourcesToStage(DataflowPipelineRunner.class.getClassLoader()));


回答3:

Here's a version of Helder's 10MB-filtering solution that will adapt to the default file-staging behavior of DataflowPipelineOptions even if it changes in a future version of the SDK.

Instead of duplicating the logic, it passes a throwaway copy of the DataflowPipelineOptions to DataflowPipelineRunner to see which files it would have staged, then removes any that are too big.

Note that this code assumes that you've defined a custom PipelineOptions class named MyOptions, along with a java.util.Logger field named logger.

// The largest file size that can be staged to the dataflow service.
private static final long MAX_STAGED_FILE_SIZE_BYTES = 10 * 1024 * 1024;

/**
 * Returns the list of .jar/etc files to stage based on the
 * Options, filtering out any files that are too large for
 * DataflowPipelineRunner.
 *
 * <p>If this accidentally filters out a necessary file, it should
 * be obvious when the pipeline fails with a runtime link error.
 */
private static ImmutableList<String> getFilesToStage(MyOptions options) {
  // Construct a throw-away runner with a copy of the Options to see
  // which files it would have wanted to stage. This could be an
  // explicitly-specified list of files from the MyOptions param, or
  // the default list of files determined by DataflowPipelineRunner.
  List<String> baseFiles;
  {
    DataflowPipelineOptions tmpOptions =
        options.cloneAs(DataflowPipelineOptions.class);
    // Ignore the result; we only care about how fromOptions()
    // modifies its parameter.
    DataflowPipelineRunner.fromOptions(tmpOptions);
    baseFiles = tmpOptions.getFilesToStage();
    // Some value should have been set.
    Preconditions.checkNotNull(baseFiles);
  }
  // Filter out any files that are too large to stage.
  ImmutableList.Builder<String> filteredFiles = ImmutableList.builder();
  for (String file : baseFiles) {
    long size = new File(file).length();
    if (size < MAX_STAGED_FILE_SIZE_BYTES) {
      filteredFiles.add(file);
    } else {
      logger.info("Not staging large file " + file + ": length " + size
          + " >= max length " + MAX_STAGED_FILE_SIZE_BYTES);
    }
  }
  return filteredFiles.build();
}

/** Runs the processing pipeline with given options. */
public void runPipeline(MyOptions options)
    throws IOException, InterruptedException {
  // DataflowPipelineRunner can't stage large files;
  // remove any from the list.
  DataflowPipelineOptions dpOpts =
      options.as(DataflowPipelineOptions.class);
  dpOpts.setFilesToStage(getFilesToStage(options));

  // Run the pipeline as usual using "options".
  // ...
}