Here is what my input file looks like:
{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}
{"Id": 2, "Address": {"City":"Mumbai"}}
{"Id": 3, "Address": {"Street":"XYZ Road"}}
{"Id": 4}
{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}
In my Dataflow pipeline, how can I dynamically determine which fields are present in each row in order to adhere to the BigQuery table schema? E.g., in row #2, Street is missing. I want the entry for column Address.Street in BigQuery to be "N/A" or null, and I don't want the pipeline to fail because of a schema change or missing data.
How can I handle this logic in my Dataflow job before writing to BigQuery in Python?
I recommend writing your data into a temp table with just one field, line, of type STRING.
Once you are done loading your data into the BigQuery temp table, you can apply schema logic and query the data out of the temp table into your final table.
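For the loading step, a minimal Apache Beam (Python SDK) sketch could look like the one below; the bucket, project, dataset, and table names are placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     # Read each JSON line of the input file as a plain string
     | 'ReadLines' >> beam.io.ReadFromText('gs://my-bucket/input.json')
     # Wrap each raw line in a dict matching the one-column schema
     | 'ToRow' >> beam.Map(lambda line: {'line': line})
     # Load into the temp table with a single STRING field named "line"
     | 'WriteTemp' >> beam.io.WriteToBigQuery(
         'my-project:my_dataset.temp_table',
         schema='line:STRING',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

Because the rows are stored as opaque strings, no field can be missing at load time, so the pipeline cannot fail on schema changes.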
The example below, in BigQuery Standard SQL, shows how to apply schema logic against a table with the whole row in one field:
#standardSQL
WITH t AS (
  SELECT '{"Id": 1, "Address": {"Street":"MG Road","City":"Pune"}}' line UNION ALL
  SELECT '{"Id": 2, "Address": {"City":"Mumbai"}}' UNION ALL
  SELECT '{"Id": 3, "Address": {"Street":"XYZ Road"}}' UNION ALL
  SELECT '{"Id": 4}' UNION ALL
  SELECT '{"Id": 5, "PhoneNumber": 12345678, "Address": {"Street":"ABCD Road", "City":"Bangalore"}}'
)
SELECT
  JSON_EXTRACT_SCALAR(line, '$.Id') id,
  JSON_EXTRACT_SCALAR(line, '$.PhoneNumber') PhoneNumber,
  JSON_EXTRACT_SCALAR(line, '$.Address.Street') Street,
  JSON_EXTRACT_SCALAR(line, '$.Address.City') City
FROM t
with the result as below:
Row  id  PhoneNumber  Street     City
1    1   null         MG Road    Pune
2    2   null         null       Mumbai
3    3   null         XYZ Road   null
4    4   null         null       null
5    5   12345678     ABCD Road  Bangalore
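For the second step, you can run the same schema logic from Python with the BigQuery client library and write the result straight into the final table. Below is a minimal sketch, assuming the google-cloud-bigquery package and placeholder project/dataset/table names; IFNULL is used here to get "N/A" instead of null for the missing Address fields, per the question:

from google.cloud import bigquery

client = bigquery.Client()

# Apply the schema logic against the temp table; IFNULL turns missing
# Address fields into "N/A" instead of null.
query = """
SELECT
  JSON_EXTRACT_SCALAR(line, '$.Id') id,
  JSON_EXTRACT_SCALAR(line, '$.PhoneNumber') PhoneNumber,
  IFNULL(JSON_EXTRACT_SCALAR(line, '$.Address.Street'), 'N/A') Street,
  IFNULL(JSON_EXTRACT_SCALAR(line, '$.Address.City'), 'N/A') City
FROM `my-project.my_dataset.temp_table`
"""

# Write the query result into the final table.
job_config = bigquery.QueryJobConfig(
    destination='my-project.my_dataset.final_table',
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.query(query, job_config=job_config).result()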