I'm trying to achieve exactly-once delivery with Google Cloud Dataflow and PubSub, using Apache Beam SDK 2.6.0.
The use case is quite simple:
The 'Generator' Dataflow job sends 1M messages to a PubSub topic:
GenerateSequence
    .from(0)
    .to(1000000)
    .withRate(100000, Duration.standardSeconds(1L));
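The write side of the 'Generator' job isn't shown above; it attaches ATTRIBUTE_ID (and TIMESTAMP_ATTRIBUTE) to every message and passes the same attribute name to PubsubIO.Write. A minimal sketch of that step, where the mapping DoFn and the topic name are illustrative assumptions rather than the actual code:

    .apply("To PubSub message", ParDo.of(new DoFn<Long, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Map<String, String> attributes = new HashMap<>();
            // deterministic, unique id derived from the sequence number,
            // so PubsubIO can deduplicate redelivered messages on the read side
            attributes.put(ATTRIBUTE_ID, String.valueOf(c.element()));
            attributes.put(TIMESTAMP_ATTRIBUTE, String.valueOf(Instant.now().getMillis()));
            c.output(new PubsubMessage(
                c.element().toString().getBytes(StandardCharsets.UTF_8), attributes));
        }
    }))
    .apply("Write to PubSub",
        PubsubIO.writeMessages()
            .withIdAttribute(ATTRIBUTE_ID)
            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
            .to("projects/my-project/topics/my-topic"));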
The 'Archive' Dataflow job reads messages from a PubSub subscription and saves them to Google Cloud Storage:
pipeline
    .apply("Read events",
        PubsubIO.readMessagesWithAttributes()
            // this is to achieve exactly-once delivery
            .withIdAttribute(ATTRIBUTE_ID)
            .fromSubscription("subscription")
            .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
    // (conversion of PubsubMessage to Dto omitted here for brevity)
    .apply("Window events",
        Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardMinutes(15))
            .discardingFiredPanes())
    .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
    .apply("Write files to archive",
        FileIO.<String, Dto>writeDynamic()
            .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
            .to(archiveDir)
            .withTempDirectory(archiveDir)
            .withNumShards(options.getNumShards())
            .withNaming(dataSource ->
                new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
            ));
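The CountMessagesMetric class isn't shown above; it's just a pass-through DoFn that increments a Beam counter. A minimal sketch of what it might look like (the class body and counter name here are assumptions):

    static class CountMessagesMetric extends DoFn<Dto, Dto> {
        private final Counter countedMessages =
            Metrics.counter(CountMessagesMetric.class, "countedMessages");

        @ProcessElement
        public void processElement(ProcessContext context) {
            // count every element, then pass it through unchanged
            countedMessages.inc();
            context.output(context.element());
        }
    }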
I added 'withIdAttribute' to both PubsubIO.Write (the 'Generator' job) and PubsubIO.Read (the 'Archive' job) and expected that this would guarantee exactly-once semantics.
I would like to test the 'negative' scenario:
- The 'Generator' Dataflow job sends 1M messages to the PubSub topic.
- The 'Archive' Dataflow job starts working, but I stop it in the middle of processing by clicking 'Stop job' -> 'Drain'. Some portion of the messages has been processed and saved to Cloud Storage, let's say 400K messages.
- I start the 'Archive' job again and expect it to pick up the unprocessed messages (600K), so that eventually I see exactly 1M messages saved to Storage.
What I actually got: all messages are delivered (so at-least-once is achieved), but on top of that there are a lot of duplicates, somewhere in the neighborhood of 30K-50K per 1M messages.
Is there any solution to achieve exactly-once delivery?