Google Cloud Dataflow: Failed to insert JSON data into BigQuery

Asked 2019-07-16 11:43

Given a data set like the one below

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
... more JSON records of other types here

I try to filter these JSON records and insert them into BigQuery with the Python SDK as follows:

import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

ba_schema = 'slot:STRING,result:INTEGER,p_type:STRING,level:INTEGER'

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'

    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        # Keep only the records of type 'ba' and emit them on a tagged output.
        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                      dest='input',
                      default='data/path/data',
                      help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                                      ParseJsonDoFn.B_TYPE)))

        b_line = multiple_lines.tag_B
        (b_line
            | "output_b" >> beam.io.WriteToBigQuery(
                                          'temp.ba',
                                          schema=ba_schema,
                                          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                        ))
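
For reference, I start the pipeline with a standard main guard along the following lines (a sketch; logging is set to DEBUG so the messages quoted below are visible):

import logging

if __name__ == '__main__':
    # DEBUG level makes the "Successfully wrote N rows." message from the BigQuery sink visible.
    logging.getLogger().setLevel(logging.DEBUG)
    run()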

And the debug logs show:

INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.

It seems the two records with type 'ba' were inserted into the BigQuery table temp.ba. However, when I run

select * from `temp.ba`  limit 100;

there is no data in the table temp.ba.

Is there anything wrong with my code, or is there something I am missing?


Update:

Thanks to @Eric Schmidt's answer, I understand there could be some lag before the initial data becomes available. However, five minutes after running the script above, there is still no data in the table.


When I remove write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE and write through beam.io.BigQuerySink instead:

    | "output_b" >> beam.io.Write(
                      beam.io.BigQuerySink(
                          table = 'ba',
                          dataset = 'temp',
                          project = 'project-id',
                          schema = ba_schema,
                          #write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
                    ))

then the two records can be found in the table immediately.

And the table information is shown in a screenshot (omitted here).

Maybe I do not yet understand what "initial data availability lag" means. Could someone give me more information?

1 Answer

欢心 · answered 2019-07-16 12:06

Two things to consider:

1) The Direct (local) runner uses streaming inserts. There is an initial data availability lag; see this post. (A way to check whether your rows are still sitting in the streaming buffer is sketched below.)

2) Make sure you fully qualify the project you are streaming into: with BigQuerySink(), pass project="foo", dataset="bar", table="biz".
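
For example, with the WriteToBigQuery transform from your first snippet, the table reference can be fully qualified as PROJECT:DATASET.TABLE (a sketch reusing the placeholder names from the question):

(b_line
    | "output_b" >> beam.io.WriteToBigQuery(
        'project-id:temp.ba',  # fully qualified: project:dataset.table
        schema=ba_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))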

I suspect your issue is #1.
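
If you want to verify that the rows actually arrived and are merely sitting in the streaming buffer, one way is to inspect the table metadata with the google-cloud-bigquery client library (a sketch; it assumes that library is installed and reuses the project-id/temp/ba names from the question):

from google.cloud import bigquery

client = bigquery.Client(project='project-id')
table = client.get_table('project-id.temp.ba')

# num_rows only counts rows that have been flushed out of the streaming buffer.
print('committed rows:', table.num_rows)
if table.streaming_buffer:
    print('rows still in the streaming buffer:', table.streaming_buffer.estimated_rows)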
