Dataflow: Using Top module with Python SDK: single

2019-07-17 15:43发布

问题:

I was looking at the word_counting.py example on the incubator-beam repository (linked from the Dataflow documentation), and I want to modifiy it to get the n with the most occurrences. Here is my pipeline:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

I added a line using the Top.Of() method, but it seems it returns a PCollection with an array as single element (I was waiting an ordered PCollection but looking at the doc it seems that PCollections are unordered collections.

When the pipeline runs, beam.Map loop over only one element (which is the entire array) and in 'format', the lambda function raise an error, since it cannot map the entire array into the tuple (word,c)

How should I handle this single-element PCollection without interrupting the pipeline at this step ?

回答1:

If you want to expand a PCollection of iterables into a PCollection of the elements of these iterables, you can use FlatMap, whose argument is a function from elements to an iterable of results: in your case, the elements are iterables themselves, so we use the identity function.

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...