According to "How do we set maximum_bad_records when loading a Bigquery table from dataflow?", there is currently no way to set the maxBadRecords configuration when loading data into BigQuery from Dataflow. The suggestion is to validate the rows in the Dataflow job before inserting them into BigQuery.
If I have the TableSchema and a TableRow, how do I go about making sure that the row can safely be inserted into the table?
There must be an easier way of doing this than iterating over the fields in the schema, looking at their type and looking at the class of the value in the row, right? That seems error-prone, and the method must be fool-proof since the whole pipeline fails if a single row cannot be loaded.
Update:
My use case is an ETL job that at first will run on JSON logs (one object per line) on Cloud Storage and write to BigQuery in batch, but later will read objects from PubSub and write to BigQuery continuously. The objects contain a lot of information that isn't necessary to have in BigQuery and also contain parts that aren't even possible to describe in a schema (basically free-form JSON payloads). Things like timestamps also need to be formatted to work with BigQuery. There will be a few variants of this job running on different inputs and writing to different tables.
In theory it's not a very difficult process: it takes an object, extracts a few properties (50-100), formats some of them and outputs the object to BigQuery. I more or less just loop over a list of property names, extract the value from the source object, look at a config to see if the property should be formatted somehow, apply the formatting if necessary (this could be downcasing, dividing a millisecond timestamp by 1000, extracting the hostname from a URL, etc.), and write the value to a TableRow object.
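To make the loop concrete, here is a minimal sketch of what it amounts to. It is not the actual job: a plain Map stands in for both the parsed source object and the TableRow, and the property names (country, created_at, referrer) and the FORMATTERS table are hypothetical placeholders for the real config.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

final class RowExtractor {
    // Hypothetical config: property name -> formatter.
    // Properties without an entry are copied unchanged.
    static final Map<String, Function<Object, Object>> FORMATTERS = new HashMap<>();
    static {
        // Downcase a string value.
        FORMATTERS.put("country", v -> v.toString().toLowerCase(Locale.ROOT));
        // Millisecond timestamp -> whole seconds.
        FORMATTERS.put("created_at", v -> ((Number) v).longValue() / 1000L);
        // Keep only the hostname of a URL.
        FORMATTERS.put("referrer", v -> URI.create(v.toString()).getHost());
    }

    // Copies the listed properties from the source object into a new row,
    // applying a formatter where one is configured. Missing or null
    // properties are skipped; everything not listed is dropped.
    static Map<String, Object> extract(Map<String, Object> source, List<String> properties) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String name : properties) {
            Object value = source.get(name);
            if (value == null) {
                continue;
            }
            Function<Object, Object> formatter = FORMATTERS.get(name);
            row.put(name, formatter == null ? value : formatter.apply(value));
        }
        return row;
    }
}
```

Note that the formatters in this sketch assume the value already has the expected type: the cast in the created_at formatter throws a ClassCastException if the property holds a string, which is exactly the kind of messy input described next.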
My problem is that the data is messy. With a couple of hundred million objects, there are some that don't look as expected. It's rare, but with these volumes rare things still happen. Sometimes a property that should contain a string contains an integer, or vice versa. Sometimes there's an array or an object where there should be a string.
Ideally I would like to take my TableRow and pass it by TableSchema and ask "does this work?".
Since this isn't possible, what I do instead is look at the TableSchema object and try to validate/cast the values myself. If the TableSchema says a property is of type STRING, I run value.toString() before adding it to the TableRow. If it's an INTEGER, I check that it's an Integer, Long or BigInteger, and so on. The problem with this method is that I'm just guessing what will work in BigQuery. What Java data types will it accept for FLOAT? For TIMESTAMP? I think my validations/casts catch most problems, but there are always exceptions and edge cases.
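The schema-driven checking described above looks roughly like the following sketch. It is deliberately self-contained: the FieldType enum and the Map-based schema and row are hypothetical stand-ins for TableSchema and TableRow, and the coercion rules are my own conservative guesses, not a statement of what BigQuery accepts. TIMESTAMP and nested records are left out because I don't know which representations are safe.

```java
import java.math.BigInteger;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowValidator {
    // Stand-in for the type names found in the schema.
    enum FieldType { STRING, INTEGER, FLOAT, BOOLEAN }

    // Returns the coerced value, or throws if the value does not fit the type.
    static Object coerce(FieldType type, Object value) {
        switch (type) {
            case STRING:
                // Assumption: a map or list where a string is expected is bad data,
                // so reject it instead of stringifying it.
                if (value instanceof Map || value instanceof Collection) break;
                return value.toString();
            case INTEGER:
                if (value instanceof Integer || value instanceof Long
                        || value instanceof Short || value instanceof Byte) {
                    return ((Number) value).longValue();
                }
                if (value instanceof BigInteger) {
                    // Throws ArithmeticException if the value needs more than 64 bits.
                    return ((BigInteger) value).longValueExact();
                }
                if (value instanceof String) {
                    return Long.parseLong(((String) value).trim());
                }
                break;
            case FLOAT: {
                double d;
                if (value instanceof Number) {
                    d = ((Number) value).doubleValue();
                } else if (value instanceof String) {
                    d = Double.parseDouble(((String) value).trim());
                } else {
                    break;
                }
                // Assumption: only finite numbers are let through.
                if (Double.isNaN(d) || Double.isInfinite(d)) break;
                return d;
            }
            case BOOLEAN:
                if (value instanceof Boolean) return value;
                if (value instanceof String) {
                    String s = ((String) value).trim();
                    if (s.equalsIgnoreCase("true")) return Boolean.TRUE;
                    if (s.equalsIgnoreCase("false")) return Boolean.FALSE;
                }
                break;
        }
        throw new IllegalArgumentException("cannot coerce to " + type);
    }

    // Returns a row containing only the values that passed; every rejected
    // field is reported in errors with its name, expected type and value.
    static Map<String, Object> validate(Map<String, FieldType> schema,
                                        Map<String, Object> row,
                                        List<String> errors) {
        Map<String, Object> clean = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            Object value = e.getValue();
            if (value == null) continue; // treat null as absent
            FieldType type = schema.get(e.getKey());
            if (type == null) {
                errors.add(e.getKey() + ": not in schema");
                continue;
            }
            try {
                clean.put(e.getKey(), coerce(type, value));
            } catch (IllegalArgumentException | ArithmeticException ex) {
                errors.add(e.getKey() + ": expected " + type + ", got "
                        + value.getClass().getSimpleName() + " (" + value + ")");
            }
        }
        return clean;
    }
}
```

Collecting the property name and offending value in errors at least gives me the detail I would want from a failed load, but the rules in coerce are still guesses, which is the core of the question.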
In my experience, which is very limited, the whole work pipeline (job? workflow? not sure about the correct term) fails if a single row fails BigQuery's validations (just like a regular load does unless maxBadRecords is set to a sufficiently large number). It also fails with superficially helpful messages like 'BigQuery import job "dataflow_job_xxx" failed. Causes: (5db0b2cdab1557e0): BigQuery job "dataflow_job_xxx" in project "xxx" finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field'. Perhaps there is somewhere I can see a more detailed error message that could tell me which property it was and what the value was? Without that information it could just as well have said "bad data".
From what I can tell, at least when running in batch mode, Dataflow will write the TableRow objects to the staging area in Cloud Storage and then start a load once everything is there. This means that there is nowhere for me to catch any errors: my code is no longer running by the time the data is loaded into BigQuery. I haven't run any job in streaming mode yet, but I'm not sure how it would be different there. From my (admittedly limited) understanding the basic principle is the same; it's just the batch size that's smaller.
People use Dataflow and BigQuery, so it can't be impossible to make this work without always having to worry about the whole pipeline stopping because of a single bad input. How do people do it?