Dataflow: Using Top module with Python SDK: single

Posted 2019-07-17 15:20

I was looking at the word_counting.py example in the incubator-beam repository (linked from the Dataflow documentation), and I want to modify it to get the n words with the most occurrences. Here is my pipeline:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c))  # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

I added a line using the Top.Of() method, but it seems to return a PCollection whose single element is an array. (I was expecting an ordered PCollection, but according to the docs PCollections are unordered collections.)

When the pipeline runs, beam.Map iterates over only one element (the entire array), and in 'format' the lambda function raises an error, since it cannot unpack the entire array into the tuple (word, c).

How should I handle this single-element PCollection without interrupting the pipeline at this step?

1 answer
姐就是有狂的资本
#2 · 2019-07-17 16:07

If you want to expand a PCollection of iterables into a PCollection of the elements of those iterables, you can use FlatMap, whose argument is a function from an element to an iterable of results. In your case the elements are already iterables, so the identity function is enough.

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...
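
To make the shapes concrete, here is a minimal plain-Python sketch of what happens to the data. It does not use Beam at all: `top_of` and `flat_map` are hypothetical stand-ins written for this illustration, a list plays the role of a PCollection, and the sample word counts are made up.

```python
import heapq
from itertools import chain


def top_of(elements, n, key):
    """Hypothetical stand-in for Top.Of: returns a collection holding
    ONE element, namely the list of the n largest inputs."""
    return [heapq.nlargest(n, elements, key=key)]


def flat_map(fn, collection):
    """Hypothetical stand-in for FlatMap: apply fn to each element and
    concatenate the iterables it returns."""
    return list(chain.from_iterable(fn(x) for x in collection))


# Made-up (word, count) pairs, as produced by the 'count' step.
counts = [("the", 5), ("a", 3), ("beam", 7), ("of", 2)]

# One element: the whole top-2 list. Mapping a (word, c) unpacking
# function over this collection fails, as described in the question.
top = top_of(counts, 2, key=lambda wc: wc[1])

# The identity function hands each inner list to flat_map, which
# emits its items one by one.
expanded = flat_map(lambda word_counts: word_counts, top)

# Each element is now a (word, c) pair again, so formatting works.
formatted = ["%s: %s" % (word, c) for word, c in expanded]
```

The list here happens to stay in descending order because it is an ordinary Python list; a real PCollection gives no such ordering guarantee once the elements are expanded.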