Apache Beam - skip pipeline step

Posted 2019-08-27 10:43

Question:

I'm using Apache Beam to set up a pipeline consisting of 2 main steps:

  • transform the data using a Beam Transform
  • load the transformed data to BigQuery

The pipeline setup looks like this:

myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>) myInputPCollection
                .apply("do a parallel transform",
                     ParDo.of(new MyTransformClassName.MyTransformFn()));

myPCollection
    .apply("Load BigQuery data for PCollection",
            BigQueryIO.<myCollectionObjectType>write()
            .to(new MyDataLoadClass.MyFactTableDestination(myDestination))
            .withFormatFunction(new MyDataLoadClass.MySerializationFn()));

I've looked at this question:

Apache Beam: Skipping steps in an already-constructed pipeline

which suggests that I may be able to dynamically choose which output the data is passed to after the parallel transform in step 1.

How do I do this? I don't know how to choose whether or not to pass myPCollection from step 1 to step 2. I need to skip step 2 if the object in myPCollection from step 1 is null.

Answer 1:

Simply don't emit the element from MyTransformClassName.MyTransformFn when you don't want it to reach the next step. For example:

// MyTransformFn is a static nested class inside MyTransformClassName
static class MyTransformFn extends DoFn<...> {
  @ProcessElement
  public void processElement(ProcessContext c, ...) {
    ...
    result = ...
    if (result != null) {
      c.output(result);   // emit only non-null results; otherwise output nothing
    }
  }
}

This way nulls don't reach the next step.

See the ParDo section of the guide for more details: https://beam.apache.org/documentation/programming-guide/#pardo
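
To make the "emit nothing" idea concrete without a Beam dependency, here is a minimal plain-Java sketch of the same pattern. It is only a model, not Beam code: ElementFn, parDo, parseOrNull and transformStep are hypothetical names invented for illustration, with ElementFn standing in for a DoFn and the Consumer standing in for c.output(...). It assumes the transform yields null for elements that should be skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class SkipNullSketch {

    /** Hypothetical stand-in for a DoFn: called once per element, may emit zero or more outputs. */
    interface ElementFn<I, O> {
        void processElement(I input, Consumer<O> out);
    }

    /** Hypothetical stand-in for ParDo: runs fn on every input and collects whatever it emits. */
    static <I, O> List<O> parDo(List<I> inputs, ElementFn<I, O> fn) {
        List<O> outputs = new ArrayList<>();
        for (I input : inputs) {
            fn.processElement(input, outputs::add);
        }
        return outputs;
    }

    /** Hypothetical transform: parses an integer and returns null for unparseable input. */
    static Integer parseOrNull(String s) {
        try {
            return Integer.valueOf(s.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Step 1: transform each element and emit only the non-null results. */
    static List<Integer> transformStep(List<String> raw) {
        return SkipNullSketch.<String, Integer>parDo(raw, (s, out) -> {
            Integer result = parseOrNull(s);
            if (result != null) {
                out.accept(result);   // nothing is emitted for null results
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> toLoad = transformStep(Arrays.asList("1", "oops", " 42 ", ""));
        // Step 2 (the load) would only ever receive the elements in toLoad.
        System.out.println(toLoad);   // prints [1, 42]
    }
}
```

In this model the downstream step is not skipped by changing the pipeline's shape. It simply receives fewer elements, which is the behaviour the answer describes for the BigQuery load step.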