How To Filter None Values Out Of PCollection

Posted 2019-08-27 11:20

Question:

My Pub/Sub pull subscription is delivering each message together with a None value. I need a way to filter out the None values as part of my pipeline processing.

Help with preventing the None values from arriving from the pull subscription in the first place would be welcome, but I suspect I'm missing something about the general workflow of defining and applying functions via ParDo.

I've set up a function to filter out None values, and judging by a print-to-console check it seems to work. However, when I then apply a lambda function that crashes on None, I still receive errors.

I've found the documentation for the Python Apache Beam SDK a little sparse; I have been looking all through it for answers without much luck.

from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions



def print_row(row):
    print row
    print type(row)

def filter_out_nones(row):
  if row is not None:
    yield row
  else:
    print 'we found a none! get it out'


def run(argv=None):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True


    p = beam.Pipeline(options=pipeline_options)



    data = ['test1 message','test2 message',None,'test3 please work']

## this does seem to return only the values I would hope for based on the console log

    testlogOnly = (p | "makeData" >> beam.Create(data)
               | "filter" >> beam.ParDo(filter_out_nones)
               | "printtesting" >> beam.Map(print_row))
            #  | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            #  | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))


##    testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
            #  | "filterHere" >> beam.ParDo(filter_out_nones)
            #   | "printHere" >> beam.Map(print_row)
## below here does not work due to the following message
## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
            #   | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            # | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

If I could log the byte-string-encoded messages without the None results, I'd be where I need to be.

Answer 1:

Your approach to filtering out None values looks good to me.

However, if I understand correctly, when you use testlogAndWrite and get the AttributeError, you are keeping the "printHere" >> beam.Map(print_row) step in the pipeline.

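print_row reads the messages and prints them, but it has no return statement, so it returns None. beam.Map emits whatever the function returns, so the next step, 'encodeHere', receives None for every element and x.encode('utf-8') raises the AttributeError.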
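The effect of the missing return can be reproduced without Beam. The following is a minimal sketch that uses Python's built-in map as a stand-in for beam.Map (an assumption made purely for illustration: both call the function once per element and keep its return value). The name print_row_passthrough is hypothetical, and the sketch uses print() so that it runs under Python 3.

```python
def print_row(row):
    # Prints the element but has no return statement,
    # so the call evaluates to None.
    print(row)


def print_row_passthrough(row):
    # Hypothetical variant: prints the element and hands it on unchanged.
    print(row)
    return row


rows = ['test1 message', 'test2 message']

# Built-in map stands in for beam.Map here (illustration only).
dropped = list(map(print_row, rows))
kept = list(map(print_row_passthrough, rows))

print(dropped)  # every element has been replaced by None
print(kept)     # the original elements survive
```

Any step placed after the first variant sees only None values, which is why the filter appears to work in the console while the encode step still fails.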

To solve this, either comment out that step or make sure it returns each element:

def print_row(row):
    print row
    print type(row)
    return row

Output:

test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
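Putting the pieces together, the write branch might look roughly like the sketch below. It is not the asker's exact pipeline: beam.Filter is swapped in for the generator-based ParDo (either should work), logging replaces print, and the names is_not_none, log_row, encode_row and build_pipeline are hypothetical. The WriteToPubSub step is left out because the topic path in the question is elided.

```python
import logging


def is_not_none(row):
    # Predicate for beam.Filter: keep every element except None.
    return row is not None


def log_row(row):
    # Log the element and return it so the next step still has input.
    logging.info('%r (%s)', row, type(row).__name__)
    return row


def encode_row(row):
    # Encode a text message to bytes, as required before writing to Pub/Sub.
    return row.encode('utf-8')


def build_pipeline(pipeline, data):
    # Imported here so the helpers above can be exercised without Beam installed.
    import apache_beam as beam

    return (pipeline
            | 'makeData' >> beam.Create(data)
            | 'filterHere' >> beam.Filter(is_not_none)
            | 'printHere' >> beam.Map(log_row)
            | 'encodeHere' >> beam.Map(encode_row).with_output_types(bytes))
```

To try it, create a pipeline with `with beam.Pipeline() as p:` and call build_pipeline(p, data) on the test list from the question. Because log_row returns its argument, the encode step receives the three strings and never sees None.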