我正在开发一个Python程序使用像谷歌的数据流模板。
我在做什么是PubSub的写在BigQuery中的数据:
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
(p
# This is the source of the pipeline.
| 'Read from PubSub' >> beam.io.ReadFromPubSub('projects/.../topics/...')
#<Transformation code if needed>
# Destination
| 'String To BigQuery Row' >> beam.Map(lambda s: dict(Trama=s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='Trama:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
)
p.run().wait_until_finish()
该代码是在本地运行,而不是在谷歌数据流还
这个“作品”,但不是我想要的,因为目前的数据都存储在缓冲区的BigQuery溪和我无法看到它(即使在等待一段时间后)的方式。
当要提供BigQuery中的? 为什么存储缓冲区流,而不是“正常”的表中?