Row level atomic MERGE REPLACE in BigQuery

Published 2019-05-23 08:19

Question:

For my use case, I'm working with data that is identifiable by a unique key at the source but explodes into a non-deterministic number n of target rows, loaded into BigQuery tables for analytics purposes.

Building this ETL on MongoDB's recent Change Streams feature, I would like to drop all existing entries for a key in BigQuery and then load the new entries atomically.

Exploring BigQuery DML, I see that a MERGE operation is supported, but only WHEN MATCHED THEN DELETE or WHEN MATCHED THEN UPDATE is possible.

I'm interested in a WHEN MATCHED THEN DELETE followed by an INSERT operation.

How would I implement such an ETL in BigQuery while remaining atomic, or as eventually consistent as possible, in terms of data availability and correctness?


EDIT 1: I would like to provide a concrete example to elaborate.

The lowest granularity of uniqueness I have on this dataset is user_id. Rows are not uniquely identifiable.

Example

1.

Updated user object received from mongo change stream:

user = {_id: "3", name: "max", registered: "2018-07-05", q: ["a", "b", "c"]}

2.

Current BigQuery.user_q holds

| user_id | q |
...
|    3    | a |
|    3    | b |
...

3.

Transform code loads modified user object into BigQuery.user_q_incoming

| user_id | q |
|    3    | a |
|    3    | b |
|    3    | c |

4.

MERGE between user_q and user_q_incoming:

  1. 2 rows in user_q that belong to user_id 3 are DELETED
  2. 3 rows in user_q_incoming that belong to user_id 3 are INSERTED.
  3. Rest of the data (...) in user_q is left in place, unmodified.

5.

BigQuery.user_q holds

| user_id | q |
...
|    3    | a |
|    3    | b |
|    3    | c |
...

For example, a user might delete a question from their profile, leaving the remaining rows as q=["a", "c"]. I need this to be reflected in BigQuery's view of the world as well.

Answer 1:

INSERT is supported by BigQuery DML

A MERGE statement is a DML statement that can combine INSERT, UPDATE, and DELETE operations into a single statement and perform the operations atomically.

For example:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON FALSE
WHEN NOT MATCHED AND product LIKE '%washer%' THEN
  INSERT (product, quantity) VALUES(product, quantity)
WHEN NOT MATCHED BY SOURCE AND product LIKE '%washer%' THEN
  DELETE   

so, you should be good to go with your ETL

EDIT based on the more specific details added to the question

OK, I see. I think in this case MERGE will not apply, as INSERT is allowed only in a WHEN NOT MATCHED clause. Someone might figure out how to trick MERGE into working here, but in the meantime the solution below does what you want to achieve, I think. :o)

CREATE OR REPLACE TABLE `project.dataset.user_q` (user_id INT64, q STRING) AS
-- keep rows for users that are not being replaced
SELECT * FROM `project.dataset.user_q`
WHERE user_id NOT IN (SELECT DISTINCT user_id FROM `project.dataset.user_q_incoming`)
UNION ALL
-- add the full, fresh set of rows for the incoming users
SELECT * FROM `project.dataset.user_q_incoming`
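A note on this approach: because it rewrites the whole table in a single atomic statement, readers never observe a partial state, but it costs a full-table scan on every batch. If multi-statement transactions are available in your project (a BigQuery feature added after this answer was written, so treat its availability as an assumption), the same replace can be expressed as a targeted DELETE plus INSERT that commits atomically:

```sql
-- Assumes BigQuery multi-statement transactions are enabled for the project.
BEGIN TRANSACTION;

-- Drop every existing row for the users being replaced.
DELETE FROM `project.dataset.user_q`
WHERE user_id IN (SELECT DISTINCT user_id FROM `project.dataset.user_q_incoming`);

-- Load the fresh rows for those users.
INSERT INTO `project.dataset.user_q` (user_id, q)
SELECT user_id, q FROM `project.dataset.user_q_incoming`;

COMMIT TRANSACTION;
```

Readers see the table either before the transaction or after it, never in between.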


Answer 2:

There is a similar question, and one workaround to make MERGE work (https://issuetracker.google.com/issues/35905927#comment9).

Basically, something like the following should work:

MERGE `project.dataset.user_q` T
USING (
  SELECT *, false AS is_insert FROM `project.dataset.user_q_incoming`
  UNION ALL
  SELECT *, true AS is_insert FROM `project.dataset.user_q_incoming`
) S
ON T.user_id = S.user_id AND NOT is_insert
WHEN MATCHED THEN
  DELETE
WHEN NOT MATCHED AND is_insert THEN
  INSERT (user_id, q) VALUES (user_id, q)
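To sanity-check the trick against the sample data from the question, the two tables can be seeded inline (table and column names follow the question; the literal values, including user 7 standing in for the "rest of the data", are illustrative):

```sql
-- Illustrative seed data; user 7 represents the rows that must stay untouched.
CREATE OR REPLACE TABLE `project.dataset.user_q` (user_id INT64, q STRING);
INSERT INTO `project.dataset.user_q` VALUES (3, 'a'), (3, 'b'), (7, 'x');

CREATE OR REPLACE TABLE `project.dataset.user_q_incoming` (user_id INT64, q STRING);
INSERT INTO `project.dataset.user_q_incoming` VALUES (3, 'a'), (3, 'b'), (3, 'c');
```

After running the MERGE, user_q holds (3, 'a'), (3, 'b'), (3, 'c') plus the untouched (7, 'x'): each incoming row appears twice in S, the is_insert = false copy matches and deletes the old rows for user 3, and the is_insert = true copy never matches (because of NOT is_insert in the ON clause) and is therefore inserted.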

Ideally, the following is what you need, but it is not supported yet:

MERGE `project.dataset.user_q`
USING `project.dataset.user_q_incoming`
ON FALSE
WHEN NOT MATCHED BY TARGET THEN
  INSERT(user_id, q) VALUES(user_id, q)
WHEN NOT MATCHED BY SOURCE AND user_id in (SELECT user_id FROM `project.dataset.user_q_incoming`) THEN
  DELETE