Cannot generate templated dataflow in Python

Posted 2019-07-29 01:05

I'm trying to convert the Cloud Dataflow "Wordcount" Python example to a templated version by modifying the pipeline options to use runtime parameters, as instructed in the docs:

def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""

  class WordcountTemplatedOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)
  wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

  # Read the text file[pattern] into a PCollection.
  etc. etc.

The problem is with creating and staging the template. When I execute the command, the output is:

INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.288088083267 seconds
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Starting finalize_write threads with num_shards: 1, batches: 1, num_threads: 1
INFO:root:Renamed 1 shards in 0.13 seconds.
INFO:root:number of empty lines: 1663
INFO:root:average word length: 4

and no file is produced under template_location (gs://[YOUR_BUCKET_NAME]/templates/mytemplate).

I thought the command was trying to execute the dataflow from the desktop with the "default" input file, so I removed the "default" line in the --input argument, but I got this error:

raise BeamIOError('Unable to get the Filesystem', {path: e})
apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with exceptions {None: AttributeError("'NoneType' object has no attribute 'strip'",)}

There is no official templated Dataflow sample for Python (the only snippet I was able to find was this one, which looks pretty much like what's above).

Am I missing something?

Thanks!

1 answer
骚年 ilove
#2 · 2019-07-29 01:46

Thanks to Google Cloud Support, I was able to fix the issue. In summary:

  1. Clone the latest wordcount.py example (I had been using an older version):

    git clone https://github.com/apache/beam.git

  2. The Google team updated the tutorial, so simply follow its code instructions. Make sure to include the _add_argparse_args @classmethod so the pipeline can receive arguments at runtime, and use the new options when reading from the text file:

    wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
    lines = p | 'read' >> ReadFromText(wordcount_options.input)

  3. Generate the template as instructed
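
For step 3, the template is generated by running the pipeline module with the Dataflow runner and a --template_location flag. Here is a rough sketch of how that command could be assembled. The module name, project ID, bucket name, and the helper function itself are placeholders I made up for illustration; only the flag names come from the Dataflow template instructions. Newer SDK versions may need extra flags (such as a region), so treat this as a starting point rather than the exact command.

```python
import shlex


def template_staging_command(module, project, bucket, template_name,
                             extra_args=None):
    """Build the argument list that stages a template for `module`.

    Hypothetical helper: it only assembles the flags, it does not run anything.
    """
    base = "gs://{}".format(bucket)
    cmd = [
        "python", "-m", module,
        "--runner", "DataflowRunner",
        "--project", project,
        "--staging_location", base + "/staging",
        "--temp_location", base + "/temp",
        # This flag asks for a template to be written to the given path.
        "--template_location", base + "/templates/" + template_name,
    ]
    # Non-templatable arguments (e.g. --output) are fixed at this point.
    cmd.extend(extra_args or [])
    return cmd


if __name__ == "__main__":
    # Placeholder values; substitute your own project and bucket.
    cmd = template_staging_command(
        "wordcount", "my-project", "my-bucket", "mytemplate",
        extra_args=["--output", "gs://my-bucket/output/wordcount"])
    print(" ".join(shlex.quote(part) for part in cmd))
```

As far as I can tell, these flags only take effect if the script passes its command-line arguments through to PipelineOptions. A hard-coded argument list like PipelineOptions(['--output', 'some/output_path']) would ignore them and fall back to a local run.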

You should now see the template under the template_location directory.

Thanks!
