I'm currently attempting to use withIdAttribute with PubsubIO to deduplicate messages that come from PubSub (since PubSub only guarantees at-least-once delivery).
My messages have four fields: label1, label2, timestamp, and value. A value is unique to the two labels at a given timestamp. Therefore, before writing to PubSub, I additionally set a uniqueID attribute equal to label1, label2, and timestamp joined as a string.
For example, this is what I get when I read from a subscription using the GCP console tool.
┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘
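For illustration, the uniqueID attribute shown above could be built with a helper along these lines. This is only a minimal sketch: the class and method names (`UniqueIds`, `of`) are hypothetical and not taken from the actual publishing code, and the `:` separator is inferred from the sample attribute in the output above.

```java
/** Hypothetical helper; the real publisher code is not shown in this post. */
final class UniqueIds {
    private UniqueIds() {}

    /** Joins label1, label2 and the epoch-second timestamp with ':' (assumed separator). */
    static String of(String label1, String label2, long timestamp) {
        return String.join(":", label1, label2, Long.toString(timestamp));
    }

    public static void main(String[] args) {
        // Rebuilds the uniqueID of the sample message above.
        System.out.println(of("5c381a51-2873-49b8-acf5-60a0fa59fc65", "foobarbaz", 1513199383L));
        // prints 5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383
    }
}
```

One caveat of this scheme: if a label can itself contain `:`, two different (label1, label2) pairs could join to the same string.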
In my Beam job, running on GCP Dataflow, I decode these messages as JSON, window them, group them by their two labels, and then attempt to aggregate them. However, in my aggregation class CreateMyAggregationsFn I'm seeing duplicate messages that have the same label1, label2, and timestamp.
    public class MyBeam {

      public interface MyBeamOptions extends PipelineOptions {
        // ...
      }

      private static class MyMessage implements Serializable {
        public long timestamp;
        public double value;
        public String label1;
        public String label2;
      }

      private static class CreateMyAggregationsFn
          extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Copy the grouped messages and sort them by timestamp.
          ArrayList<MyMessage> messages = new ArrayList<>();
          c.element().getValue().forEach(messages::add);
          Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));

          // Adjacent messages with equal timestamp and labels are duplicates.
          MyMessage prev = null;
          for (MyMessage msg : messages) {
            if (prev != null &&
                msg.timestamp == prev.timestamp &&
                msg.label1.equals(prev.label1) &&
                msg.label2.equals(prev.label2)) {
              // ... identifying duplicates here
            }
            prev = msg;
          }
          // ...
        }
      }

      public static void main(String[] args) throws IOException {
        MyBeamOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        PubsubIO.Read<String> pubsubReadSubscription =
            PubsubIO.readStrings()
                .withTimestampAttribute("eventTime")
                .withIdAttribute("uniqueID")
                .fromSubscription(options.getPubsubSubscription());

        pipeline
            .apply("PubsubReadSubscription", pubsubReadSubscription)
            .apply("ParseJsons", ParseJsons.of(MyMessage.class))
            .setCoder(SerializableCoder.of(MyMessage.class))
            .apply(
                "Window",
                Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
                    .accumulatingFiredPanes()
                    .withAllowedLateness(Duration.standardSeconds(3600)))
            .apply(
                "PairMessagesWithLabels",
                MapElements.into(
                        TypeDescriptors.kvs(
                            TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                    .via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
            .apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
            .apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()))
            // ...

        PipelineResult result = pipeline.run();
      }
    }
Is there an additional step that I'm missing for deduplicating messages from PubsubIO with the withIdAttribute method?
You are specifying accumulatingFiredPanes(), which means that when a window fires multiple times (e.g. if late data arrives), each successive firing includes all the elements from previous firings, not just the new ones. This by definition produces duplication. What are you trying to achieve by specifying accumulatingFiredPanes()?
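To make the difference concrete, here is a toy model of the two accumulation modes in plain Java. It does not use the Beam SDK; the class and method names (`PaneModes`, `panes`) are hypothetical, and it assumes a single window whose trigger fires once per batch of arrivals. It only illustrates which elements each firing would hand downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of pane accumulation modes; not Beam code. */
final class PaneModes {
    private PaneModes() {}

    /**
     * Returns the contents of each fired pane for one window, given the
     * batches of elements that arrive before each successive firing.
     */
    static <T> List<List<T>> panes(List<List<T>> arrivalsPerFiring, boolean accumulating) {
        List<List<T>> fired = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (List<T> arrivals : arrivalsPerFiring) {
            buffer.addAll(arrivals);
            fired.add(new ArrayList<>(buffer)); // emit a snapshot of the buffer
            if (!accumulating) {
                buffer.clear(); // discarding mode: forget what was already emitted
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // "a" and "b" arrive on time; "c" arrives late and causes a second firing.
        List<List<String>> arrivals =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(panes(arrivals, true));  // prints [[a, b], [a, b, c]]
        System.out.println(panes(arrivals, false)); // prints [[a, b], [c]]
    }
}
```

In the accumulating case, "a" and "b" are delivered again with the late firing. If each firing should carry only the new elements, Beam's `Window` transform also offers `discardingFiredPanes()` in place of `accumulatingFiredPanes()`; whether that fits depends on what the downstream aggregation expects.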