I'd like to pass the name of a file uploaded to Google Cloud Storage from Cloud Functions to Dataflow so that I can process the uploaded file.
The code that I've written for Cloud Functions is:
const google = require('googleapis');
exports.goWithTheDataFlow = function(event, callback) {
const file = event.data;
if (file.resourceState === 'exists' && file.name) {
console.log(file.name);
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
dataflow.projects.templates.create({
projectId: '-------',
resource: {
parameters: {
inputFile: `gs://${file.bucket}/${file.name}`
},
jobName: '-------',
gcsPath: '-------'
}
}, function(err, response) {
if (err) {
console.error("problem running dataflow template, error was: ", err);
}
console.log("Dataflow template response: ", response);
callback();
});
});
}
};
And the code of my Pipeline looks something like this:
/**
 * Custom pipeline options that add a single required option: the location of
 * the input file, exposed as a {@code ValueProvider<String>}.
 */
public interface FruitOptions extends PipelineOptions {
    /** Returns the provider holding the path of the file to read from. */
    @Validation.Required
    @Description("Path of the file to read from")
    ValueProvider<String> getInputFile();

    /** Sets the provider holding the path of the file to read from. */
    void setInputFile(ValueProvider<String> value);
}
// Main method (only the option-parsing and pipeline-creation statements are shown).
// Build FruitOptions from the command-line args, with validation requested
// before the options are viewed as FruitOptions.
FruitOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(FruitOptions.class);
// Create the pipeline from the parsed options.
Pipeline p = Pipeline.create(options);
How do I create a template from the above pipeline code? I've been following the article "Triggering Dataflow Pipelines with Cloud Functions" to do this, but I get several errors while running the Maven commands, such as:
[WARNING]
java.lang.NoClassDefFoundError: com/google/cloud/dataflow/sdk/options/PipelineOp
tions
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:281)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.dataflow.sdk.optio
ns.PipelineOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.411 s
[INFO] Finished at: 2017-06-29T12:16:15+05:30
[INFO] Final Memory: 11M/27M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (d
efault-cli) on project Common: An exception occured while executing the Java cla
ss. com/google/cloud/dataflow/sdk/options/PipelineOptions: com.google.cloud.data
flow.sdk.options.PipelineOptions -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e swit
ch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please rea
d the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionE
xception
And the command that I ran was:
mvn compile exec:java -Dexec.mainClass=Common.StarterPipeline -Dexec.args="--project=******** --stagingLocation=gs://******** --dataflowJobFile=gs://********* --runner=TemplatingDataflowPipelineRunner"
The POM file:
<!-- Maven build descriptor for the Common:Common:0.0.1-SNAPSHOT project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Common</groupId>
<artifactId>Common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- Extra repository used for snapshot artifacts only: releases are disabled, snapshots enabled. -->
<repositories>
<repository>
<id>ossrh.snapshots</id>
<name>Sonatype OSS Repository Hosting</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<!-- Compile with Java 1.8 as both source and target level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<!-- Pins the version and configuration of the exec plugin invoked by `mvn exec:java`. -->
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.4.0</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<!-- Dataflow Java SDK, version 2.0.0.
     NOTE(review): the stack trace quoted in this post fails to load
     com.google.cloud.dataflow.sdk.options.PipelineOptions, and the command uses
     runner=TemplatingDataflowPipelineRunner; confirm that this SDK version still
     provides that package and runner, or align the code/command with the SDK version. -->
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.0.0</version>
</dependency>
<!-- slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.14</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.14</version>
</dependency>
</dependencies>
</project>
Can somebody please help me with this? Please also let me know if I'm doing anything wrong. Thanks.