Creating a new table and setting the expiration date

Posted 2019-09-19 10:31

Question:

This is my code: it pulls data from the Firebase Realtime Database, formats it as newline-delimited JSON, uploads the file to Cloud Storage, and then loads it into BigQuery.

#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid

from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery

firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())

with open("firetobq.json", "w") as outfile:
  for id in id_keys:
    json.dump(result[id], outfile, indent=None)
    outfile.write("\n")

client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
  blob.upload_from_file(f)

dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'


def load_data_from_gcs(dataset, test12, source):
    bigquery_client = bigquery.Client(dataset)
    dataset = bigquery_client.dataset('FirebaseArchive')
    table = dataset.table('test12')
    job_name = str(uuid.uuid4())
    job1.create_disposition = 'WRITE_TRUNCATE'
    job1.begin()

    job= bigquery_client.load_table_from_storage(
        job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
    job.source_format = 'NEWLINE_DELIMITED_JSON'

    job.begin()
    wait_for_job(job)

def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

load_data_from_gcs(dataset, 'test12', source)

How can I change this so that, instead of importing the data into table test12, it creates a new table, and how can I have that table expire after 1 week? (I'm pretty sure the expiration has to be set in seconds; 1 week = 604800 seconds.) I know how to set the expiration date via the command line, but I would rather have it done here automatically.

And this is the error I am receiving after adding job1:

Traceback (most recent call last):
  File "firebasetobq2.py", line 63, in <module>
    load_data_from_gcs(dataset, 'test12', source)
  File "firebasetobq2.py", line 44, in load_data_from_gcs
    job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined

Answer 1:

If you want to set an expiration time for your table, this might do the trick:

import uuid

from datetime import datetime, timedelta
from google.cloud import bigquery
from google.cloud.bigquery.schema import SchemaField

def load_data_from_gcs(dataset,
                       table_name,
                       table_schema,
                       source,
                       source_format,
                       expiration_time):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset)
    table = dataset.table(table_name)
    table.schema = table_schema
    table.expires = expiration_time
    if not table.created:
        table.create()

    job_name = str(uuid.uuid4())
    job = bigquery_client.load_table_from_storage(
        job_name, table, source)
    job.source_format = source_format

    job.begin()
    wait_for_job(job)

dataset = 'FirebaseArchive'
table_name = 'test12'
gcs_source = 'gs://dataworks-356fa-backups/firetobq.json'
source_format = 'NEWLINE_DELIMITED_JSON'
table_schema = [SchemaField(field1), SchemaField(field2), (...)]
expiration_time = datetime.now() + timedelta(seconds=604800)

load_data_from_gcs(dataset,
                   table_name,
                   table_schema,
                   gcs_source,
                   source_format,
                   expiration_time)

Notice that the only difference is the line of code that sets:

table.expires = expiration_time

Its value must be of type datetime (here defined as expiration_time = datetime.now() + timedelta(seconds=604800)).
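
For example, a minimal sketch of building that value; timedelta(weeks=1) is just another way of writing the same 604800 seconds:

from datetime import datetime, timedelta

# Both expressions describe "one week from now".
expiration_time = datetime.now() + timedelta(seconds=604800)   # explicit seconds
# expiration_time = datetime.now() + timedelta(weeks=1)        # equivalent, more readable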

I'm not sure whether schema auto-detection is available through the Python API, but you can still pass this information using SchemaFields. For instance, if your table has two fields, user_id and job_id, both INTEGERS, the schema would be:

table_schema = [SchemaField('user_id', field_type='INT64'),
                SchemaField('job_id', field_type='INT64')]

You can find more information on how schemas work in BigQuery here.
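
As a rough sketch (the field names, types and modes below are placeholders, not taken from your data), SchemaField also accepts a mode, so REQUIRED or REPEATED columns can be described the same way:

from google.cloud.bigquery.schema import SchemaField

# Hypothetical schema, for illustration only.
table_schema = [
    SchemaField('user_id', field_type='INT64', mode='REQUIRED'),
    SchemaField('job_id', field_type='INT64'),                    # NULLABLE by default
    SchemaField('tags', field_type='STRING', mode='REPEATED'),
]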

[EDIT]:

I just saw your other question. If you want to truncate the table and then write data to it, you can just do:

job.write_disposition = 'WRITE_TRUNCATE'
job.begin()

in your load_data_from_gcs function. This will automatically delete the existing table and create a new one with the data from your storage file. You won't have to define a schema for that, since it's already defined on the existing table (so this might be a much easier solution for you).
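
For example, here is a minimal sketch of how that disposition fits into the load_data_from_gcs function from your question (it reuses your wait_for_job helper and the dataset, table and bucket names from your code; only the job configuration changes):

def load_data_from_gcs(dataset_name, table_name, source):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)

    # Create the load job first, then set its options, then start it
    # (the NameError in your traceback came from configuring a job that
    # had not been created yet).
    job_name = str(uuid.uuid4())
    job = bigquery_client.load_table_from_storage(job_name, table, source)
    job.source_format = 'NEWLINE_DELIMITED_JSON'
    job.write_disposition = 'WRITE_TRUNCATE'   # replace the table's contents on each load

    job.begin()
    wait_for_job(job)

load_data_from_gcs('FirebaseArchive', 'test12', 'gs://dataworks-356fa-backups/firetobq.json')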