光束/数据流:: ReadFromPubSub(id_label)::意外行为(Beam / Dat

2019-10-29 06:50发布

有人能澄清什么为目的id_label论点ReafFromPubSub变换 ?

我使用的BigQuery下沉,我的理解它就像一个insertId为BQ流API, 资料表:insertAll

一个独特的ID为每个行。 BigQuery使用这个属性来检测一个尽力而为的基础上重复的加入请求。 欲了解更多信息,请参阅数据一致性。

但是我没有看到这个预期的行为。

  • 我发布消息的Pub / Sub,具有相同属性的每个消息message_id值(这是故意的,以测试管道/ BQ重复数据删除的行为)

  • 我从管道酒吧如下beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id'

但还是查询BQ,所有的消息得到插入。 我所料,因为相同的MESSAGE_ID值公布的每封邮件,BQ应该推断这些...

可有人澄清PLS? 提前致谢!

另外,我注意到DirectRunner继续使用该属性时抛出错误,

NotImplementedError:不支持id_label为PubSub的写着:DirectRunner

我已经使用DataflowRunner ......是预期的那样好?

干杯!

更新1:搬到DataflowRunner,与管道似乎尊重id_label ReadFromPubSub期间()的参数。 然而,重复的消息并继续得到偶尔读入管道

  • 我的发布应用程序,每15秒,消息发布在(以下格式发布的应用程序代码是在这里 ):

    CID = 141&MESSAGE_ID = 2&evt_time = {{DATE_TIME_AT_RUNTIME}}

通知,即时传递相同message_id在消息的属性值(=“2”)以及(这是打算试,测试演绎行为)。

  • 我管线(数据流上的亚军,梁的Python SDK V2.11运行, 流水线代码是在这里 ),转储以下消息BQ。 正如你所看到的,具有相同的多个消息message_id得到读入管道,发出下沉。 这通常发生,当我停止/重新启动我的出版商的应用程序。
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z

Answer 1:

这些都是不同的ID。 正如解释在这里发表一个主题每封邮件有一个字段中指定messageId这是保证是话题中是唯一的。 发布/订阅保证在-一次至少递送这样的订阅可以具有重复的(具有相同的,即消息messageId )。 数据流已经一次准确处理的语义,因为它使用的领域去重复的消息从订阅阅读时。 这是独立的接收器,这并不需要是的BigQuery的。

使用id_label (或.withIdAttribute()在Java SDK),我们可以强制消息根据不同的区域应该是唯一的考虑重复(如订单ID,客户ID等)。 输入信号源将读取重复的消息只有一次,你不会看到他们加大投入要素的数量在管道。 请记住,直接亚军是用于测试只用,并不提供相同的担保检查点,重复数据删除等方面作为例子参考此评论 。 这就是为什么你看到他们在管道,还考虑到的最可能的原因NotImplementedError消息,因此我建议移动到数据流亚军。

在另一边, insertId被使用时,在尽力而为的基础上,以避免重复行重试时流插入BigQuery中。 使用BigQueryIO它被创建在引擎盖下 ,不能被手动指定。 在你的情况,如果你的N个消息进入管道和N写入至BigQuery,它工作正常。 如果有任何必须重审,该行有同样的insertId和,因此被丢弃。



文章来源: Beam / DataFlow ::ReadFromPubSub(id_label) :: Unexpected behavior