数据流GCS到BQ问题(Dataflow GCS to BQ Problems)

2019-10-29 10:41发布

这里的情况 :我有一组被压缩在GCS文件,并使用.gz文件扩展名(即000000_ [0-5]。广州),我试图导入到一个单一的BQ表。 我一直在执行命令行最新命令,但想与数据流来实现这一目标,在未来的一些转换可能需要添加。

在压缩GCS文件中的数据是一个复杂的JSON结构频繁变化的模式,所以它是最容易把整个文件至BigQuery,其中只有一列TSV的叫record ,然后用内BQ JSON_EXTRACT功能解析出值需要在需要它们的时候。

问题 :我写了一个数据流管道,将做到在这种情况下的最低限度; 从GCS读取和写入的BigQuery资料表。 当我执行这条管道,但是,我得到一个JSON解析错误,如下所示:

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object.

下面是我用匿名的一些变量数据流脚本。

from __future__ import absolute_import

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://BUCKET_NAME/input-data/000000_0.gz',
                      help='Input file to process.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=GCP_PROJECT_NAME',
      '--staging_location=gs://BUCKET_NAME/dataflow-staging',
      '--temp_location=gs://BUCKET_NAME/dataflow-temp',
      '--job_name=gcs-gzcomp-to-bq1',
  ])

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  with beam.Pipeline(options=pipeline_options) as p:

    (p | "ReadFromGCS" >> ReadFromText(known_args.input)
       | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
           project='GCP_PROJECT_NAME', schema='record:string'))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

正如你所看到的,我试图做同样的事情,因为我在传统的负载工作正在做,通过指定只包含一个带有字符串类型的列架构,但它仍然是失败的。

有没有办法明确地告诉数据流,我怎么想导入的GCS文件的详细信息? 即指定TSV,即使它是在每行一个有效的JSON对象?

另外,如果这个错误与别的我可能做错了,请电话说出来为好; 我超级新数据流,但与BQ和其他一些GCP工具非常有经验,所以希望能够加入到我的工具区。

Answer 1:

我相信输入集合WriteToBigQuery应词典的集合(每个键映射到的BigQuery列),而不是字符串的集合。 尝试通过类似| beam.Map(lambda line: dict(record=line)) | beam.Map(lambda line: dict(record=line))



文章来源: Dataflow GCS to BQ Problems