Dataflow GCS to BQ Problems

Posted 2019-08-27 01:30

Here's the situation: I have a set of gzip-compressed files in GCS with a .gz extension (named 000000_[0-5].gz) that I am trying to import into a single BQ table. Until now I have been loading them with commands from the command line, but I want to do this with Dataflow so I can potentially add some transformations in the future.

The data in the compressed GCS files is a complex JSON structure whose schema changes frequently, so the easiest approach is to load each file into BigQuery as a TSV with a single column called record (one row per line), and then use JSON_EXTRACT functions within BQ to parse out the values when they are needed.
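A minimal local sketch of the one-column idea, using only the Python standard library: each line of a .gz file becomes a dict with a single string field named record, and the JSON inside is left unparsed for BigQuery to handle later. The helper name gz_lines_to_rows and the sample data are hypothetical; on Dataflow, ReadFromText does the decompression and line splitting instead.

```python
import gzip
import json
import os
import tempfile


def gz_lines_to_rows(path):
    """Yield one row per line of a gzip-compressed text file (hypothetical helper).

    Each line is kept verbatim as a string in a single `record` column;
    parsing is deferred to JSON_EXTRACT in BigQuery.
    """
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            yield {"record": line.rstrip("\n")}


if __name__ == "__main__":
    # Tiny stand-in for 000000_0.gz: two JSON lines with different schemas.
    sample = ['{"a": 1}', '{"b": {"c": [1, 2]}}']
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "000000_0.gz")
        with gzip.open(path, "wt", encoding="utf-8") as f:
            f.write("\n".join(sample) + "\n")
        for row in gz_lines_to_rows(path):
            # The row is a dict; its value is still an unparsed JSON string.
            print(row, json.loads(row["record"]))
```

Because every row has the same one-column shape, schema changes inside the JSON never affect the table definition.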

Issue: I have written a Dataflow pipeline that does the bare minimum for this scenario: read from GCS and write to a BigQuery table. When I execute this pipeline, however, I get a JSON parse error:

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object.
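The phrase "Value encountered without start of object" means BigQuery received a JSON value that is not an object. A plausible explanation, assuming WriteToBigQuery stages rows as newline-delimited JSON for a load job (consistent with the "JSON table" wording of the error), is that each element was a plain string rather than a dict. The stdlib snippet below only illustrates that difference; it is not Beam's actual serialization code, and the sample line is made up.

```python
import json

line = '{"id": 1, "payload": {"x": "y"}}'  # one made-up line from a .gz file

# A bare string element serializes to a JSON *string* literal, not an object.
as_string = json.dumps(line)

# A dict keyed by column name serializes to a JSON object with a `record` field.
as_dict = json.dumps({"record": line})

print(as_string)
print(as_dict)
```

The first form starts with a quote character, so a reader expecting one object per line would reject it; the second starts with `{` and maps cleanly onto a one-column `record:STRING` schema.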

Below is my Dataflow script with some variables anonymized.

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET_NAME/input-data/000000_0.gz',
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=GCP_PROJECT_NAME',
      '--staging_location=gs://BUCKET_NAME/dataflow-staging',
      '--temp_location=gs://BUCKET_NAME/dataflow-temp',
      '--job_name=gcs-gzcomp-to-bq1',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

As you can see, I tried to mirror what I do in the traditional load job by specifying a schema containing only one string column, but it still fails.

Is there a way to give Dataflow more explicit details about how to import the GCS files? For example, can I specify TSV even though each line is a valid JSON object?

Also, if this error is related to anything else I may have screwed up, please call that out as well; I'm super new to Dataflow, but pretty experienced with BQ & some other GCP tools, so hoping to add this to my toolbelt.

1 Answer
何必那么认真
Post #2 · 2019-08-27 02:03

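I believe the input collection to WriteToBigQuery should be a collection of dictionaries (each key maps to a BigQuery column) rather than a collection of strings. Try adding a step between the read and the write, such as | beam.Map(lambda line: dict(record=line)). That would also fit the error message: a bare string is not a JSON object, hence "Value encountered without start of object".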
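A fuller sketch of that suggestion applied to the pipeline from the question. Assumptions: all six shards match the glob 000000_*.gz (bracket ranges like [0-5] are not relied on); the helper name to_bq_row is hypothetical; the create/write dispositions are Beam's defaults spelled out for clarity; runner, project, and staging/temp options are passed in rather than hard-coded. Beam is imported inside run() only so the mapping helper can be exercised without Beam installed; top-level imports are more usual in a real job.

```python
import argparse
import logging


def to_bq_row(line):
    """Wrap one raw line of text in a dict keyed by the BigQuery column name."""
    return {"record": line}


def run(argv=None):
    # Imported here so to_bq_row can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        # Glob over all shards instead of a single file (assumed naming).
        default="gs://BUCKET_NAME/input-data/000000_*.gz",
        help="Input file pattern to process.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (
            p
            | "ReadFromGCS" >> beam.io.ReadFromText(
                known_args.input,
                # AUTO would also infer gzip from the .gz extension;
                # stating it makes the intent explicit.
                compression_type=CompressionTypes.GZIP,
            )
            # Turn each string into a dict so it becomes a JSON object row.
            | "ToRow" >> beam.Map(to_bq_row)
            | "WriteToBQ" >> beam.io.WriteToBigQuery(
                "TABLE_NAME",
                dataset="DATASET_NAME",
                project="GCP_PROJECT_NAME",
                schema="record:STRING",
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

To launch it, call run() from a small entry point, for example run(['--runner=DataflowRunner', '--project=GCP_PROJECT_NAME', '--staging_location=gs://BUCKET_NAME/dataflow-staging', '--temp_location=gs://BUCKET_NAME/dataflow-temp']), mirroring the options the question adds via pipeline_args.extend. No TSV setting is needed: the dict already tells BigQuery which column each line belongs to.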
