How to manage/handle schema changes while loading

Posted 2020-03-31 03:56

Question:

Here is what my input file looks like:

{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
{"Id": 2, "Address": {"City":"Mumbai"}}
{"Id": 3, "Address": {"Street":"XYZ Road"}}
{"Id": 4}
{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}

In my Dataflow pipeline, how can I dynamically determine which fields are present in each row so that the output adheres to the BigQuery table schema? For example, in row 2, Street is missing. I want the entry for column Address.Street in BigQuery to be "N/A" or null, and I don't want the pipeline to fail because of a schema change or missing data.

How can I handle this logic in my Dataflow job, in Python, before writing to BigQuery?

Answer 1:

I recommend writing your data into a temp table with just one field, line, of type STRING.

Once your data is in the BigQuery temp table, you can apply the schema logic and query the data out of the temp table into your final table.
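On the pipeline side, the staging step could look roughly like the sketch below. It is only an illustration: the helper name to_raw_row is hypothetical, and it assumes the temp table has a single STRING column called line, as described above. Each input line is passed through untouched, so a missing or extra field cannot break the load.

```python
def to_raw_row(text):
    """Wrap one raw input line as a single-column row.

    Assumes the staging table has exactly one STRING column named "line".
    The JSON is deliberately not parsed here; schema logic is applied
    later in SQL.
    """
    return {"line": text.strip()}


raw_lines = [
    '{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}\n',
    '{"Id": 4}\n',
    "\n",  # blank lines are skipped
]

# In a Beam/Dataflow job this function could be applied with beam.Map
# (an assumption about the pipeline, not shown here).
staging_rows = [to_raw_row(t) for t in raw_lines if t.strip()]
```

Because the row is kept as an opaque string, any new field that appears in the input later is still available in the temp table without a schema change.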

The example below, in BigQuery Standard SQL, shows how to apply schema logic against a table that holds the whole row in one field:

#standardSQL
WITH t AS (
  SELECT '{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}' line UNION ALL
  SELECT '{"Id": 2, "Address": {"City":"Mumbai"}}' UNION ALL
  SELECT '{"Id": 3, "Address": {"Street":"XYZ Road"}}' UNION ALL
  SELECT '{"Id": 4}  ' UNION ALL
  SELECT '{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}' 
)
SELECT
  JSON_EXTRACT_SCALAR(line, '$.Id') id,
  JSON_EXTRACT_SCALAR(line, '$.PhoneNumber') PhoneNumber,
  JSON_EXTRACT_SCALAR(line, '$.Address.Street') Street,
  JSON_EXTRACT_SCALAR(line, '$.Address.City') City
FROM t  

with the following result:

Row id  PhoneNumber Street      City     
1   1   null        MG Road     Pune     
2   2   null        null        Mumbai   
3   3   null        XYZ Road    null     
4   4   null        null        null     
5   5   12345678    ABCD Road   Bangalore
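If you would rather fill in the missing fields inside the pipeline itself, as the question asks, a minimal Python sketch follows. It is an alternative to the temp-table approach, not part of it. The function name normalize and the hard-coded field list are assumptions based on the sample input; a real job would derive them from the target table schema.

```python
import json


def normalize(raw_line, default=None):
    """Parse one JSON line and return a row with every expected field.

    Missing Address fields get `default` (None, or e.g. "N/A").
    PhoneNumber stays None when absent, since "N/A" would not fit
    a numeric column.
    """
    record = json.loads(raw_line)
    # "Address" may be missing entirely, so fall back to an empty dict.
    address = record.get("Address") or {}
    return {
        "Id": record.get("Id"),
        "PhoneNumber": record.get("PhoneNumber"),
        "Address": {
            "Street": address.get("Street", default),
            "City": address.get("City", default),
        },
    }


row2 = normalize('{"Id": 2, "Address": {"City":"Mumbai"}}')
row4 = normalize('{"Id": 4}', default="N/A")
```

Here row2 has Address.Street set to None and row4 has both Address fields set to "N/A". Fields not listed in the function are dropped, so unexpected input columns do not reach BigQuery.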