API config for BigQuery Federated Data Source

Posted 2019-08-26 22:35

Question:

I have the following config that works fine for loading a bunch of files into BigQuery:

config= {
  'configuration'=> {
    'load'=> {
      'sourceUris'=> 'gs://my_bucket/my_files_*',
      'schema'=> {
        'fields'=> fields_array
      },
      'schemaUpdateOptions' => [{ 'ALLOW_FIELD_ADDITION'=> true}],  
      'destinationTable'=> {
        'projectId'=> 'my_project',
        'datasetId'=> 'my_dataset',
        'tableId'=> 'my_table'
      },
      'sourceFormat' => 'NEWLINE_DELIMITED_JSON',
      'createDisposition' => 'CREATE_IF_NEEDED',
      'writeDisposition' => 'WRITE_TRUNCATE',
      'maxBadRecords'=> 0,
    }
  },
}

This is then executed with the following, where client is pre-initialised:

result = client.execute(
  api_method: big_query.jobs.insert,
  parameters: { projectId: 'my_project', datasetId: 'my_dataset' },
  body_object: config
)  

I am now trying to write the equivalent to create an external / federated data source instead of loading the data. I need to do this to effectively create staging tables for ETL purposes. I have successfully done this using the BigQuery UI, but I need to run it in code as it will eventually be a daily automated process. I'm having a bit of trouble with the API docs and can't find any good examples to refer to. Can anyone help? Thanks in advance!

Answer 1:

By creating an external data source, do you mean creating a table that refers to an external data source? In that case you can use bigquery.tables.insert and fill out externalDataConfiguration. The table can then be used in queries to read from the external data source.

If you only want to use the external data source in a single query, you can attach a temporary external table to the query by putting the table definition in tableDefinitions. On the command line it looks like this:

bq query --external_table_definition=avroTable::AVRO=gs://path-to-avro 'SELECT * FROM avroTable'
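
A rough, untested sketch of the API equivalent, using the same pre-initialised client and big_query objects as in the question (the table name staging_table and the SELECT are just illustrative): the query job configuration takes a tableDefinitions map from a table name to an externalDataConfiguration.

query_config = {
  'configuration' => {
    'query' => {
      # The key in tableDefinitions must match the table name used in the SQL.
      'query' => 'SELECT * FROM staging_table',
      'useLegacySql' => false,
      'tableDefinitions' => {
        'staging_table' => {
          'sourceUris' => ['gs://my_bucket/my_files_*'],
          'sourceFormat' => 'NEWLINE_DELIMITED_JSON',
          'autodetect' => true
        }
      }
    }
  }
}

result = client.execute(
  api_method: big_query.jobs.insert,
  parameters: { projectId: 'my_project' },
  body_object: query_config
)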



Answer 2:

Use idiomatic Cloud libraries when possible

Use the BigQuery module of the idiomatic Ruby client for GCP (google-cloud-bigquery), which is Generally Available, instead of google-api-ruby-client, which is both in "maintenance mode only" and "alpha". This recommendation appears in the documentation of both libraries.
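
A minimal setup sketch (assuming the gem is added via Bundler; google-cloud-bigquery is the GA gem name):

# Gemfile
gem "google-cloud-bigquery"

# in your code
require "google/cloud/bigquery"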

Authentication:

You can define the project and credentials using environment variables, or pass them explicitly when constructing the client, as sketched below.
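
A minimal sketch (the project ID and keyfile path are placeholders):

# Either export GOOGLE_CLOUD_PROJECT and GOOGLE_APPLICATION_CREDENTIALS in the
# environment and call Google::Cloud::Bigquery.new with no arguments, or pass
# the values explicitly:
bigquery = Google::Cloud::Bigquery.new(
  project_id:  "my_project",
  credentials: "/path/to/keyfile.json"
)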

How to create an External Data Source object

This is an example of creating an External Data Source with bigquery.external, slightly modified to include the relevant configuration from your load job.

bigquery = Google::Cloud::Bigquery.new

json_url = "gs://my_bucket/my_files_*"
# The wildcard URI has no file extension, so the format is passed explicitly.
json_table = bigquery.external json_url, format: "json" do |json|
  json.autodetect = true
  json.max_bad_records = 10
end

The object configuration methods (for example autodetect, max_bad_records, urls) are documented in the google-cloud-bigquery reference for External::DataSource and its subclasses.

How to query it:

data = bigquery.query "SELECT * FROM my_ext_table",
                      external: { my_ext_table: json_table }

data.each do |row|
  puts row[:name]
end

Note: both writeDisposition and createDisposition are only used for load/copy/query jobs that modify permanent BigQuery tables, so they wouldn't make much sense for an External Data Source. In fact, they appear neither in the REST API reference nor in the "Try this API" section for externalDataConfiguration.



Answer 3:

For anyone attempting the same, here's what I used to get it working. There are not many working examples online and the docs take some deciphering, so I hope this helps someone else!

config = {
  'kind' => 'bigquery#table',
  'tableReference' => {
    'projectId' => 'my_project',
    'datasetId' => 'my_dataset',
    'tableId' => 'my_table'
  },
  'externalDataConfiguration' => {
    'autodetect' => true,
    'sourceUris' => ['gs://my_bucket/my_files_*'],
    'sourceFormat' => 'NEWLINE_DELIMITED_JSON',
    'maxBadRecords' => 10
  }
}

The documentation for externalDataConfiguration can be found in the BigQuery REST API reference and "Try this API" section for bigquery.tables.insert.

Then, as pointed out in Hua Zhang's answer, you call bigquery.tables.insert instead of bigquery.jobs.insert:

result = client.execute(
  api_method: big_query.tables.insert,
  parameters: { projectId: 'my_project', datasetId: 'my_dataset' },
  body_object: config
)
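
Once created, the federated table behaves like any other BigQuery table, so it can be queried with a normal query job. A quick sketch with the same legacy client (the SQL is just illustrative):

query_result = client.execute(
  api_method: big_query.jobs.query,
  parameters: { projectId: 'my_project' },
  body_object: {
    'query' => 'SELECT COUNT(*) AS row_count FROM my_dataset.my_table',
    'useLegacySql' => false
  }
)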