I am writing a program in Apache Beam using Python SDK to read from Pub/Sub the contents of a JSON file, and do some processing on the received string. This is the part in the program where I pull contents from Pub/Sub and do the processing:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
    lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
    lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
    lines_split = lines_decoded | beam.FlatMap(lambda x: x.split('\n'))

    def json_to_tuple(jsonStr):
        res = json.loads(jsonStr)
        ## printing return value
        print (res['id'], res['messageSize'])
        ##
        return (res['id'], res['messageSize'])

    tupled = lines_split | beam.Map(json_to_tuple)

    def printlines(line):
        print line

    result = tupled | beam.CombinePerKey(sum)
    result | beam.Map(printlines)
While running the program, the code gets stuck after the creation of the PCollection tupled (no line of code executes after that point). The strange thing is that when I change the source from Pub/Sub to a local file containing the exact same content (using ReadFromText()), the program works perfectly.
What could be the reason for this behaviour?
According to the Pub/Sub I/O documentation (both the Apache Beam docs and the Dataflow Pub/Sub I/O docs), PubsubIO transforms work with unbounded PCollections by default.
PCollections can be either bounded or unbounded, and a Pub/Sub subscription is an unbounded source. Before you operate over an unbounded PCollection, you must use one of the following strategies: windowing or triggers. It is an error to apply GroupByKey or CombinePerKey (which you are using) to an unbounded PCollection that uses global windowing with the default trigger, so you should first set a non-global windowing function. This may explain the behavior you are seeing, i.e. the same pipeline working when it reads from a local file (which is a bounded data source) but not when it reads from a Pub/Sub subscription (which is an unbounded data source).
Therefore, in order to work with a Pub/Sub subscription, you should apply a windowing or triggering strategy so that the data in the PCollections can be properly processed by the subsequent transforms.
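For example, here is a minimal sketch of applying a fixed windowing strategy just before the CombinePerKey step, reusing the names from your pipeline above (the 10-second window size is an arbitrary choice for illustration):

import apache_beam as beam
from apache_beam.transforms import window

# Assign each element to a fixed 10-second window, so that
# CombinePerKey can emit a result per window instead of waiting
# indefinitely on the never-ending global window.
windowed = lines_split | beam.WindowInto(window.FixedWindows(10))
tupled = windowed | beam.Map(json_to_tuple)
result = tupled | beam.CombinePerKey(sum)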
EDIT: Also, as found out by @Arjun, it may be required to enable streaming in the pipeline by setting the streaming pipeline option, as shown below.
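A minimal sketch of setting that flag (the import path shown is the one used by recent Beam releases; older SDK versions exposed the same classes under apache_beam.utils.pipeline_options):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

pipeline_options = PipelineOptions()
# Mark the pipeline as streaming so the runner treats the Pub/Sub
# source as an unbounded, continuously updating input.
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as pipeline:
    pass  # build the rest of the pipeline as before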