Attribute error while creating custom template using Cloud Dataflow

Posted 2020-05-07 07:45

Question:

I am facing an issue while creating a custom template for Cloud Dataflow. It is simple code that reads data from an input bucket and loads it into BigQuery. We want to load many tables, so I am trying to create a custom template. Once this works, the next step would be to pass the dataset as a parameter as well.

Error message:

AttributeError: 'StaticValueProvider' object has no attribute 'datasetId'

Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
)


class ContactUploadOptions(PipelineOptions):
    """
    Runtime parameters given during template execution.
    The path and organization parameters are necessary for executing the
    pipeline; campaign is optional for committing to BigQuery.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='Path of the file to read from')
        parser.add_value_provider_argument(
            '--output',
            type=str,
            help='Output BQ table for the pipeline')


def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    global PROJECT
    from google.cloud import bigquery

    # Retrieve the project ID from GoogleCloudOptions and store it in PROJECT.
    PROJECT = PipelineOptions().view_as(GoogleCloudOptions).project
    client = bigquery.Client(project=PROJECT)
    dataset = client.dataset('pharma')

    # Initialize runtime parameters as an options object.
    contact_options = PipelineOptions().view_as(ContactUploadOptions)

    # DataIngestion is a helper class defined elsewhere in this module;
    # parse_method turns one input line into a BigQuery row.
    data_ingestion = DataIngestion()

    pipeline_options = PipelineOptions()
    # Save main session state so pickled functions and classes
    # defined in __main__ can be unpickled.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Instantiate the pipeline.
    p = beam.Pipeline(options=pipeline_options)
    (p
     | 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
                beam.io.BigQuerySink(
                    contact_options.output,
                    schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
     )
    p.run()

The command I use to stage the template is as follows:

python3 -m pharm_template --runner DataflowRunner  --project jupiter-120  --staging_location gs://input-cdc/temp/staging  --temp_location gs://input-cdc/temp/   --template_location gs://code-cdc/temp/templates/jupiter_pipeline_template
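
For reference, once the template is staged, I launch it roughly as below; the job name and parameter values here are placeholders, not from an actual run:

gcloud dataflow jobs run contact-upload-job \
    --gcs-location gs://code-cdc/temp/templates/jupiter_pipeline_template \
    --parameters input=gs://input-cdc/contacts.csv,output=pharma.contacts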

What I tried:

I tried passing --input and --output when launching the template. I also tried --experiment=use_beam_bq_sink, but to no avail. I also tried passing the dataset ID explicitly:

datasetId = StaticValueProvider(str, 'pharma')

but that did not work either. If anyone has created a template that loads into BQ, I could take a cue from it and fix this issue.
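
My current suspicion, which I have not yet verified, is that beam.io.BigQuerySink parses its table argument into a table reference at pipeline-construction time, before the StaticValueProvider holds a value, whereas the newer beam.io.WriteToBigQuery transform accepts ValueProvider table specs and defers resolution until runtime. Below is a minimal sketch of the direction I plan to try, reusing contact_options and data_ingestion from the code above:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def build_pipeline(contact_options, data_ingestion):
    """Same pipeline as above, but writing with WriteToBigQuery,
    which can accept a ValueProvider for the output table."""
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'Read from a File' >> beam.io.ReadFromText(contact_options.input, skip_header_lines=0)
     | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            contact_options.output,  # ValueProvider, resolved at runtime
            schema='assetid:INTEGER,assetname:STRING,prodcd:INTEGER',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    return p

If this works, the dataset could then also become a runtime parameter, which was the next step anyway.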