As I have a working static Dataflow running, I'd like to create a template from this one to let me easily reuse the Dataflow without any command line typing.
The Creating Templates tutorial in the official documentation doesn't provide a sample for a templatable output.
My Dataflow ends with a BigQuery sink, which takes a few arguments such as the target table for storage. This is exactly the parameter I'd like to expose in my template, so that I can choose the target table when launching the flow.
However, I can't get this working. Below are some code snippets that should help explain the exact issue.
    class CustomOptions(PipelineOptions):

        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_value_provider_argument(
                '--input',
                default='gs://my-source-bucket/file.json')
            parser.add_value_provider_argument(
                '--table',
                default='my-project-id:some-dataset.some-table')

    pipeline_options = PipelineOptions()

    pipe = beam.Pipeline(options=pipeline_options)

    custom_options = pipeline_options.view_as(CustomOptions)

    (...)

    # store
    processed_pipe | beam.io.Write(BigQuerySink(
        table=custom_options.table.get(),
        schema='a_column:STRING,b_column:STRING,etc_column:STRING',
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND
    ))
When creating the template, I did not give any parameters with it. In a split second I get the following error message:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context
When I add a --table parameter at template creation, the template is created, but the --table value is then hardcoded in the template and is not overridden by any table value supplied later.
I get the same error when I replace table=custom_options.table.get(), with table=StaticValueProvider(str, custom_options.table.get()).
Is there someone who already built a templatable Dataflow with customisable BigQuerySink parameters? I'd love to get some hints on this.
Python currently only supports ValueProvider options for FileBasedSource IOs. You can see that by clicking on the Python tab, under the "Pipeline I/O and runtime parameters" section, at the link you mentioned: https://cloud.google.com/dataflow/docs/templates/creating-templates
Unlike in Java, BigQuery in Python does not use a custom source. In other words, it is not implemented entirely in the SDK: parts of it live in the backend, which makes it a "native source". Only custom sources can use templates. There are plans to have BigQuery added as a custom source: issues.apache.org/jira/browse/BEAM-1440
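To illustrate why both attempts in the question fail while the template is being created, here is a minimal toy model of deferred options in plain Python. It is a sketch, not Beam's implementation: the classes are re-implemented here and only borrow the names from the question's error message, the helpers `build_sink_eagerly` and `build_sink_deferred` are hypothetical, and the table name `other-table` is made up for the example.

```python
# Toy model of deferred pipeline options; NOT the Apache Beam implementation.


class RuntimeValueProviderError(RuntimeError):
    """Raised when a deferred value is read before the job is running."""


class StaticValueProvider:
    """Holds a value that is already known while the pipeline is built."""

    def __init__(self, value_type, value):
        self.value = value_type(value)

    def get(self):
        return self.value


class RuntimeValueProvider:
    """Holds a value that only becomes available once the job starts."""

    # Filled in by the (simulated) runner when a templated job is launched.
    runtime_options = None

    def __init__(self, option_name, default_value=None):
        self.option_name = option_name
        self.default_value = default_value

    def get(self):
        if RuntimeValueProvider.runtime_options is None:
            raise RuntimeValueProviderError(
                '%s.get() not called from a runtime context' % self.option_name)
        return RuntimeValueProvider.runtime_options.get(
            self.option_name, self.default_value)


def build_sink_eagerly(table):
    """Hypothetical helper mimicking the question: reads the value right away."""
    return {'table': table.get()}


def build_sink_deferred(table):
    """Hypothetical template-aware sink: stores the provider, resolves it later."""
    return {'table': table}


table_option = RuntimeValueProvider(
    'table', 'my-project-id:some-dataset.some-table')

# Template creation: there is no runtime context yet, so the eager read fails.
try:
    build_sink_eagerly(table_option)
    eager_error = None
except RuntimeValueProviderError as exc:
    eager_error = str(exc)

# Wrapping in StaticValueProvider does not help: .get() is still evaluated
# first, before the wrapper is even constructed.
try:
    StaticValueProvider(str, table_option.get())
    wrapped_error = None
except RuntimeValueProviderError as exc:
    wrapped_error = str(exc)

# A deferred sink keeps the provider and reads it only after launch.
deferred_sink = build_sink_deferred(table_option)
RuntimeValueProvider.runtime_options = {
    'table': 'my-project-id:some-dataset.other-table'}  # made-up launch value
resolved_table = deferred_sink['table'].get()

print(eager_error)
print(wrapped_error)
print(resolved_table)
```

The point of the sketch is that a sink can only be templated if it accepts the provider object itself and calls .get() while the job runs. A sink that needs a plain string at construction time, as the native BigQuery sink does here, leaves no place to defer that call.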