这里的情况 :我有一组被压缩在GCS文件,并使用.gz文件扩展名(即000000_ [0-5]。广州),我试图导入到一个单一的BQ表。 我一直在执行命令行最新命令,但想与数据流来实现这一目标,在未来的一些转换可能需要添加。
在压缩GCS文件中的数据是一个复杂的JSON结构频繁变化的模式,所以它是最容易把整个文件至BigQuery,其中只有一列TSV的叫record
,然后用内BQ JSON_EXTRACT功能解析出值需要在需要它们的时候。
问题 :我写了一个数据流管道,将做到在这种情况下的最低限度; 从GCS读取和写入的BigQuery资料表。 当我执行这条管道,但是,我得到一个JSON解析错误,如下所示:
Error while reading data, error message: JSON table encountered too
many errors, giving up. Rows: 1; errors: 1., error: Error while reading
data, error message: JSON table encountered too many errors, giving up.
Rows: 1; errors: 1., error: Error while reading data, error message:
JSON parsing error in row starting at position 2630029539: Value
encountered without start of object.
下面是我用匿名的一些变量数据流脚本。
from __future__ import absolute_import
import argparse
import logging
import re
import json
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://BUCKET_NAME/input-data/000000_0.gz',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=GCP_PROJECT_NAME',
'--staging_location=gs://BUCKET_NAME/dataflow-staging',
'--temp_location=gs://BUCKET_NAME/dataflow-temp',
'--job_name=gcs-gzcomp-to-bq1',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
(p | "ReadFromGCS" >> ReadFromText(known_args.input)
| WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
project='GCP_PROJECT_NAME', schema='record:string'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
正如你所看到的,我试图做同样的事情,因为我在传统的负载工作正在做,通过指定只包含一个带有字符串类型的列架构,但它仍然是失败的。
有没有办法明确地告诉数据流,我怎么想导入的GCS文件的详细信息? 即指定TSV,即使它是在每行一个有效的JSON对象?
另外,如果这个错误与别的我可能做错了,请电话说出来为好; 我超级新数据流,但与BQ和其他一些GCP工具非常有经验,所以希望能够加入到我的工具区。