Pass parameters from Cloud Functions to Dataflow

2019-03-01 03:25发布

问题:

I'd like to pass the name of a file uploaded to Google Cloud Storage from Cloud Functions to Dataflow so that I can process the uploaded file.

The code that I've written for Cloud Functions is

const google = require('googleapis');

exports.goWithTheDataFlow = function(event, callback) {
 const file = event.data;
 if (file.resourceState === 'exists' && file.name) {
     console.log(file.name);
   google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

     if (authClient.createScopedRequired && authClient.createScopedRequired()) {
       authClient = authClient.createScoped([
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/userinfo.email'
       ]);
     }

     const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

     dataflow.projects.templates.create({
       projectId: '-------',
       resource: {
           parameters: {
           inputFile: `gs://${file.bucket}/${file.name}`
         },
         jobName: '-------',
         gcsPath: '-------'
       }
     }, function(err, response) {
       if (err) {
         console.error("problem running dataflow template, error was: ", err);
       }
       console.log("Dataflow template response: ", response);
       callback();
     });

   });
 }
};

And the code of my Pipeline looks something like this:

/**
 * Pipeline options for this job: adds a required {@code inputFile} option on
 * top of the base {@link PipelineOptions}.
 */
public interface FruitOptions extends PipelineOptions {
      // Path of the file the pipeline reads; validation fails if it is not supplied.
      // NOTE(review): declared as ValueProvider<String>, presumably so the value can be
      // supplied when a template is executed rather than when it is built -- confirm.
      @Description("Path of the file to read from")
      @Validation.Required
      ValueProvider<String> getInputFile();
      // Setter counterpart of getInputFile().
      void setInputFile(ValueProvider<String> value);
  }


// Main method body: parse the command-line arguments into FruitOptions with
// validation enabled, then create the pipeline from those options.
FruitOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(FruitOptions.class);
      Pipeline p = Pipeline.create(options);

How do I create a template from the pipeline code above? I've been following the article "Triggering Dataflow Pipelines with Cloud Functions" to do this, but I get several errors when running the Maven commands, such as:

[WARNING]
java.lang.NoClassDefFoundError: com/google/cloud/dataflow/sdk/options/PipelineOptions
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:281)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.dataflow.sdk.options.PipelineOptions
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.411 s
[INFO] Finished at: 2017-06-29T12:16:15+05:30
[INFO] Final Memory: 11M/27M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project Common: An exception occured while executing the Java class. com/google/cloud/dataflow/sdk/options/PipelineOptions: com.google.cloud.dataflow.sdk.options.PipelineOptions -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

And the command that I ran was:

mvn compile exec:java -Dexec.mainClass=Common.StarterPipeline -Dexec.args="--project=******** --stagingLocation=gs://******** --dataflowJobFile=gs://********* --runner=TemplatingDataflowPipelineRunner"

The POM file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates of this project -->
  <groupId>Common</groupId>
  <artifactId>Common</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- Additional repository used for snapshot artifacts only (releases are disabled) -->
  <repositories>
    <repository>
      <id>ossrh.snapshots</id>
      <name>Sonatype OSS Repository Hosting</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <!-- Compile with Java 8 source and target levels -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <!-- Pins the version and configuration of the exec plugin that runs the exec:java goal -->
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.4.0</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <!-- NOTE(review): the build failure reports that class
         com.google.cloud.dataflow.sdk.options.PipelineOptions cannot be found.
         The 2.x line of this SDK appears to provide its classes under
         org.apache.beam.sdk instead; confirm that the pipeline's imports and the
         runner named on the command line match this SDK version. -->
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.0.0</version>
    </dependency>

    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.14</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>1.7.14</version>
    </dependency>
  </dependencies>
</project>

Can somebody please help me with this? Please also let me know if I'm doing anything wrong. Thanks.