This is my code that pulls the realtime database from firebase, formats it in a Json, uploads to the cloud and then to BQ.
#standardsql
import json
import boto
import gcs_oauth2_boto_plugin
import os
import shutil
import StringIO
import tempfile
import time
import argparse
import uuid
from firebase import firebase
from google.cloud import storage
from google.cloud.storage import blob
from google.cloud import bigquery
firebase = firebase.FirebaseApplication('https://dataworks-356fa.firebaseio.com/')
result = firebase.get('/connection_info', None)
id_keys = map(str, result.keys())
with open("firetobq.json", "w") as outfile:
for id in id_keys:
json.dump(result[id], outfile, indent=None)
outfile.write("\n")
client = storage.Client(project='dataworks-356fa')
bucket = client.get_bucket('dataworks-356fa-backups')
blob = bucket.blob('firetobq.json')
with open('firetobq.json', 'rb') as f:
blob.upload_from_file(f)
dataset = 'dataworks-356fa'
source = 'gs://dataworks-356fa-backups/firetobq.json'
def load_data_from_gcs(dataset, test12, source):
bigquery_client = bigquery.Client(dataset)
dataset = bigquery_client.dataset('FirebaseArchive')
table = dataset.table('test12')
job_name = str(uuid.uuid4())
job1.create_disposition = 'WRITE_TRUNCATE'
job1.begin()
job= bigquery_client.load_table_from_storage(
job_name, table, "gs://dataworks-356fa-backups/firetobq.json")
job.source_format = 'NEWLINE_DELIMITED_JSON'
job.begin()
wait_for_job(job)
def wait_for_job(job):
while True:
job.reload()
if job.state == 'DONE':
if job.error_result:
raise RuntimeError(job.errors)
return
time.sleep(1)
load_data_from_gcs(dataset, 'test12', source)
How can I change this to instead of importing the data in table test12 to creating a new table and also having that table expire after 1 week. (Im pretty sure the command for setting the expiration date has to be in seconds. 1 week = 604800 seconds) I know how to set the expiration date via the command line but would rather have it done here automatically.
And this is the error I am receiving after adding job1.
Traceback (most recent call last):
File "firebasetobq2.py", line 63, in <module>
load_data_from_gcs(dataset, 'test12', source)
File "firebasetobq2.py", line 44, in load_data_from_gcs
job1.create_disposition = 'WRITE_TRUNCATE'
NameError: global name 'job1' is not defined