I have an Apache Beam pipeline built in Python that reads rows from a CSV file. There are generic pipeline steps that apply to all PCollections, and those work fine. For PCollections that come from one specific file name, I want to perform a couple of additional steps, so I tag the elements read from that file and run the extra steps only on the tagged outputs. When I run the pipeline on Dataflow it fails with the error "Workflow failed. Causes: Expected custom source to have non-zero number of splits."
I tested this, and it works fine on DirectRunner.
import apache_beam as beam
from apache_beam import pvalue

lines = p | beam.io.ReadFromText(input_file_path, skip_header_lines=1)

# Generic steps applied to every PCollection.
generic = lines | <"Do generic logic for all PCollections">

# Tag elements by the file they came from, then branch on the tags.
tagged_lines = lines | beam.ParDo(Tag(), input_file_path).with_outputs(
    Tag.TAG_OPTOUT, Tag.TAG_BOUNCE)

optouts = tagged_lines[Tag.TAG_OPTOUT] | <"Do logic 1">
bounces = tagged_lines[Tag.TAG_BOUNCE] | <"Do logic 2">
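For context, input_file_path is not a plain string: it arrives as a runtime ValueProvider, which is why Tag.process calls .get() on it. The wiring looks roughly like this; the option name and options class are just my own setup, shown for completeness:

from apache_beam.options.pipeline_options import PipelineOptions

class FileOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # A value-provider argument is only resolved at run time, which is
        # why Tag.process has to call input_file_path.get().
        parser.add_value_provider_argument('--input_file_path', type=str)

options = FileOptions()
input_file_path = options.view_as(FileOptions).input_file_path
p = beam.Pipeline(options=options)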
class Tag(beam.DoFn):
    """Routes each element to a tagged output based on the input file name."""
    TAG_OPTOUT = 'OPTOUT'
    TAG_BOUNCE = 'BOUNCE'

    def process(self, element, input_file_path):
        # input_file_path is a ValueProvider, so resolve it at run time.
        input_file = input_file_path.get()
        if "optout" in input_file:
            yield pvalue.TaggedOutput(self.TAG_OPTOUT, element)
        elif "bounce" in input_file:
            yield pvalue.TaggedOutput(self.TAG_BOUNCE, element)
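To confirm the tagging itself behaves, I run a small harness on DirectRunner; the sample rows and file name below are made up for illustration:

from apache_beam.options.value_provider import StaticValueProvider

with beam.Pipeline() as p:  # no runner specified, so this uses DirectRunner
    fake_path = StaticValueProvider(str, 'gs://my-bucket/optout_2019.csv')
    rows = p | beam.Create(['a@example.com,2019-01-01',
                            'b@example.com,2019-01-02'])
    tagged = rows | beam.ParDo(Tag(), fake_path).with_outputs(
        Tag.TAG_OPTOUT, Tag.TAG_BOUNCE)
    tagged[Tag.TAG_OPTOUT] | 'PrintOptouts' >> beam.Map(print)

This prints both rows under the OPTOUT tag as expected, so the branching logic itself seems fine; the failure only shows up on Dataflow.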