How to extract Google PubSub publish time in Apache Beam

Posted 2019-07-24 03:31

My goal is to access the PubSub message publish time, as recorded and set by Google PubSub, in Apache Beam (Dataflow). I read the messages like this:

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

The resulting messages do not seem to contain the publish time as an attribute. I have also tried

    .withTimestampAttribute("publish_time")

No luck either. What am I missing? Is it possible to extract the Google PubSub publish time in Dataflow?

1 answer

霸刀☆藐视天下
Reply #2 · 2019-07-24 03:55

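PubsubIO reads each message from Pub/Sub and, unless you configure withTimestampAttribute(...), assigns the message publish time to the element as its record timestamp. You can therefore access it with ProcessContext.timestamp(). The publish time is a field of the Pub/Sub message, not one of its attributes, so withTimestampAttribute("publish_time") looks for a message attribute named publish_time that Pub/Sub never sets. As an example: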

    // Assumes imports of PubsubIO, DoFn, ParDo and org.joda.time.Instant,
    // plus a logger named LOG declared in the enclosing class.
    p
        .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
        .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                LOG.info("Message: " + c.element());
                // With no timestamp attribute configured, the element
                // timestamp is the Pub/Sub publish time.
                LOG.info("Publish time: " + c.timestamp());
                // Current wall-clock time, i.e. when the element is processed.
                LOG.info("Processing time: " + Instant.now());
            }
        }));

I published a message some time in advance (to get a noticeable difference between event time and processing time), and the output with the DirectRunner was:

    Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
    INFO: Message: I published this message a little bit before
    Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
    INFO: Publish time: 2019-03-27T09:57:07.005Z
    Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
    INFO: Processing time: 2019-03-27T10:03:08.229Z
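The gap between the two logged timestamps is how long the message waited between being published and being processed. As a worked check of that arithmetic, here is a small plain-Java sketch using java.time (no Beam dependency); the class and method names `PublishLag` / `lag` are hypothetical, chosen only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: time between Pub/Sub publish and processing. */
public class PublishLag {

    /** Parses two ISO-8601 UTC instants (as printed in the log) and returns their difference. */
    static Duration lag(String publishTime, String processingTime) {
        return Duration.between(Instant.parse(publishTime), Instant.parse(processingTime));
    }

    public static void main(String[] args) {
        // Timestamps copied from the DirectRunner log output.
        Duration d = lag("2019-03-27T09:57:07.005Z", "2019-03-27T10:03:08.229Z");
        System.out.println(d); // prints PT6M1.224S
    }
}
```

Inside a DoFn the same difference could be taken directly with Joda-Time, for example `new org.joda.time.Duration(c.timestamp(), Instant.now())`, since Beam element timestamps are Joda `Instant`s.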

Minimal code here
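If you need the publish time as data rather than only logging it (for example, to write it to a sink), a DoFn can receive the element timestamp through the Beam Java SDK's `@Timestamp` parameter annotation and emit it alongside the payload. This is a minimal sketch, assuming a Beam 2.x SDK that supports `@Element`, `@Timestamp` and `OutputReceiver` parameters, and assuming messages are read with `PubsubIO.readStrings()` without `withTimestampAttribute(...)`. The class name `AttachPublishTimeFn` and the helper `withPublishTime` are hypothetical.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

/**
 * Hypothetical DoFn that pairs each Pub/Sub payload with its element timestamp.
 * When PubsubIO is used without withTimestampAttribute(), that timestamp is the
 * Pub/Sub publish time.
 */
public class AttachPublishTimeFn extends DoFn<String, KV<String, String>> {

    @ProcessElement
    public void processElement(
            @Element String message,
            @Timestamp Instant publishTime,
            OutputReceiver<KV<String, String>> out) {
        out.output(withPublishTime(message, publishTime));
    }

    /** Pure helper, kept separate so it can be checked without running a pipeline. */
    static KV<String, String> withPublishTime(String message, Instant publishTime) {
        // Joda's Instant.toString() renders ISO-8601 in UTC, e.g. 2019-03-27T09:57:07.005Z.
        return KV.of(message, publishTime.toString());
    }
}
```

It would be applied after the read step with something like `.apply("Attach Publish Time", ParDo.of(new AttachPublishTimeFn()))`. Using annotated parameters instead of `ProcessContext` keeps the DoFn signature explicit about what it reads.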
