How to output all values from a pair, groupe

Posted 2019-09-18 18:55

Question:

I'm trying to do something that seems relatively straightforward but am running into some difficulty.

I have a bunch of text, and each line is a value. I analyze each line of text, create the appropriate key, then emit KV pairs. I then use the GroupByKey transform. Finally, I want to output all the text now grouped by key (bonus points if I can get one text file for each key, but I'm not sure that's possible).

Here's what the pipeline's apply looks like:

    public PCollection<String> apply(PCollection<String> generator) {

        // Returns individual lines of text as <String,String> KV pairs
        PCollection<KV<String,String>> generatedTextKV = generator.apply(
                ParDo.of(new GeneratorByLineFn()));

        // Groups the <String,String> KV pairs by key
        PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
            GroupByKey.<String, String>create());

        // Hopefully returns output where all of each key's values are together
        PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));

        return results;
    }

Unfortunately, I cannot get the FormatOutputFn() to work as desired.

Iterating over the Iterable<String> and outputting each value doesn't guarantee that the values for a key stay together in the output (please correct me if I'm wrong about this, because then my problem is solved). I then tried building one big string per key with StringBuilder, which works with small datasets but, unsurprisingly, produces java.lang.OutOfMemoryError: Java heap space errors in the log on larger data. I also tried the Flatten.FlattenIterables transform, but that doesn't work either, since the value in the K,V pair is not a PCollection but just a regular Iterable.

I've seen this question on analysis by common key, but from the answer it is not entirely clear to me exactly what I should do with my situation. I think I have to use Combine.PerKey, but I'm not exactly sure how to use it. I'm also assuming there has to be a pre-baked way to do this, but I can't find that pre-baked way in the docs. I'm sure I'm just not looking in the right place.

And, as mentioned above, if there is a way to get text file output where the name of the text file is the key and the values are all in the file, that would be amazing. But I don't think Dataflow can do this (yet?).

Thank you for reading.

Answer 1:

Dataflow doesn't currently support any notion of ordering on PCollections. You are correct that there is no guarantee that 'results' has an ordering, including key grouping. We would like to add ordering properties for PCollections at some point, but the timeline for that is not yet known.

Certain runners may appear to preserve ordering in certain situations, due to underlying implementation details. For example, if FormatOutputFn is fused with a Write step, then it's likely you will see grouping, because each KV<K, Iterable<V>> is processed into multiple <K,V>s which are written to the file before the next KV<K, Iterable<V>> is processed. But again, this is just an artifact of how Dataflow chooses to optimize this particular case and should not be relied on generally.
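If the goal is only to be able to recover the grouping later, one option that does not depend on ordering is to carry the key on every output line. Below is a minimal sketch in plain Java with no Dataflow SDK types. The class name KeyTaggedLines, the emit helper, and the tab separator are assumptions for illustration; inside a real FormatOutputFn the Consumer would be replaced by whatever the DoFn uses to emit elements.

```java
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Dataflow SDK.
final class KeyTaggedLines {

    // Emits one "key<TAB>value" line per value. Values are streamed one at a
    // time, so the whole group never has to fit in memory.
    static void emit(String key, Iterable<String> values, Consumer<String> out) {
        for (String value : values) {
            out.accept(key + "\t" + value);
        }
    }

    public static void main(String[] args) {
        emit("someKey", Arrays.asList("first line", "second line"), System.out::println);
    }
}
```

Because every line names its own key, the output can be sorted or filtered by key afterwards, wherever each line ends up. This assumes keys never contain a tab character.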

As you already figured out, if a single element could fit in memory, you could have FormatOutputFn convert each KV<K, Iterable<V>> into a single String which contains multiple newlines.
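For reference, the single-String approach could look roughly like the sketch below. It is plain Java rather than an actual DoFn, and the class name GroupToSingleString and the format helper are made up for illustration. It is only appropriate when every group comfortably fits in memory.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Dataflow SDK.
final class GroupToSingleString {

    // Joins the key and all of its values into one newline-separated String.
    // The whole group is held in memory, which is what fails on large groups.
    static String format(String key, Iterable<String> values) {
        StringBuilder sb = new StringBuilder(key);
        for (String value : values) {
            sb.append('\n').append(value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("first line", "second line");
        System.out.println(format("someKey", values));
    }
}
```

Since each group becomes exactly one output element, its lines stay together regardless of how the elements themselves are ordered.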

Given that is not the case here, the best solution I can think of is to write the files by hand: FormatOutputFn takes each KV<K, Iterable<V>>, uses standard GCS libraries to open a file named K, and writes the Iterable<V> to it. The bad news is that this gets a little tricky, because you need to be aware of how our fault tolerance semantics might retry elements. But the good news is that we're currently working on libraries to help make these types of custom sinks easier.
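To illustrate the shape of the write-by-hand approach, here is a rough sketch that writes to the local filesystem with java.nio instead of GCS, so it stays self-contained. The class name PerKeyFileWriter, the writeGroup helper, and the ".txt" suffix are assumptions, not anything from the SDK. It also assumes the key is safe to use as a file name. To tolerate retries, it writes to a temporary file first and then moves it over the final name, so processing the same element twice leaves a single complete file.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Hypothetical helper; a real pipeline would target GCS rather than local disk.
final class PerKeyFileWriter {

    // Writes all values for one key to "<outputDir>/<key>.txt", one per line.
    // Values are streamed to disk, so the group does not need to fit in memory.
    static Path writeGroup(Path outputDir, String key, Iterable<String> values)
            throws IOException {
        Files.createDirectories(outputDir);
        Path target = outputDir.resolve(key + ".txt");
        Path temp = Files.createTempFile(outputDir, "group-", ".tmp");
        try (BufferedWriter writer = Files.newBufferedWriter(temp, StandardCharsets.UTF_8)) {
            for (String value : values) {
                writer.write(value);
                writer.write('\n');
            }
        }
        // A retried element overwrites the earlier file instead of appending to it.
        Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("per-key-output");
        Path file = writeGroup(dir, "someKey", Arrays.asList("first line", "second line"));
        System.out.println("Wrote " + file);
    }
}
```

This sketch does not clean up a temporary file left behind if a write fails partway, and it does nothing about two workers writing the same key at the same time; both would need attention in a real sink.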

As for the zero-length files, there's an awesome answer here: Why are zero byte files written to GCS when running a pipeline?