BigQuery writeTableRows always writing to the buffer

Posted 2019-02-20 20:54

Question:

We are trying to write to BigQuery using Apache Beam and Avro.

The following seems to work OK:

p.apply("Input", AvroIO.read(DataStructure.class).from("AvroSampleFile.avro"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Load", BigQueryIO.writeTableRows().to(table).withSchema(schema));

We then tried to use it in the following manner to read data from Google Pub/Sub:

p.begin()
            .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
            .apply("Transform", ParDo.of(new CustomTransformFunction()))
            .apply("Write", BigQueryIO.writeTableRows()
                    .to(table)
                    .withSchema(schema)
                    .withTimePartitioning(timePartitioning)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();

When we do this, the rows always go to the streaming buffer, and BigQuery seems to take a long time to move them out of it. Can anyone tell me why the above won't write the records directly to the BigQuery tables?

UPDATE: It looks like I need to add the following settings, but doing so throws a java.lang.IllegalArgumentException.

.withMethod(Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))

Answer 1:

The answer is that you need to include withNumFileShards, like so (the value can be anywhere from 1 to 1000; it controls how many file shards are written per triggering interval).

p.begin()
    .apply("Input", PubsubIO.readAvros(DataStructure.class).fromTopic("topicName"))
    .apply("Transform", ParDo.of(new CustomTransformFunction()))
    .apply("Write", BigQueryIO.writeTableRows()
            .to(table)
            .withSchema(schema)
            .withTimePartitioning(timePartitioning)
            .withMethod(Method.FILE_LOADS)
            .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
            .withNumFileShards(1000)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
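
For completeness, the table, schema, and timePartitioning values used above are assumed to be defined elsewhere. A minimal sketch with placeholder project, dataset, and field names might look like:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.api.services.bigquery.model.TimePartitioning;
    import java.util.Arrays;

    // Placeholder table spec in "project:dataset.table" form.
    String table = "my-project:my_dataset.my_table";

    // Placeholder schema; field names and types are illustrative only.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("name").setType("STRING"),
            new TableFieldSchema().setName("value").setType("FLOAT"),
            new TableFieldSchema().setName("event_time").setType("TIMESTAMP")));

    // Partition the table by day on the event_time column.
    TimePartitioning timePartitioning = new TimePartitioning()
            .setType("DAY")
            .setField("event_time");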

I can't find it documented anywhere that withNumFileShards is mandatory; however, there is a Jira ticket for this, which I found after applying the fix:

https://issues.apache.org/jira/browse/BEAM-3198
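
As background: when the input collection is unbounded, as it is when reading from Pub/Sub, BigQueryIO defaults to streaming inserts, so rows pass through BigQuery's streaming buffer before landing in managed storage. That default is equivalent to writing:

.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)

Setting FILE_LOADS together with a triggering frequency switches the sink to periodic batch load jobs instead, and withNumFileShards tells Beam how many files to write per triggering interval.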