How to specify insertId when spreaming insert to B

2020-03-24 05:49发布

问题:

BigQuery supports de-duplication for streaming insert. How can I use this feature using Apache Beam?

https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency

To help ensure data consistency, you can supply insertId for each inserted row. BigQuery remembers this ID for at least one minute. If you try to stream the same set of rows within that time period and the insertId property is set, BigQuery uses the insertId property to de-duplicate your data on a best effort basis. You might have to retry an insert because there's no way to determine the state of a streaming insert under certain error conditions, such as network errors between your system and BigQuery or internal errors within BigQuery. If you retry an insert, use the same insertId for the same set of rows so that BigQuery can attempt to de-duplicate your data. For more information, see troubleshooting streaming inserts.

I can not find such feature in Java doc. https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html

In this question, he suggest to set insertId in TableRow. Is this correct?

https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableRow.html?is-external=true

BigQuery client library has this feature.

https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/index.html?com/google/cloud/bigquery/package-summary.html https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/InsertAllRequest.java#L134

回答1:

  • Pub/Sub + Beam/Dataflow + BigQuery: "Exactly once" should be guaranteed, and you don't need to worry much about this. That guarantee is stronger when you ask Dataflow to insert to BigQuery using FILE_LOADS instead of STREAMING_INSERTS, for now.

  • Kafka + Beam/Dataflow + BigQuery: If a message can be emitted more than once from Kafka (e.g. if the producer retried the insertion), then you need to take care of de-duplication. Either in BigQuery (as currently implemented, according to your comment), or in Dataflow with a .apply(Distinct.create()) transform.



回答2:

As Felipe mentioned in the comment, it seems that Dataflow is already using insertId for itself to implement "exactly once". so we can not manually specify insertId.