Dynamic table name when writing to BQ from Dataflow

Posted 2019-01-19 09:34

Question:

As a follow-up to the following question and answer:

https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

I'd like to confirm with the Google Dataflow engineering team (@jkff) whether the third option proposed by Eugene is at all possible with Google Dataflow:

"have a ParDo that takes these keys and creates the BigQuery tables, and another ParDo that takes the data and streams writes to the tables"

My understanding is that a ParDo/DoFn processes elements one at a time. How could we specify a table name (a function of the keys passed in via side inputs) when writing out from processElement of a ParDo/DoFn?

Thanks.

Update: added a DoFn below, which is obviously not working, since c.element().getValue() is not a PCollection.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

    private final PCollectionView<List<String>> keysAsSideinputs;

    public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

    @Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        // The below is not working!!! How could we write the value out to a sink,
        // be it a GCS file or a BQ table???
        c.element().getValue().apply(ParDo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }
}

Answer 1:

The BigQueryIO.Write transform does not support this. The closest you can get is to use per-window tables and encode whatever information you need to select the table into the window objects, using a custom WindowFn.
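As a minimal sketch of the per-window approach, assuming the Dataflow SDK 1.x overload of BigQueryIO.Write.to that takes a SerializableFunction<BoundedWindow, String>; PerKeyWindowFn and KeyedWindow are hypothetical names for a custom WindowFn and window type that carry the key:

PCollection<TableRow> rows = ...;

rows.apply(Window.<TableRow>into(new PerKeyWindowFn()))  // hypothetical WindowFn that stores the key in the window
    .apply(BigQueryIO.Write
        .to(new SerializableFunction<BoundedWindow, String>() {
            @Override
            public String apply(BoundedWindow window) {
                // Recover the key from the custom window and build a table spec from it.
                return "my-project:my_dataset.table_" + ((KeyedWindow) window).getKey();
            }
        })
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));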

If you don't want to do that, you can make BigQuery API calls directly from your DoFn. With this approach you can set the table name to anything you want, as computed by your code: it could be looked up from a side input, or computed directly from the element the DoFn is currently processing. To avoid making too many small calls to BigQuery, you can batch up the requests in finishBundle().
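Here is a rough sketch of that direct-API approach, assuming the BigQuery v2 Java client (com.google.api.services.bigquery) and the old-style DoFn bundle methods; newBigqueryClient() is a hypothetical helper that builds an authorized client, and the project/dataset names are placeholders:

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicBigQueryWriteFn extends DoFn<KV<String, String>, Void> {
    private static final int MAX_BUFFERED_ROWS = 500;
    private transient Bigquery bigquery;
    private transient Map<String, List<TableDataInsertAllRequest.Rows>> buffers;

    @Override
    public void startBundle(Context c) throws Exception {
        bigquery = newBigqueryClient();  // hypothetical helper: builds an authorized Bigquery client
        buffers = new HashMap<>();
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Table name computed from the element itself; it could equally be
        // looked up from a side input.
        String tableId = "table_" + c.element().getKey();
        TableRow row = new TableRow().set("line", c.element().getValue());
        buffers.computeIfAbsent(tableId, k -> new ArrayList<>())
               .add(new TableDataInsertAllRequest.Rows().setJson(row));
        if (buffers.get(tableId).size() >= MAX_BUFFERED_ROWS) {
            flush(tableId);
        }
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        // Push whatever is still buffered before the bundle ends.
        for (String tableId : buffers.keySet()) {
            flush(tableId);
        }
    }

    private void flush(String tableId) throws IOException {
        List<TableDataInsertAllRequest.Rows> rows = buffers.get(tableId);
        if (rows == null || rows.isEmpty()) {
            return;
        }
        bigquery.tabledata()
                .insertAll("my-project", "my_dataset", tableId,
                        new TableDataInsertAllRequest().setRows(rows))
                .execute();
        rows.clear();
    }
}

This sketch assumes the target tables already exist; the separate ParDo that creates the tables (the first half of Eugene's option 3) would make bigquery.tables().insert(...) calls in a similar fashion.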

You can see how the Dataflow runner does the streaming import here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java