Beam / Dataflow :: ReadFromPubSub(id_label) :: Unex

Posted 2019-08-27 17:41

Can someone clarify the purpose of the id_label argument in the ReadFromPubSub transform?

I'm using a BigQuery sink, and my understanding is that it acts like an insertId for the BQ streaming API (tabledata.insertAll):

A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency.

However, I don't see this expected behaviour.

  • I'm publishing messages to Pub/Sub, each with the same message_id attribute value (this is intentional, to test the pipeline / BQ dedupe behaviour; a rough publishing sketch follows below)

  • My pipeline reads from Pub/Sub as follows: beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id')

However, when querying BQ, all messages get inserted. I expected that, because each message was published with the same message_id value, BQ would have deduplicated them...
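For reference, a rough sketch of how I publish the test messages (the project and topic names here are placeholders; the actual publisher code is linked in the update below):

    from datetime import datetime

    from google.cloud import pubsub_v1

    # Test publisher sketch: every message carries the SAME 'message_id'
    # attribute value on purpose, to exercise the dedupe behaviour.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path('my-project', 'my-topic')  # placeholders

    payload = 'cid=141&message_id=2&evt_time={}Z'.format(
        datetime.utcnow().isoformat())
    future = publisher.publish(
        topic_path, data=payload.encode('utf-8'), message_id='2')
    print(future.result())  # prints the server-assigned Pub/Sub messageId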

Can someone clarify, please? Thanks in advance!

Also, I notice the DirectRunner keeps throwing an error when using this attribute:

NotImplementedError: DirectRunner: id_label is not supported for PubSub reads

I have to use the DataflowRunner... is that expected as well?

Cheers!

UPDATE 1: I moved to the DataflowRunner, and the pipeline seems to respect the id_label argument during ReadFromPubSub(). However, duplicate messages DO continue to get read into the pipeline sporadically.
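For reference, this is roughly how I configure the run on Dataflow (project, region and bucket names are placeholders):

    from apache_beam.options.pipeline_options import (
        GoogleCloudOptions, PipelineOptions, StandardOptions)

    # Placeholder pipeline options for running the streaming pipeline on Dataflow.
    options = PipelineOptions()
    options.view_as(StandardOptions).runner = 'DataflowRunner'
    options.view_as(StandardOptions).streaming = True
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = 'my-project'
    gcp_options.region = 'us-central1'
    gcp_options.temp_location = 'gs://my-bucket/temp'
    gcp_options.staging_location = 'gs://my-bucket/staging'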

  • My publisher application publishes messages every 15 seconds in the following format (the publisher app code is here):

    cid=141&message_id=2&evt_time={{DATE_TIME_AT_RUNTIME}}

Notice I'm passing the same message_id value ('2') as a message attribute as well (this is intentional, to test the dedupe behaviour).

  • My pipeline (running on the DataflowRunner, Beam Python SDK v2.11, pipeline code is here) writes the following messages to BQ. As you can see, multiple messages with the same message_id get read into the pipeline and emitted to the sink. This usually happens when I stop/restart my publisher application.
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z

1 Answer
干净又极端
2019-08-27 17:47

Those are different IDs. As explained here, every message published to a topic has a field named messageId that is guaranteed to be unique within the topic. Pub/Sub guarantees at-least-once delivery so a subscription can have duplicates (i.e. messages with the same messageId). Dataflow has exactly-once processing semantics because it uses that field to de-duplicate messages when reading from a subscription. This is independent of the sink, which does not need to be BigQuery.

Using id_label (or .withIdAttribute() in the Java SDK) we can force messages to be considered duplicates according to a different field that should be unique (such as an order ID, customer ID, etc.). The input source will read the repeated messages only once, so you won't see them increase the count of input elements in the pipeline. Keep in mind that the Direct Runner is intended for testing purposes only and does not offer the same guarantees in terms of checkpointing, de-duplication, etc. As an example, refer to this comment. That's the most likely cause of the duplicates you are seeing in the pipeline, also taking into account the NotImplementedError message, so I'd suggest moving to the Dataflow Runner.
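As an illustration only, a minimal Python SDK sketch that de-duplicates on a hypothetical order_id attribute set by the publisher (the subscription name is a placeholder):

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        # Messages that arrive more than once with the same 'order_id'
        # attribute are read into the pipeline only once (Dataflow Runner).
        messages = p | 'Read' >> ReadFromPubSub(
            subscription='projects/my-project/subscriptions/my-sub',
            id_label='order_id')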

On the other hand, insertId is used, on a best-effort basis, to avoid duplicate rows when retrying streaming inserts into BigQuery. With BigQueryIO it is created under the hood and can't be specified manually. In your case, if N messages enter the pipeline and N rows are written to BigQuery, it is working as expected. If any insert had to be retried, the retried row carried the same insertId and was therefore discarded.
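For completeness, a minimal sketch of the BigQuery sink (table, schema and the sample row are placeholders); note there is no argument to pass your own insertId, it is generated internally for the streaming inserts and reused on retries:

    import apache_beam as beam

    with beam.Pipeline() as p:
        (p
         | 'CreateRow' >> beam.Create([
             {'cid': '141', 'message_id': '2',
              'evt_time': '2019-03-17T09:31:15.792653Z'}])
         | 'WriteToBQ' >> beam.io.WriteToBigQuery(
             table='my-project:my_dataset.my_table',
             schema='cid:STRING,message_id:STRING,evt_time:TIMESTAMP',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))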
