Check if PCollection is empty - Apache Beam

Posted 2019-06-12 03:39

Question:

Is there any way to check if a PCollection is empty?

I haven't found anything relevant in the documentation of Dataflow and Apache Beam.

Answer 1:

There is no way to check the size of a PCollection without applying a PTransform to it (such as Count.globally() or Combine.globally()), because a PCollection is not like a typical Collection in the Java SDK.

It is an abstraction over a bounded or unbounded collection of data, where data is fed into the collection so that an operation (a PTransform) can be applied to it. It is also parallelized (as the P at the beginning of the class name suggests).

Therefore you need a mechanism that gets the element count from each worker/node and combines those counts into a single value. Whether that value is 0 or n cannot be known until the transformation has finished.
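To make the "count per worker, then combine" idea concrete, here is a minimal plain-Python sketch. It is not Beam code: the list of partitions stands in for the data held by each worker, and the names count_partition, combine_counts and is_empty are hypothetical helpers chosen only for illustration.

```python
from functools import reduce


def count_partition(partition):
    """Count the elements held by one (simulated) worker."""
    return sum(1 for _ in partition)


def combine_counts(partial_counts):
    """Merge the per-worker counts into one global count (0 if there are none)."""
    return reduce(lambda a, b: a + b, partial_counts, 0)


def is_empty(partitions):
    """The collection is empty only if the combined count is 0."""
    return combine_counts(count_partition(p) for p in partitions) == 0


# Emptiness is only known after every partition has been counted and merged.
print(is_empty([[], [], []]))
print(is_empty([[], ["x"], []]))
```

In a real pipeline the runner does this partitioning and merging for you when you apply a global count; the sketch only shows why the answer is not available before that step completes.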



Answer 2:

You didn't specify which SDK you're using, so I assumed Python. The code is easily portable to Java.

You can count the elements globally and then map the resulting number to a boolean with a simple comparison. You can then pass this value to another transform as a side input using pvalue.AsSingleton, like this:

import apache_beam as beam
from apache_beam import pvalue

is_empty_check = (your_pcollection
                    | "Count" >> beam.combiners.Count.Globally()
                    | "Is empty?" >> beam.Map(lambda n: n == 0)
                    )

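another_pipeline_branch = (
    other_pcollection  # placeholder: any existing PCollection, not the Pipeline object itself
    | "Do something" >> beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check))
)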

The function that receives the side input looks like this:

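def do_something(element, is_empty):
    # is_empty arrives as a plain bool, resolved from the side input
    if is_empty:
        pass  # yes: your_pcollection had no elements
    else:
        pass  # no: your_pcollection had at least one element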