I was looking at the word_counting.py example in the incubator-beam repository (linked from the Dataflow documentation), and I want to modify it to get the n words with the most occurrences. Here is my pipeline:
counts = (lines
          | 'split' >> (beam.ParDo(WordExtractingDoFn())
                        .with_output_types(unicode))
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
          | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c))  # 'top' is the only added line

output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
I added a line using the Top.Of() method, but it seems to return a PCollection with a single element: an array of the top 10 pairs. (I was expecting an ordered PCollection, but from the docs it seems that PCollections are unordered collections.)
When the pipeline runs, beam.Map iterates over only one element (the entire array), so in 'format' the lambda raises an error because it cannot unpack the whole array into the tuple (word, c).
How should I handle this single-element PCollection without interrupting the pipeline at this step?
If you want to expand a PCollection of iterables into a PCollection of the elements of those iterables, you can use FlatMap, whose argument is a function from an element to an iterable of results. In your case, the elements are iterables themselves, so the identity function does the job.
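Here is a minimal sketch of how the tail of your pipeline could look, reusing your own steps and SDK version (Python 2 tuple-unpacking lambdas as in your snippet; the 'flatten' label and the lambda parameter name are my own choices):

    output = (counts
              # counts currently has a single element: the list of the top 10
              # (word, count) pairs produced by Top.Of. FlatMap with the
              # identity function expands that list into a PCollection of
              # individual (word, count) pairs.
              | 'flatten' >> beam.FlatMap(lambda pairs: pairs)
              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))

    output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

Note that, since PCollections are unordered, the flattened pairs are not guaranteed to be written in rank order; if you need the output file sorted by count, you could instead format the whole top-10 list inside a single Map before writing it.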