Given the data set as below
{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
...more type json data here
I try to filter those json data and insert them into bigquery with python sdk as following
ba_schema = 'slot:STRING,result:INTEGER,play_type:STRING,level:INTEGER'
class ParseJsonDoFn(beam.DoFn):
B_TYPE = 'tag_B'
def process(self, element):
text_line = element.trip()
data = json.loads(text_line)
if data['type'] == 'ba':
ba = {'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']}
yield pvalue.TaggedOutput(self.B_TYPE, ba)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='data/path/data',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DirectRunner',
'--project=project-id',
'--job_name=data-job',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
multiple_lines = (
lines
| 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
ParseJsonDoFn.B_TYPE)))
b_line = multiple_lines.tag_B
(b_line
| "output_b" >> beam.io.WriteToBigQuery(
'temp.ba',
schema = B_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
And the debug logs show
INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.
It seems those two data with type:ba
was inserted into bigquery table temp.ba
. However, I run
select * from `temp.ba` limit 100;
There is no data in this table temp.ba
.
Is there anything wrong with my codes or anything am I missing?
Update:
Thanks @Eric Schmidt answer, I know there could be some lag for initial data. However, after 5 minutes after running the above script, there is no data in the table yet.
When I try to remove write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE
in the BigQuerySink
| "output_b" >> beam.io.Write(
beam.io.BigQuerySink(
table = 'ba',
dataset = 'temp',
project = 'project-id',
schema = ba_schema,
#write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
))
Those two records could be found immediately.
And the table information is
Maybe I does not catch the meaning of initial data availability lag yet. Could someone give me more information?
Two things to consider:
1) The Direct (local) runner uses streaming inserts. There is an initial data availability lag see this post.
2) Make sure you fully qualify the project you are streaming into. With BigQuerySink() project="foo", dataset="bar", table="biz".
I suspect your issue is #1.