I'm trying to convert the Cloud Dataflow "Wordcount" Python example to a templated version by modifying the pipeline options to use runtime parameters, as instructed in the docs:
def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    class WordcountTemplatedOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            # Use add_value_provider_argument for arguments to be templatable
            # Use add_argument as usual for non-templatable arguments
            parser.add_value_provider_argument(
                '--input',
                default='gs://dataflow-samples/shakespeare/kinglear.txt',
                help='Path of the file to read from')
            parser.add_argument(
                '--output',
                required=True,
                help='Output file to write results to.')
    pipeline_options = PipelineOptions(['--output', 'some/output_path'])
    p = beam.Pipeline(options=pipeline_options)
    wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
    # Read the text file[pattern] into a PCollection.
etc. etc.
The problem is creating and staging the template ... when executing the command, the output is:
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.288088083267 seconds
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Starting finalize_write threads with num_shards: 1, batches: 1, num_threads: 1
INFO:root:Renamed 1 shards in 0.13 seconds.
INFO:root:number of empty lines: 1663
INFO:root:average word length: 4
and no file is produced under template_location (gs://[YOUR_BUCKET_NAME]/templates/mytemplate) ...
I suspected the command was running the pipeline locally from my desktop with the "default" input file, so I removed the "default" line from the --input argument, but then I got this error:
raise BeamIOError('Unable to get the Filesystem', {path: e})
apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with exceptions {None: AttributeError("'NoneType' object has no attribute 'strip'",)}
There is no official Python Dataflow templated sample (the only snippet I was able to find was this one, which looks pretty much like what's above).
Am I missing something?
Thanks!
Thanks to Google Cloud Support, I was able to fix the issue. In summary:
Clone the latest wordcount.py example (I had been using an older version):
git clone https://github.com/apache/beam.git
The Google team updated the tutorial, so simply follow the code instructions. Make sure to include the @classmethod _add_argparse_args so the pipeline can receive arguments at runtime, and use the new options when reading from the text file:
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
lines = p | 'read' >> ReadFromText(wordcount_options.input)
Generate the template as instructed.
You should now see the template under the template_location directory.
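For reference, here is a rough sketch of the flags involved in generating a template, written as a Python list that could be passed to a run(argv) function which forwards argv to PipelineOptions. The helper template_flags and its parameters are hypothetical, and the staging and temp paths are placeholder layouts of my own; only the template path mirrors the gs://[YOUR_BUCKET_NAME]/templates/mytemplate location from the question. Check the current Dataflow docs for the exact command.

```python
def template_flags(project, bucket, template_name='mytemplate'):
    """Hypothetical helper: build the flags used to stage a template.

    `project` and `bucket` are placeholders for your own project ID and
    Cloud Storage bucket name.
    """
    return [
        '--runner', 'DataflowRunner',
        '--project', project,
        # Assumed bucket layout for staging and temp files.
        '--staging_location', 'gs://%s/staging' % bucket,
        '--temp_location', 'gs://%s/temp' % bucket,
        # Where the generated template file is written.
        '--template_location',
        'gs://%s/templates/%s' % (bucket, template_name),
    ]


flags = template_flags('my-project', 'my-bucket')
print(' '.join(flags))
```

Because --output is declared as a required, non-templatable argument in the options class, it would still need to be appended to these flags when the template is created.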
Thanks!