Google-cloud-dataflow: Failed to insert json data

Posted 2019-07-16 11:28

Question:

Given a data set like the following:

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
...more type json data here

I am trying to filter this JSON data and insert it into BigQuery with the Python SDK, as follows:

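import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Field names must match the keys of the dicts emitted by ParseJsonDoFn.
ba_schema = 'slot:STRING,result:INTEGER,p_type:STRING,level:INTEGER'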

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'
    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                      dest='input',
                      default='data/path/data',
                      help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                                      ParseJsonDoFn.B_TYPE)))

        b_line = multiple_lines.tag_B
        (b_line
            | "output_b" >> beam.io.WriteToBigQuery(
                                          'temp.ba',
                                          schema = ba_schema,
                                          write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                        ))
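
To rule out the parsing step, the filter logic in ParseJsonDoFn can be exercised on its own, without Beam or BigQuery. The following is only a minimal sketch: the helper name extract_ba_row is hypothetical and not part of the original pipeline, and unlike the DoFn it returns None for records without a "type" key instead of raising.

```python
import json

# The two sample records copied from the data set above.
SAMPLE_LINES = [
    '{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}',
    '{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}',
]


def extract_ba_row(text_line):
    """Hypothetical helper: return the BigQuery row dict for a 'ba' record, else None."""
    data = json.loads(text_line.strip())
    if data.get('type') != 'ba':
        return None
    # Keep only the four fields that the pipeline writes to the table.
    return {key: data[key] for key in ('slot', 'result', 'p_type', 'level')}


rows = [row for row in map(extract_ba_row, SAMPLE_LINES) if row is not None]
print(len(rows))  # 2, matching "Successfully wrote 2 rows." in the logs
```

If this produces the expected two rows, the parsing step is probably not the cause and the problem lies in the write step.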

And the debug logs show

INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.

It seems the two records with type:ba were inserted into the BigQuery table temp.ba. However, I then ran this query:

select * from `temp.ba`  limit 100;

There is no data in the table temp.ba.

Is there anything wrong with my code, or am I missing something?


Update:

Thanks to @Eric Schmidt's answer, I now know there can be some lag before the initial data becomes available. However, 5 minutes after running the above script, there is still no data in the table.

When I remove write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE from the BigQuerySink:

    | "output_b" >> beam.io.Write(
                      beam.io.BigQuerySink(
                          table = 'ba',
                          dataset = 'temp',
                          project = 'project-id',
                          schema = ba_schema,
                          #write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
                    ))

Those two records can be found immediately.

And the table information is: (screenshot not preserved)

Maybe I have not yet understood what the initial data availability lag means. Could someone give me more information?

Answer 1:

Two things to consider:

1) The Direct (local) runner uses streaming inserts. There is an initial data availability lag; see this post.

2) Make sure you fully qualify the project you are streaming into. With BigQuerySink(), pass project="foo", dataset="bar", table="biz".
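
For point 2, when the table is given as a single string (as in the WriteToBigQuery call in the question), the fully qualified form is 'project:dataset.table'. A minimal sketch of building that string follows; the helper qualified_table_spec is hypothetical, and the values are the placeholders from the question.

```python
def qualified_table_spec(project, dataset, table):
    """Hypothetical helper: build a 'project:dataset.table' table spec string."""
    for name, value in (('project', project), ('dataset', dataset), ('table', table)):
        if not value:
            # Fail early rather than silently falling back to a default project.
            raise ValueError('%s must be non-empty' % name)
    return '{}:{}.{}'.format(project, dataset, table)


print(qualified_table_spec('project-id', 'temp', 'ba'))  # project-id:temp.ba
```

The resulting string could replace 'temp.ba' in the write step so that the target project does not depend on the environment's default.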

I suspect your issue is #1.
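
One way to tell a lag apart from rows that never arrive is to poll the table for a bounded time instead of querying once. The sketch below is only an illustration under stated assumptions: wait_for_rows is a hypothetical helper, and count_rows stands for any callable you supply that returns the current row count (for example, one that runs SELECT COUNT(*) against temp.ba). It does not call BigQuery itself.

```python
import time


def wait_for_rows(count_rows, expected, timeout_s=300.0, interval_s=10.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Hypothetical helper: poll count_rows() until it reaches `expected` or time runs out.

    Returns True if the rows showed up within timeout_s seconds, False otherwise.
    """
    deadline = clock() + timeout_s
    while True:
        if count_rows() >= expected:
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Usage with a fake counter that "sees" the rows on the third poll.
fake_counts = iter([0, 0, 2])
arrived = wait_for_rows(lambda: next(fake_counts), expected=2, sleep=lambda seconds: None)
print(arrived)  # True
```

If such a check still returns False after several minutes, the lag in #1 is unlikely to be the whole explanation and the write configuration deserves a closer look.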