How to use google provided template [pubsub to Dat

2019-08-23 05:08发布

I want to use this google provided template which stream data from pubsub to datastore. https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToDatastore.java

I follow the step wrote this document. https://github.com/GoogleCloudPlatform/DataflowTemplates

I pass this step.

mvn clean && mvn compile

But next step, the error occured.

    [INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ google-cloud-teleport-java ---
    2018-08-17 13:36:19 INFO  DataflowRunner:266 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 117 files. Enable logging at DEBUG level to see which files wi
    ll be staged.
    [WARNING]
    java.lang.IllegalStateException: Missing required properties: errorTag
            at com.google.cloud.teleport.templates.common.AutoValue_DatastoreConverters_WriteJsonEntities$Builder.build(AutoValue_DatastoreConverters_WriteJsonEntities.java:89)
            at com.google.cloud.teleport.templates.PubsubToDatastore.main(PubsubToDatastore.java:65)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
            at java.lang.Thread.run(Thread.java:748)
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 35.348 s
    [INFO] Finished at: 2018-08-17T13:36:20+09:00
    [INFO] Final Memory: 59M/146M
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project google-cloud-teleport-java: An exception occured while executing the Java class. Missing required propert
    ies: errorTag -> [Help 1]
    [ERROR]
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR]
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Then, I tried DatastoreToPubsub template and GSCTextToDatastore template, and these were successful.

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/DatastoreToPubsub.java

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/TextToDatastore.java

So, I can't understand what is problem. Where is wrong?

Please give me some advices...
Regards.

1条回答
别忘想泡老子
2楼-- · 2019-08-23 05:20

Looks like you found a bug in that particular DataflowTemplate whereby the pipeline is not configuring an error path even though a path is required when writing JSON entities. The fix is relatively simple and should be pushed to master shortly. In the meantime, you can get the pipeline working with two changes to the PubsubToDatastore pipeline code.

First modify the code so the PubsubToDatastoreOptions extends the ErrorWriteOptions interface. Your new options declaration should look something similar to the following:

interface PubsubToDatastoreOptions
  extends
  PipelineOptions,
  PubsubReadOptions,
  JavascriptTextTransformerOptions,
  DatastoreWriteOptions,
  ErrorWriteOptions {}

Then modify the code within the main method so that the pipeline configures an error TupleTag and routes any error messages to the LogErrors transform. This will ensure any data which fails to be output to Datastore is captured and stored on GCS. Your new main method should look something similar to the following:

TupleTag<String> errorTag = new TupleTag<String>(){};

Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply(PubsubIO.readStrings()
        .fromTopic(options.getPubsubReadTopic()))
    .apply(TransformTextViaJavascript.newBuilder()
        .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
        .setFunctionName(options.getJavascriptTextTransformFunctionName())
        .build())
    .apply(WriteJsonEntities.newBuilder()
        .setProjectId(options.getDatastoreWriteProjectId())
        .setErrorTag(errorTag)
        .build())
    .apply(LogErrors.newBuilder()
        .setErrorWritePath(options.getErrorWritePath())
        .setErrorTag(errorTag)
        .build());
查看更多
登录 后发表回答