How to create groups of N elements from a PCollection

Posted 2020-05-29 03:04

Question:

I am trying to accomplish something like this: Batch PCollection in Beam/Dataflow

The answer in the above link is in Java, whereas the language I'm working with is Python. Thus, I require some help getting a similar construction.

Specifically I have this:

 p = beam.Pipeline(options=pipeline_options)
 lines = p | 'File reading' >> ReadFromText(known_args.input)

After this, I need to create another PCollection whose elements are lists of N rows from "lines", because my use case requires processing rows in groups. I cannot operate line by line.

I tried a ParDo function that uses counter variables to assign every N rows to a group, followed by a GroupBy using Map. However, the variables are reset every 1000 records, so this is not the solution I am looking for. I read the example in the link, but I do not know how to do something similar in Python.

I also tried saving the counters in Datastore; however, the speed difference between Dataflow and the reads and writes to Datastore is quite significant.

What is the correct way to do this? I don't know how else to approach it. Regards.

Answer 1:

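Assuming the grouping order is not important, you can just group inside a DoFn. Note that whatever is left in the buffer at the end of a bundle is emitted as a group with fewer than N elements.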

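import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

class Group(beam.DoFn):
    """Buffers elements and emits them as lists of up to n elements."""
    def __init__(self, n):
        self._n = n

    def start_bundle(self):
        # Start every bundle with an empty buffer.
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)
        if len(self._buffer) == self._n:
            yield self._buffer
            self._buffer = []

    def finish_bundle(self):
        # Flush the remainder. finish_bundle must emit WindowedValue objects;
        # this assumes the default global window.
        if self._buffer:
            window = GlobalWindow()
            yield WindowedValue(self._buffer, window.max_timestamp(), [window])
            self._buffer = []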

lines = (
    p
    | 'File reading' >> ReadFromText(known_args.input)
    | 'Group' >> beam.ParDo(Group(known_args.N))
    # ...
)
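
To see what this buffering produces, here is a minimal pure-Python sketch of the same logic, runnable without Beam. The helper name `group_bundle` and the sample `bundles` split are hypothetical: in a real pipeline the runner decides how elements are divided into bundles, so the actual group boundaries will differ.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def group_bundle(bundle: Iterable[T], n: int) -> Iterator[List[T]]:
    """Mimic the DoFn's per-bundle buffering for a single bundle."""
    buffer: List[T] = []
    for element in bundle:      # corresponds to process()
        buffer.append(element)
        if len(buffer) == n:
            yield buffer
            buffer = []
    if buffer:                  # corresponds to finish_bundle()
        yield buffer


# Hypothetical split of 12 elements into two bundles (assumption for
# illustration only; the runner chooses the real bundle sizes).
bundles = [range(0, 5), range(5, 12)]
groups = [group for bundle in bundles for group in group_bundle(bundle, 3)]
print(groups)
```

Every group has at most N elements, but a short group can appear at the end of each bundle, not only at the end of the input. If that matters for your use case, it may be worth looking at the batching transforms that ship with the Python SDK, such as `beam.BatchElements` or `beam.GroupIntoBatches`, instead of a hand-written DoFn.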