Error while splitting pcollections on Dataflow run

Published 2019-08-12 03:52

I have an Apache Beam pipeline built in Python. It reads rows from a CSV file and then runs generic pipeline steps on all PCollections; this works fine. For PCollections that come from a specific filename, I want to perform a couple of additional steps, so I tag the elements from that file and run the extra steps on those tagged outputs. When I run the pipeline on Dataflow it fails with the error "Workflow failed. Causes: Expected custom source to have non-zero number of splits."

I tested this and it works fine on the DirectRunner.

lines = (p | beam.io.ReadFromText(input_file_path, skip_header_lines=1))

Generic = (lines | <"Do generic logic for all pCollections">)

tagged_lines = (lines | beam.ParDo(Tag(), input_file_path).with_outputs(Tag.TAG_OPTOUT, Tag.TAG_BOUNCE))

Optouts = (tagged_lines[Tag.TAG_OPTOUT] | <"Do logic 1">)

Bounces = (tagged_lines[Tag.TAG_BOUNCE] | <"Do logic 2">)

from apache_beam import pvalue

class Tag(beam.DoFn):
    TAG_OPTOUT = 'OPTOUT'
    TAG_BOUNCE = 'BOUNCE'

    def process(self, element, input_file_path):
        input_file = input_file_path.get()
        if "optout" in input_file:
            yield pvalue.TaggedOutput(self.TAG_OPTOUT, element)
        elif "bounce" in input_file:
            yield pvalue.TaggedOutput(self.TAG_BOUNCE, element)
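The branching itself can be checked without any runner: the routing in `Tag.process` depends only on the filename string. Below is a plain-Python sketch of that decision (the helper name `route_by_filename` is hypothetical, introduced only for illustration; in the real pipeline this logic lives inside the DoFn and emits `pvalue.TaggedOutput` objects rather than tuples):

```python
# Minimal sketch of the filename-based routing performed in Tag.process.
# route_by_filename is a hypothetical helper for illustration only.
TAG_OPTOUT = 'OPTOUT'
TAG_BOUNCE = 'BOUNCE'

def route_by_filename(element, input_file):
    """Return (tag, element) for files we care about, else None."""
    if "optout" in input_file:
        return (TAG_OPTOUT, element)
    elif "bounce" in input_file:
        return (TAG_BOUNCE, element)
    return None  # element belongs to no tagged output

# Elements from an "optout" file get the OPTOUT tag; other files get no tag.
print(route_by_filename("a@b.com,1", "gs://bucket/optout_2019.csv"))  # ('OPTOUT', 'a@b.com,1')
print(route_by_filename("a@b.com,1", "gs://bucket/other.csv"))        # None
```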

0 answers