I am writing a program in Apache Beam using the Python SDK to read the contents of a JSON file from Pub/Sub and do some processing on the received string. This is the part of the program where I pull the contents from Pub/Sub and process them:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
    lines_split = lines_decoded | beam.FlatMap(lambda x: x.split('\n'))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ## printing return value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)
While running the program, the code gets stuck after the creation of the PCollection tupled (no line of code executes after that). The strange thing is that when I change the source from Pub/Sub to a local file containing the exact same content (using ReadFromText()), the program works perfectly.
What could be the reason for this behaviour?
According to the Pub/Sub I/O documentation (both the Apache Beam docs and Dataflow Pub/Sub I/O docs), by default, PubsubIO transforms work with unbounded PCollections.
PCollections can be either bounded or unbounded:
- Bounded: the data comes from a fixed source, like a file.
- Unbounded: the data comes from a source that is continuously updating, such as a Pub/Sub subscription.
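For instance, your own code already builds one of each kind (the 'input.json' path below is just an illustrative placeholder):

# Bounded: the file is read once and the source is then exhausted.
bounded = pipeline | beam.io.ReadFromText('input.json')

# Unbounded: new messages can keep arriving on the subscription indefinitely.
unbounded = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(
    subscription=known_args.subscription)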
Before you operate over an unbounded PCollection, you must use one of the following strategies:
- Windowing: unbounded PCollections cannot be used directly in a grouping transform (such as the CombinePerKey you are using), so you should first set a non-global windowing function (a sketch of both strategies follows this list).
- Triggers: you can set up a trigger for an unbounded PCollection in such a way that it provides periodic updates on an unbounded dataset, even if the data in the subscription is still flowing.
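For example, here is a minimal sketch of both strategies applied to the lines_split PCollection from your code (the 10-second durations are arbitrary illustrative values, not recommendations):

import apache_beam as beam
from apache_beam.transforms import trigger, window

# Strategy 1 - windowing: assign elements to 10-second fixed windows, so that
# CombinePerKey can group each window's elements and emit a result per window.
result = (lines_split
          | beam.WindowInto(window.FixedWindows(10))
          | beam.Map(json_to_tuple)
          | beam.CombinePerKey(sum))

# Strategy 2 - triggering: stay in the global window, but repeatedly fire
# 10 seconds of processing time after the first element arrives, discarding
# already-emitted elements between firings.
lines_triggered = lines_split | beam.WindowInto(
    window.GlobalWindows(),
    trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
    accumulation_mode=trigger.AccumulationMode.DISCARDING)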
This may explain the behavior you are seeing, i.e. the same pipeline working when it reads from a local file (which is a bounded data source) but not working when it reads from a Pub/Sub subscription (which is an unbounded data source).
Therefore, in order to work with a Pub/Sub subscription, you should apply a windowing or triggering strategy so that the data in the PCollections can be properly processed in the following transforms.
EDIT: Also, as found out by @Arjun, it may be necessary to enable streaming in the pipeline by setting the corresponding option:
pipeline_options.view_as(StandardOptions).streaming = True
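A sketch of how that option can slot into the pipeline construction (the import path shown is for Beam 2.x, so adjust it to your SDK version):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

pipeline_options = PipelineOptions()
# Mark the pipeline as streaming so the unbounded Pub/Sub source is accepted.
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as pipeline:
    ...  # the rest of the pipeline, as in the question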