Using Cloud Dataflow 'side input' works locally, but not in the cloud

Posted 2019-07-07 05:02

We've defined two BigQuery side inputs for our ParDo, as described in the docs. When the pipeline runs locally (i.e. with DirectPipelineRunner), the side inputs work fine. However, when it runs in the cloud, it fails with:

java.lang.IllegalArgumentException: calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?

Why would it work locally but not when executed in the cloud?

Connected to the target VM, address: '127.0.0.1:61484', transport: 'socket'
Mar 05, 2015 4:58:26 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 75 files. Enable logging at DEBUG level to see which files will be staged.
Mar 05, 2015 4:58:27 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 05, 2015 4:58:27 PM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 75 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 05, 2015 4:59:20 PM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 4 files newly uploaded, 71 files cached
Dataflow SDK version: 0.3.150227
Mar 05, 2015 4:59:30 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
Submitted job: 2015-03-04_21_59_29-8181498263178343117
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/<removed>/dataflow/job/2015-03-04_21_59_29-8181498263178343117
2015-03-05T06:00:06.431Z: (470854a2051eb53e): Expanding GroupByKey operations into optimizable parts.
2015-03-05T06:00:06.434Z: (470854a2051ebec0): Annotating graph with Autotuner information.
2015-03-05T06:00:38.227Z: (470854a2051ebf4a): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-05T06:00:38.230Z: (470854a2051eb8cc): Fusing consumer Impressions-CPT-transformation into Impressions-GCS-read
2015-03-05T06:00:38.233Z: (470854a2051eb24e): Fusing consumer Impressions-GSC-write into Impressions-CPT-transformation
2015-03-05T06:00:38.236Z: (470854a2051ebbd0): Fusing consumer ActiveViews-CPT-transformation into ActiveViews-GCS-read
2015-03-05T06:00:38.239Z: (470854a2051eb552): Fusing consumer ActiveViews-GSC-write into ActiveViews-CPT-transformation
2015-03-05T06:00:38.241Z: (470854a2051ebed4): Fusing consumer Clicks-CPT-transformation into Clicks-GCS-read
2015-03-05T06:00:38.243Z: (470854a2051eb856): Fusing consumer Clicks-GSC-write into Clicks-CPT-transformation
2015-03-05T06:00:38.291Z: (470854a2051ebcfc): Adding StepResource setup and teardown to workflow graph.
2015-03-05T06:00:38.298Z: (470854a2051eb000): Not adding lease related steps.
2015-03-05T06:00:38.311Z: (470854a2051eb982): Starting the input generators.
2015-03-05T06:00:38.332Z: (91191d48cd0d620e): Adding workflow start and stop steps.
2015-03-05T06:00:38.335Z: (91191d48cd0d6d50): Assigning stage ids.
2015-03-05T06:00:38.796Z: S11: (c3f8963c90f0500f): Executing operation Lineitems2
2015-03-05T06:00:38.839Z: S06: (6090a361d1acd580): Executing operation Advertisers3
2015-03-05T06:00:38.839Z: (b3da167931dd7955): Value "Lineitems2.out" materialized.
2015-03-05T06:00:38.840Z: S01: (d248458030712ffd): Executing operation Advertisers
2015-03-05T06:00:38.861Z: S03: (470854a2051ebfc5): Executing operation Lineitems
2015-03-05T06:00:38.869Z: S08: (84f12b285dcf746b): Executing operation Lineitems3
2015-03-05T06:00:38.874Z: S13: (fd7e694921bb42e4): Executing operation Advertisers2
2015-03-05T06:00:38.876Z: S12: (902bfbb49c2e797d): Executing operation AsIterable4/CreatePCollectionView
2015-03-05T06:00:38.880Z: (19dca2539403f653): Value "Advertisers.out" materialized.
2015-03-05T06:00:38.881Z: (76efcecc96a971f3): Value "Advertisers3.out" materialized.
2015-03-05T06:00:38.905Z: (84f12b285dcf736c): Starting worker pool setup.
2015-03-05T06:00:38.906Z: (19dca2539403f3d2): Value "Lineitems.out" materialized.
2015-03-05T06:00:38.912Z: (84f12b285dcf72a0): Starting 50 workers...
2015-03-05T06:00:38.915Z: S07: (91191d48cd0d6b3b): Executing operation AsIterable5/CreatePCollectionView
2015-03-05T06:00:38.920Z: (fd7e694921bb4b90): Value "Advertisers2.out" materialized.
2015-03-05T06:00:38.926Z: (221dd8de96cdfdbc): Value "Lineitems3.out" materialized.
2015-03-05T06:00:38.929Z: (6e68655249248499): Value "AsIterable4/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.933Z: S02: (90c9f0b8bf17cb53): Executing operation AsIterable/CreatePCollectionView
2015-03-05T06:00:38.938Z: S04: (b80a39fe501a7928): Executing operation AsIterable2/CreatePCollectionView
2015-03-05T06:00:38.949Z: S14: (221dd8de96cdf7a8): Executing operation AsIterable3/CreatePCollectionView
2015-03-05T06:00:38.954Z: (91191d48cd0d61bf): Value "AsIterable5/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.959Z: S09: (7001cd4292313be4): Executing operation AsIterable6/CreatePCollectionView
2015-03-05T06:00:38.966Z: (84f12b285dcf716e): Value "AsIterable/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.974Z: (76efcecc96a9777f): Value "AsIterable2/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.989Z: (84f12b285dcf70d5): Value "AsIterable3/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.993Z: (90c9f0b8bf17ca82): Value "AsIterable6/CreatePCollectionView.out" materialized.
2015-03-05T06:00:39.006Z: S05: (76efcecc96a97d0b): Executing operation Impressions-GCS-read+Impressions-CPT-transformation+Impressions-GSC-write
2015-03-05T06:00:39.018Z: S10: (fd7e694921bb4092): Executing operation ActiveViews-GCS-read+ActiveViews-CPT-transformation+ActiveViews-GSC-write
2015-03-05T06:00:39.018Z: S15: (84f12b285dcf7009): Executing operation Clicks-GCS-read+Clicks-CPT-transformation+Clicks-GSC-write
2015-03-05T06:03:02.735Z: (1f5324f21f91a6ba): java.lang.IllegalArgumentException: calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?
    at com.google.cloud.dataflow.sdk.util.DoFnContext.sideInput(DoFnContext.java:107)
    at com.google.cloud.dataflow.sdk.util.DoFnProcessContext.sideInput(DoFnProcessContext.java:81)
    at com.telstra.cpt.engine.DFPDoFn.processElement(DFPDoFn.java:54)
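The error message describes a lookup scoped to a single ParDo: the view passed to `sideInput()` must be one of the views given to that same ParDo's `withSideInputs()`. Below is a minimal plain-Java model of just that check, so it runs without the Dataflow SDK. It is not the SDK's implementation. `SideInputScope`, `View`, `with()`, and the tag strings (borrowed from the operation names in the log) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SideInputScope {
    // Hypothetical stand-in for PCollectionView, identified by a tag string.
    public static final class View {
        final String tag;
        public View(String tag) { this.tag = tag; }
    }

    // Side-input values visible to ONE ParDo application, keyed by view tag.
    private final Map<String, Object> values = new HashMap<>();

    // Models withSideInputs(...): registers a view and its materialized value.
    public SideInputScope with(View view, Object value) {
        values.put(view.tag, value);
        return this;
    }

    // Models ProcessContext.sideInput(view): only views registered on this scope resolve.
    public Object sideInput(View view) {
        if (!values.containsKey(view.tag)) {
            throw new IllegalArgumentException(
                "calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?");
        }
        return values.get(view.tag);
    }

    public static void main(String[] args) {
        View advertisers = new View("Advertisers");
        View lineitems = new View("Lineitems");
        // This ParDo was only given the advertisers view.
        SideInputScope scope = new SideInputScope().with(advertisers, "advertiser rows");
        System.out.println(scope.sideInput(advertisers));
        try {
            scope.sideInput(lineitems); // may exist elsewhere in the pipeline, but not on this ParDo
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints the resolved value and then the same message as the stack trace in the question. Even a perfectly valid view fails if it was attached to a different ParDo.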

EDITED UPDATE 1

We've resolved the IllegalArgumentException by creating a new ParDo instance for each step of the transform. However, we now get a ClassCastException when trying to process the side input (again, this runs perfectly fine locally):

2015-03-16T08:12:29.107Z: (480fb99418f6923b): java.lang.ClassCastException: com.google.api.services.bigquery.model.TableRow cannot be cast to com.google.cloud.dataflow.sdk.util.WindowedValue
    at com.google.cloud.dataflow.sdk.transforms.View$IterablePCollectionView$1.apply(View.java:192)
    at com.google.common.collect.Iterators$8.transform(Iterators.java:799)
    at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
    at com.telstra.cdf.dfp.DFPDoFn.buildAdvertisers(DFPDoFn.java:129)
    at com.telstra.cdf.dfp.DFPDoFn.buildSideInputs(DFPDoFn.java:80)
    at com.telstra.cdf.dfp.DFPDoFn.processElement(DFPDoFn.java:50)

Job ID: 2015-03-16_01_10_11-6267129041459219709

EDITED UPDATE 2

We tried building against the latest source on GitHub, because of the delay in rolling it out to Maven. It now looks like a regression: when we query the ProcessContext for the side input, it returns null.

However, just as before, it runs perfectly locally.

2 answers
迷人小祖宗
Reply #2 · 2019-07-07 05:36

One likely cause of this kind of problem is using a DoFn multiple times and mutating its fields in between usages. For instance:

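// Anti-pattern: one DoFn instance whose field is reassigned between ParDo applications
MyDoFn doFn = new MyDoFn();

final PCollectionView<Iterable<String>> v1 = createView(...);
doFn.view = v1;
p.apply(ParDo.of(doFn).withSideInputs(v1));

final PCollectionView<Iterable<String>> v2 = createView(...);
doFn.view = v2; // the first ParDo holds this same object, so its view changes too
p.apply(ParDo.of(doFn).withSideInputs(v2));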

The code above should use two separate instances of MyDoFn. Using a constructor and final fields makes this easier:

// One DoFn instance per ParDo; each view is fixed by the constructor
final PCollectionView<Iterable<String>> v1 = createView(...);
MyDoFn doFn1 = new MyDoFn(v1);
p.apply(ParDo.of(doFn1).withSideInputs(v1));

final PCollectionView<Iterable<String>> v2 = createView(...);
MyDoFn doFn2 = new MyDoFn(v2);
p.apply(ParDo.of(doFn2).withSideInputs(v2));
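A plain-Java simulation of why reusing a mutable DoFn breaks the view pairing. It assumes (not confirmed in this thread) that each DoFn is serialized only after the whole graph has been built, so the value a field holds at submission time is what every application sharing that instance ships. `View`, `MutableDoFn`, `FinalFieldDoFn`, `Application`, and `runRemotely` are hypothetical names, and Java serialization stands in for however the SDK actually ships DoFns.

```java
import java.io.*;
import java.util.*;

public class SharedDoFnDemo {
    // Hypothetical stand-in for PCollectionView, identified by its tag.
    public static final class View implements Serializable {
        final String tag;
        public View(String tag) { this.tag = tag; }
    }

    public interface ViewReader extends Serializable {
        View view();
    }

    // Anti-pattern from the answer: a mutable field reassigned between applications.
    public static final class MutableDoFn implements ViewReader {
        View view;
        public View view() { return view; }
    }

    // Recommended shape: the view is fixed by the constructor.
    public static final class FinalFieldDoFn implements ViewReader {
        private final View view;
        public FinalFieldDoFn(View view) { this.view = view; }
        public View view() { return view; }
    }

    // One ParDo application: a *reference* to the DoFn plus the tags given to withSideInputs().
    public static final class Application {
        final ViewReader fn;
        final Set<String> declaredTags = new HashSet<>();
        Application(ViewReader fn, View... sideInputs) {
            this.fn = fn;
            for (View v : sideInputs) declaredTags.add(v.tag);
        }
    }

    // Java serialization round trip, standing in for shipping a DoFn to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes each DoFn only after the whole graph exists, then checks the view its copy reads.
    public static List<String> runRemotely(List<Application> graph) {
        List<String> read = new ArrayList<>();
        for (Application app : graph) {
            String tag = roundTrip(app.fn).view().tag;
            if (!app.declaredTags.contains(tag)) {
                throw new IllegalArgumentException("calling sideInput() with unknown view; "
                    + "did you forget to pass the view in ParDo.withSideInputs()?");
            }
            read.add(tag);
        }
        return read;
    }

    public static List<Application> buildWithSharedDoFn() {
        View v1 = new View("v1");
        View v2 = new View("v2");
        MutableDoFn doFn = new MutableDoFn();
        List<Application> graph = new ArrayList<>();
        doFn.view = v1;
        graph.add(new Application(doFn, v1));
        doFn.view = v2; // the first application holds the same object, so it now reads v2 too
        graph.add(new Application(doFn, v2));
        return graph;
    }

    public static List<Application> buildWithFinalFields() {
        View v1 = new View("v1");
        View v2 = new View("v2");
        return Arrays.asList(
            new Application(new FinalFieldDoFn(v1), v1),
            new Application(new FinalFieldDoFn(v2), v2));
    }

    public static void main(String[] args) {
        System.out.println(runRemotely(buildWithFinalFields())); // prints [v1, v2]
        try {
            runRemotely(buildWithSharedDoFn());
        } catch (IllegalArgumentException e) {
            System.out.println("shared DoFn failed: " + e.getMessage());
        }
    }
}
```

With the shared instance, both serialized copies carry `v2`, so the first application (declared with `v1`) fails the check. One final-field instance per ParDo keeps each view paired with its own `withSideInputs()` declaration.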
啃猪蹄的小仙女
Reply #3 · 2019-07-07 05:58

This is now fixed in the latest version of the SDK, 0.3.150326, which was rolled out last week.
