Apache Beam Go SDK with Dataflow

2019-08-17 22:13发布

问题:

I've been working with the Go Beam SDK (v2.13.0) and can't get the wordcount example working on GCP Dataflow. It enters crash loop trying to start the org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness. The example is executing correctly when run locally using the Direct runner.

The example was completely unmodified from the original example given above.

The stack trace is:

org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException: Protocol message had invalid UTF-8. 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:148) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readStringRequireUtf8(CodedInputStream.java:2353) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:59611) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec.<init>(RunnerApi.java:59572) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:60241) 
at org.apache.beam.model.pipeline.v1.RunnerApi$FunctionSpec$1.parsePartialFrom(RunnerApi.java:60235) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder.<init>(RunnerApi.java:27531) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder.<init>(RunnerApi.java:27489) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$1.parsePartialFrom(RunnerApi.java:28410) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$1.parsePartialFrom(RunnerApi.java:28404) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$Builder.mergeFrom(RunnerApi.java:28028) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Coder$Builder.mergeFrom(RunnerApi.java:27868) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2408) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntryLite.parseField(MapEntryLite.java:128) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntryLite.parseEntry(MapEntryLite.java:184) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry.<init>(MapEntry.java:106) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry.<init>(MapEntry.java:50) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry$Metadata$1.parsePartialFrom(MapEntry.java:70) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.MapEntry$Metadata$1.parsePartialFrom(MapEntry.java:64) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components.<init>(RunnerApi.java:930) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components.<init>(RunnerApi.java:848) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components$1.parsePartialFrom(RunnerApi.java:2714) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Components$1.parsePartialFrom(RunnerApi.java:2708) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2424) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.<init>(RunnerApi.java:2892) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.<init>(RunnerApi.java:2850) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline$1.parsePartialFrom(RunnerApi.java:3981) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline$1.parsePartialFrom(RunnerApi.java:3975) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:221) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:239) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:244) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) 
at org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.GeneratedMessageV3.parseWithIOException(GeneratedMessageV3.java:311) 
at org.apache.beam.model.pipeline.v1.RunnerApi$Pipeline.parseFrom(RunnerApi.java:3222) 
at org.apache.beam.runners.dataflow.worker.DataflowWorkerHarnessHelper.getPipelineFromEnv(DataflowWorkerHarnessHelper.java:131) 
at org.apache.beam.runners.dataflow.worker.DataflowRunnerHarness.main(DataflowRunnerHarness.java:59) 

I was using the docker image specified in the example and also tried from my own docker using the same tag (v2.13.0) but still get the same error. I Realize it's not production ready, but I am hoping the samples should work.

As per the instructions on the getting starting I ran the job like this:

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://example-bucket/counts \
--runner dataflow \
--project example-project \
--temp_location gs://example-bucket/tmp/ \
--staging_location gs://example-bucket/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515

Again I tried that docker provided in the getting started, as well as one built using v2.13.0.

My go.mod for the sample file is:

module example.org/wordcount

go 1.12

require (
    cloud.google.com/go v0.41.0 // indirect
    github.com/apache/beam v2.13.0+incompatible
    github.com/pkg/errors v0.8.1 // indirect
    golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
    google.golang.org/grpc v1.22.0 // indirect
)

What could be causing this?

回答1:

Dataflow doesn't officially support the Apache Beam Go SDK. Some users have had been able to use it though. I suspect that this release may have had issues. You may be able to try a different version.

You may be able to discuss with other users on the Beam mailing list about which versions have worked for them (though, unsupported).