I am facing an issue while creating a custom template for Cloud Dataflow. It is a simple pipeline that reads data from an input bucket and loads it into BigQuery. We want to load many tables, so I am trying to create a custom template; once this works, the next step would be passing the dataset as a parameter as well (see the commented sketch in the options class below).
Error message:
AttributeError: 'StaticValueProvider' object has no attribute 'datasetId'
Code:
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, SetupOptions)


class ContactUploadOptions(PipelineOptions):
    """
    Runtime parameters given during template execution.
    --input and --output are required for the pipeline to run.
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Output BQ table for the pipeline')
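        # Planned next step (untested sketch): pass the dataset the same way,
        # e.g.
        # parser.add_value_provider_argument(
        #     '--dataset',
        #     type=str,
        #     help='BQ dataset to load into, resolved at template run time')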

def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    global PROJECT
    from google.cloud import bigquery

    # Initialize runtime parameters as ValueProvider objects
    contact_options = PipelineOptions().view_as(ContactUploadOptions)
    # Retrieve the project ID from GoogleCloudOptions
    PROJECT = PipelineOptions().view_as(GoogleCloudOptions).project
    client = bigquery.Client(project=PROJECT)
    dataset = client.dataset('pharma')
    data_ingestion = DataIngestion()  # parsing helper defined elsewhere in this module

    pipeline_options = PipelineOptions()
    # Save main session state so pickled functions and classes
    # defined in __main__ can be unpickled
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Instantiate pipeline
    p = beam.Pipeline(options=pipeline_options)
    (p
     | 'Read from a File' >> beam.io.ReadFromText(contact_options.input,
                                                  skip_header_lines=0)
     | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             contact_options.output,
             schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run()
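For completeness, this is the direction I was trying to go with the use_beam_bq_sink experiment: as far as I can tell, beam.io.WriteToBigQuery, unlike the older BigQuerySink, accepts a ValueProvider for its table argument, so I would expect the write step to look roughly like this (a sketch, not something I have gotten working):

# Sketch (untested): WriteToBigQuery resolving the table at template run time
(p
 | 'Read from a File' >> beam.io.ReadFromText(contact_options.input)
 | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     contact_options.output,  # e.g. 'pharma.asset_table', resolved at run time
     schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))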
My command for staging the template is as below:
python3 -m pharm_template \
    --runner DataflowRunner \
    --project jupiter-120 \
    --staging_location gs://input-cdc/temp/staging \
    --temp_location gs://input-cdc/temp/ \
    --template_location gs://code-cdc/temp/templates/jupiter_pipeline_template
What I tried:
I passed --input and --output when launching the template (an example launch command is below), and I also tried --experiment=use_beam_bq_sink, but to no avail.
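For reference, I launch the staged template roughly like this (the job name, input file, and table here are just examples):

gcloud dataflow jobs run contact-upload-test \
    --project jupiter-120 \
    --gcs-location gs://code-cdc/temp/templates/jupiter_pipeline_template \
    --parameters input=gs://input-cdc/contacts.csv,output=pharma.asset_table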
I also tried passing the dataset ID explicitly:

from apache_beam.options.value_provider import StaticValueProvider
datasetId = StaticValueProvider(str, 'pharma')

but no luck. If anyone has created a template that loads into BQ, I can take a cue from it and fix this issue.