有人能澄清什么为目的id_label
论点ReafFromPubSub变换 ?
我使用的BigQuery下沉,我的理解它就像一个insertId
为BQ流API, 资料表:insertAll
一个独特的ID为每个行。 BigQuery使用这个属性来检测一个尽力而为的基础上重复的加入请求。 欲了解更多信息,请参阅数据一致性。
但是我没有看到这个预期的行为。
我发布消息的Pub / Sub,具有相同属性的每个消息
message_id
值(这是故意的,以测试管道/ BQ重复数据删除的行为)我从管道酒吧如下
beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id'
)
但还是查询BQ,所有的消息得到插入。 我所料,因为相同的MESSAGE_ID值公布的每封邮件,BQ应该推断这些...
可有人澄清PLS? 提前致谢!
另外,我注意到DirectRunner
继续使用该属性时抛出错误,
NotImplementedError:不支持id_label为PubSub的写着:DirectRunner
我已经使用DataflowRunner
......是预期的那样好?
干杯!
更新1:搬到DataflowRunner,与管道似乎尊重id_label
ReadFromPubSub期间()的参数。 然而,重复的消息并继续得到偶尔读入管道 。
我的发布应用程序,每15秒,消息发布在(以下格式发布的应用程序代码是在这里 ):
CID = 141&MESSAGE_ID = 2&evt_time = {{DATE_TIME_AT_RUNTIME}}
通知,即时传递相同message_id
在消息的属性值(=“2”)以及(这是打算试,测试演绎行为)。
- 我管线(数据流上的亚军,梁的Python SDK V2.11运行, 流水线代码是在这里 ),转储以下消息BQ。 正如你所看到的,具有相同的多个消息
message_id
得到读入管道,发出下沉。 这通常发生,当我停止/重新启动我的出版商的应用程序。
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z