I changed source of WindowsWordCount example program from text file to cloud Pub/Sub as shown below. I published shakespeare file's data to Pub/Sub which did get fetched properly but none of the transformations after .groupByKey
seem to work.
sc.pubsubSubscription[String](psSubscription)
.withFixedWindows(windowSize) // apply windowing logic
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
.withWindow[IntervalWindow]
.swap
.groupByKey
.map {
s =>
println("\n\n\n\n\n\n\n This never prints \n\n\n\n\n")
println(s)
}
Changing the input from a text file to PubSub the PCollection "unbounded". Grouping that by key requires to define aggregation triggers, otherwise the grouper will wait forever. It's mentioned in the dataflow documentation here:
https://cloud.google.com/dataflow/model/group-by-key
Note: Either non-global Windowing or an aggregation trigger is required in order to perform a GroupByKey on an unbounded PCollection. This is because a bounded GroupByKey must wait for all the data with a certain key to be collected; but with an unbounded collection, the data is unlimited. Windowing and/or Triggers allow grouping to operate on logical, finite bundles of data within the unbounded data stream.
If you apply GroupByKey to an unbounded PCollection without setting either a non-global windowing strategy, a trigger strategy, or both, Dataflow will generate an IllegalStateException error when your pipeline is constructed.
Unfortunately, in the Python SDK of Apache Beam seems not to support triggers (yet), so I'm not sure what the solution would be in python.
(see https://beam.apache.org/documentation/programming-guide/#triggers)
With regards to Franz's comment above (I would reply to his comment specifically if StackOverflow would let me!,) I see that the docs say that triggering is not implemented... but they also say that Realtime Database functions are not available, while our current project is actively using them. They're just new.
See trigger functions here: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py
Beware, the API is unfinished as this is not "release-ready" code. But it is available.