How to access pipeline options within ParDo transforms

Posted 2019-09-02 15:28

Question:

TL;DR: How can I access parameters passed to the job at job creation time in my ParDo transforms?

I have two templates, one for Dev and one for Prod, and they both work fine, except that one value needs to be different in each template. So far I've been "hardcoding" this value and then running the Java program to build the template (using the DataflowRunner). But this is error-prone: if I'm not really, really careful, I will update some code in the dev template and inadvertently still have this value set from the prod template. Not good.

So I thought pipeline options would be a good fit: I just pass in a different parameter, either at template compile time or even at template run time. But I am having a bear of a time accessing the value within the ParDo transform where I need it. It works fine if I use the default runner and run the pipeline locally, but when I switch over and build the template, the value is always null. I can reproduce this with the following code:

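package simplepipeline;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;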

@SuppressWarnings("serial")
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static String orgId;

  public interface MyOptions extends PipelineOptions {


     @Description("Org Id")
     @Default.String("123-984-a")
     String getOrgId();
     void setOrgId( String orgID );

  }

  public static void main(String[] args) {


     PipelineOptionsFactory.register(MyOptions.class);


     final MyOptions options = PipelineOptionsFactory.fromArgs( args ).withValidation().create()
        .as( MyOptions.class );


     orgId = options.getOrgId();

     LOG.info( "orgId: " + orgId );

     Pipeline p = Pipeline.create( options );


     PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(

      "string 1", "string2", "string 3"

     ) );


     someDataRows.apply( "Package into a list", ParDo.of( new DoFn<String, String>() {

        @ProcessElement
        public void processElement( ProcessContext c ) {

           LOG.info( "Hello? " );
           LOG.info( "ORG ID: " + orgId );
        }

     }));


     p.run();
  }
}

The output in the cloud is:

 2018-09-20 (16:16:49) Hello?
 2018-09-20 (16:16:49) ORG ID: null
 2018-09-20 (16:16:51) Hello?
 2018-09-20 (16:16:51) ORG ID: null
 2018-09-20 (16:16:53) Hello?
 2018-09-20 (16:16:53) ORG ID: null
 ...

But locally:

Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
INFO: orgId: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello? 
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47

These are the build parameters I'm using for the template:

--project=the-project
--stagingLocation=gs://staging.the-project.appspot.com/staging/
--tempLocation=gs://staging.the-project.appspot.com/temp/
--runner=DataflowRunner
--region=us-west1
--templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
--orgId=jomama47

And for local:

--project=the-project
--tempLocation=gs://staging.the-project.appspot.com
--orgId=jomama47

I also tried passing parameters when creating the job in the Dataflow console (browser), entering orgId and jomama77 in the parameters fields, but it still comes out as null.

Sorry for the long post.

Answer 1:

There are two things here. First of all, I would recommend using ValueProvider so that you can pass a different orgId at run time:

public interface MyOptions extends PipelineOptions {    
     @Description("Org Id")
     @Default.String("123-984-a")
     ValueProvider<String> getOrgId();
     void setOrgId(ValueProvider<String> orgID);   
}

Then read it from options with:

ValueProvider<String> orgId = options.getOrgId();
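Note that options.getOrgId() now hands back a provider, not the string itself. When building a template, the actual value only exists once the job is launched, which is why get() belongs inside processElement rather than in main(). The plain-Java sketch below models that deferred lookup with no Beam dependency; LaunchContext and RuntimeValue are hypothetical stand-ins, not Beam's actual ValueProvider implementation, and in this model calling get() too early simply throws.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DeferredValueDemo {

    // Hypothetical stand-in for the job-launch context, i.e. what
    // `gcloud dataflow jobs run ... --parameters orgId=...` supplies.
    static final class LaunchContext {
        private final Map<String, String> params = new HashMap<>();
        private boolean launched = false;

        void launch(Map<String, String> values) {
            params.putAll(values);
            launched = true;
        }
    }

    // Hypothetical, simplified model of a runtime ValueProvider<String>:
    // it stores the option NAME and resolves the value only when get() is called.
    static final class RuntimeValue {
        private final LaunchContext ctx;
        private final String name;
        private final String defaultValue;

        RuntimeValue(LaunchContext ctx, String name, String defaultValue) {
            this.ctx = ctx;
            this.name = name;
            this.defaultValue = defaultValue;
        }

        // Same idea as ValueProvider.isAccessible(): true once the job is running.
        boolean isAccessible() {
            return ctx.launched;
        }

        String get() {
            if (!isAccessible()) {
                throw new IllegalStateException(name + " is only available at run time");
            }
            // In this model, a missing parameter falls back to the default value.
            return ctx.params.getOrDefault(name, defaultValue);
        }
    }

    public static String run() {
        LaunchContext ctx = new LaunchContext();
        RuntimeValue orgId = new RuntimeValue(ctx, "orgId", "123-984-a");

        // Template build time: the pipeline graph exists, the value does not.
        boolean failedAtBuildTime;
        try {
            orgId.get();
            failedAtBuildTime = false;
        } catch (IllegalStateException e) {
            failedAtBuildTime = true;
        }

        // Job launch: the parameter arrives with the run request.
        ctx.launch(Collections.singletonMap("orgId", "jomama47"));

        // Inside processElement: get() now returns the launch-time value.
        return failedAtBuildTime + " " + orgId.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints true jomama47: the early get() fails, and the same provider object resolves the value once the job has been launched.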

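Second, for this to be accessible within the ParDo, pass it to the DoFn's constructor, as in the example in the Beam documentation, instead of reading a static field. Static fields are not serialized along with the DoFn, and the Dataflow workers run in their own JVMs where main() never executed, so the static orgId is still null there. Locally, everything runs in the same JVM that already set it, which is why your code worked with the default runner: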

someDataRows.apply( "Package into a list", ParDo.of( new CustomFn(orgId)));

where CustomFn's constructor takes it as an argument and stores it in a ValueProvider field so that it's accessible from within the ParDo. Notice that you now need to call orgId.get():

static class CustomFn extends DoFn<String, String> {
    // access options from within the ParDo
    ValueProvider<String> orgId;
    public CustomFn(ValueProvider<String> orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement( ProcessContext c ) {
      LOG.info( "Hello? " );
      LOG.info( "ORG ID: " + orgId.get() );
    }
}
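Why the static field came back null while the constructor argument works can be shown without Beam at all. The sketch below is plain Java under one stated assumption: a Dataflow worker behaves like a fresh JVM that deserializes the DoFn but never runs main(), which is simulated here by clearing the static field before deserializing. FakeDoFn and StaticVsInstanceDemo are hypothetical names, not Beam classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticVsInstanceDemo {

    // Plays the role of the question's `static String orgId`: only main() in the
    // launching JVM ever assigns it.
    static String staticOrgId;

    // Hypothetical stand-in for a DoFn. Beam DoFns are Serializable, and the runner
    // ships them to workers in serialized form.
    static class FakeDoFn implements Serializable {
        private static final long serialVersionUID = 1L;

        // Instance field set through the constructor, like CustomFn's orgId.
        private final String orgId;

        FakeDoFn(String orgId) {
            this.orgId = orgId;
        }

        // What processElement would see on the worker.
        String describe() {
            return "instance=" + orgId + ", static=" + staticOrgId;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static String run() {
        try {
            // Launcher side: main() reads the option and sets both fields.
            staticOrgId = "jomama47";
            byte[] shipped = serialize(new FakeDoFn("jomama47"));

            // Worker side: a fresh JVM never ran main(), so its static field is
            // still null. Simulated here by clearing it before deserializing.
            staticOrgId = null;
            FakeDoFn onWorker = (FakeDoFn) deserialize(shipped);
            return onWorker.describe();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main() prints instance=jomama47, static=null: the instance field travels inside the serialized bytes, while the static field is never part of them.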

Now you can stage the template and call it with:

gcloud dataflow jobs run $JOB_NAME \
    --gcs-location gs://$BUCKET/templates/$TEMPLATE_NAME \
    --parameters orgId=jomama47

This should work as expected.