This is in reference to Apache Beam SDK Version 2.2.0.
I'm attempting to use AfterPane.elementCountAtLeast(...) but not having any success so far. What I want looks a lot like "Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn", but needs to be adapted to 2.2.0. Ultimately I just need a simple OR where a file is written after X elements OR after Y time has passed. I intend to set the time very high so that the write is triggered by the element count in the majority of cases, and by the duration only during periods of very low message volume.
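For reference, the OR itself can be expressed with Beam's composite triggers. The following is a minimal sketch, assuming a hypothetical helper class `CountOrTimeWindow` and placeholder thresholds; it only builds the windowing transform and needs the Beam Java SDK and Joda-Time on the classpath. Note that `elementCountAtLeast` is a lower bound, so a pane may contain more elements than the threshold.

```java
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Hypothetical helper, not part of the original pipeline.
public class CountOrTimeWindow {

  // Builds a global window whose panes fire when EITHER condition is met first:
  // at least maxElements are buffered, OR maxWait of processing time has passed
  // since the first element of the current pane arrived.
  public static Window<String> countOrTime(int maxElements, Duration maxWait) {
    return Window.<String>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(maxElements),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxWait))))
        .withAllowedLateness(Duration.ZERO)
        // Each pane contains only the elements that arrived since the last firing.
        .discardingFiredPanes();
  }
}
```

It would be applied as `stringMessages.apply("Create windows", CountOrTimeWindow.countOrTime(250, Duration.standardHours(1)))`, where the one-hour wait is only an example value.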
Using "GCP Dataflow 2.0 PubSub to GCS" as a reference, here's what I've tried:
    String bucketPath =
        String.format("gs://%s/%s",
            options.getBucketName(),
            options.getDestinationDirName());

    PCollection<String> windowedValues = stringMessages
        .apply("Create windows",
            Window.<String>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
                .discardingFiredPanes());

    windowedValues
        .apply("Write to GCS",
            TextIO
                .write()
                .to(bucketPath)
                .withNumShards(options.getNumShards())
                .withWindowedWrites());
Where stringMessages is a PCollection that is read from an Avro-encoded Pub/Sub subscription. There is some unpacking happening upstream to get the events converted to strings, but no merging/partitioning/grouping, just transforms.
The element count is hard-coded at 250 just for the PoC. Once it is proven, it will likely be raised to the tens or hundreds of thousands.
The Problem
This implementation has resulted in text files of various lengths. The file lengths start very high (thousands of elements) when the job first starts up (presumably while processing backlogged data), and then stabilize at some point. I've tried setting 'numShards' to 1 and to 10. At 1, the element count of the written files stabilizes at 600, and at 10, it stabilizes at 300.
What am I missing here?
As a side note, this is only step 1. Once I figure out writing based on element count, I still need to figure out how to write these files as compressed JSON (.json.gz) as opposed to plain-text files.
Posting what I learned for reference by others.
What was not clear to me when I wrote this is the following from the Apache Beam Documentation:
With this knowledge, I rethought my pipeline a bit. From the FileIO documentation under Writing files -> How many shards are generated per pane:
So I decided to use FileIO's writeDynamic to perform the writes and specify withNumShards in order to get the implicit GroupByKey. The final result looks like this:
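A minimal sketch of such a write follows. It is an illustration under stated assumptions rather than the exact pipeline from this answer: the class name `GzipJsonWrite`, the constant destination key "events", and the ".json" suffix are all hypothetical, and it assumes a Beam release that ships `FileIO.writeDynamic` (to my knowledge, later than 2.2.0).

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;

// Hypothetical helper, shown only to illustrate the shape of the write.
public class GzipJsonWrite {

  public static FileIO.Write<String, String> build(String outputDir, int numShards) {
    return FileIO.<String, String>writeDynamic()
        // Route every element to a single, constant destination (an assumption;
        // a real pipeline might derive the destination from the element).
        .by((String json) -> "events")
        .withDestinationCoder(StringUtf8Coder.of())
        // Write one string per line.
        .via(TextIO.sink())
        .to(outputDir)
        // Default naming is expected to append the compression's suggested
        // suffix (.gz) after ".json"; verify the file names on your Beam version.
        .withNaming((String dest) -> FileIO.Write.defaultNaming(dest, ".json"))
        .withCompression(Compression.GZIP)
        // A fixed shard count is what introduces the grouping step mentioned above.
        .withNumShards(numShards);
  }
}
```

With the windowed collection from the question, this would be applied as `windowedValues.apply("Write to GCS", GzipJsonWrite.build(bucketPath, options.getNumShards()))`.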