Cloud Dataflow to BigQuery - too many sources

Posted 2019-08-09 03:39

Question:

I have a job that, among other things, also inserts some of the data it reads from files into a BigQuery table for later manual analysis.

It fails with the following error:

job error: Too many sources provided: 10001. Limit is 10000., error: Too many sources provided: 10001. Limit is 10000.

What does it refer to as "source"? Is it a file or a pipeline step?

Thanks, G

Answer 1:

I'm guessing the error is coming from BigQuery and means that we are trying to upload too many files when creating your output table.

Could you provide some more details on the error and its context, like a snippet of the command-line output (if you're using the BlockingDataflowPipelineRunner), so I can confirm? A job ID would also be helpful.

Is there something about your pipeline structure that is going to result in a large number of output files? That could be either a large amount of data or very finely sharded input files without a subsequent GroupByKey operation (which would let us reshard the data into larger pieces; see the sketch below).
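
For illustration, a minimal sketch of forcing such a reshard (Dataflow SDK for Java 1.x style; the shard count of 100 and the names lines/resharded are assumptions): key each element by a random shard number, group, and flatten the groups back out.

import java.util.concurrent.ThreadLocalRandom;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Assign each element a random shard key (100 shards is an arbitrary
// choice), group on that key, then flatten the groups back out.
PCollection<String> resharded = lines
        .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
            @Override
            public void processElement(ProcessContext c) {
                c.output(KV.of(ThreadLocalRandom.current().nextInt(100), c.element()));
            }
        }))
        .apply(GroupByKey.<Integer, String>create())
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
            @Override
            public void processElement(ProcessContext c) {
                for (String element : c.element().getValue()) {
                    c.output(element);
                }
            }
        }));

The GroupByKey caps the number of distinct groups at the shard count, so the downstream write produces far fewer files.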



Answer 2:

The note in "In Google Cloud Dataflow BigQueryIO.Write occur Unknown Error (http code 500)" mitigates this issue:

Dataflow SDK for Java 1.x: as a workaround, you can enable the custom BigQuery sink by passing the pipeline option --experiments=enable_custom_bigquery_sink

In Dataflow SDK for Java 2.x, this behavior is the default and no experiments are necessary.

Note that in both versions, temporary files in GCS may be left over if your job fails.
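
For reference, a minimal sketch of supplying that experiment in a Dataflow SDK for Java 1.x pipeline, either on the command line or programmatically (assuming the standard DataflowPipelineOptions / DataflowPipelineDebugOptions classes):

import java.util.Arrays;

import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// --experiments=enable_custom_bigquery_sink passed on the command line
// is picked up here along with the other pipeline options.
DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

// Alternatively, set the experiment programmatically on the debug options.
options.as(DataflowPipelineDebugOptions.class)
        .setExperiments(Arrays.asList("enable_custom_bigquery_sink"));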



Answer 3:
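
The transform below forces a GroupByKey, in line with the resharding suggestion in answer 1: each element is keyed by itself with a synthetic null value and grouped, which reshards the data into larger pieces before the BigQuery write (Dataflow SDK for Java 1.x style; in SDK 2.x you would override expand() instead of apply()):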

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public static class ForceGroupBy<T>
        extends PTransform<PCollection<T>, PCollection<KV<T, Iterable<Void>>>> {
    private static final long serialVersionUID = 1L;

    @Override
    public PCollection<KV<T, Iterable<Void>>> apply(PCollection<T> input) {
        // Pair every element with a synthetic null Void value, keyed by
        // the element itself.
        PCollection<KV<T, Void>> syntheticGroup = input.apply(
                ParDo.of(new DoFn<T, KV<T, Void>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        c.output(KV.of(c.element(), (Void) null));
                    }
                }));
        // The GroupByKey is what actually reshards the data.
        return syntheticGroup.apply(GroupByKey.<T, Void>create());
    }
}
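
A hedged usage sketch (the name lines is hypothetical): apply the transform, then recover the original elements with Keys before handing them to the BigQuery write. Note that the element type needs a deterministic coder to serve as a GroupByKey key, which String has (TableRow does not).

import com.google.cloud.dataflow.sdk.transforms.Keys;

// Hypothetical usage: reshard a PCollection<String> of raw lines before
// converting them to TableRows for the BigQuery write.
PCollection<String> resharded = lines
        .apply(new ForceGroupBy<String>())
        .apply(Keys.<String>create());   // drop the synthetic grouping again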