TL;DR: How can I access parameters passed to the job at job creation time in my ParDo transforms?
I have two templates, one for Dev and one for Prod, and they both work fine, except there is one value that needs to be different in each template. So far I've been hardcoding this value and then running the Java program to build the template (using the DataflowRunner). But this is error prone: if I'm not very careful, I'll update some code in the dev template and inadvertently still have this value set from the prod template. Not good.
So, I thought the Pipeline Options would be good: I just pass in a different parameter, either at template compile time or even at template run time. But I am having a bear of a time accessing the value within the ParDo transform where I need it.
It works fine if I use the default runner and run the pipeline locally, but when I switch over and build the template, the value is always null. I can reproduce this with the following code:
/*
imports...
*/
@SuppressWarnings("serial")
public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    static String orgId;

    public interface MyOptions extends PipelineOptions {
        @Description("Org Id")
        @Default.String("123-984-a")
        String getOrgId();
        void setOrgId(String orgId);
    }

    public static void main(String[] args) {
        PipelineOptionsFactory.register(MyOptions.class);
        final MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .create().as(MyOptions.class);
        orgId = options.getOrgId();
        LOG.info("orgId: " + orgId);

        Pipeline p = Pipeline.create(options);

        PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(
                "string 1", "string2", "string 3"));

        someDataRows.apply("Package into a list", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                LOG.info("Hello? ");
                LOG.info("ORG ID: " + orgId);
            }
        }));

        p.run();
    }
}
The output in the cloud is:
2018-09-20 (16:16:49) Hello?
2018-09-20 (16:16:49) ORG ID: null
2018-09-20 (16:16:51) Hello?
2018-09-20 (16:16:51) ORG ID: null
2018-09-20 (16:16:53) Hello?
2018-09-20 (16:16:53) ORG ID: null
...
But locally:
Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
INFO: orgId: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello?
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello?
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: Hello?
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
INFO: ORG ID: jomama47
These are the build parameters I'm using for the template:
--project=the-project
--stagingLocation=gs://staging.the-project.appspot.com/staging/
--tempLocation=gs://staging.the-project.appspot.com/temp/
--runner=DataflowRunner
--region=us-west1
--templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
--orgId=jomama47
And for local:
--project=the-project
--tempLocation=gs://staging.the-project.appspot.com
--orgId=jomama47
I tried passing parameters to the job when I created the job in the Dataflow console (browser), in the parameters field, as orgId with the value jomama77, but it still comes out as null.
Sorry for the long post.
There are two things here. First of all, I would recommend using ValueProvider so that you can pass the parameter in at runtime with a different orgId:
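For example, keeping the names from your interface (this is the standard Beam ValueProvider pattern for templated parameters):

public interface MyOptions extends PipelineOptions {
    @Description("Org Id")
    @Default.String("123-984-a")
    ValueProvider<String> getOrgId();
    void setOrgId(ValueProvider<String> orgId);
}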
Then read it from options with:
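Something along these lines (same option names as above):

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(MyOptions.class);
ValueProvider<String> orgId = options.getOrgId();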
For this to be accessible within the ParDo, you can pass it as a parameter to the DoFn's constructor, as in the example in the docs:
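A sketch of how that looks applied to the pipeline in the question (CustomFn here is a named replacement for the anonymous DoFn):

someDataRows.apply("Package into a list", ParDo.of(new CustomFn(options.getOrgId())));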
where CustomFn's constructor takes it as an argument and stores it in a ValueProvider so that it's accessible from within the ParDo. Notice that now you'll need to use orgId.get():
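A sketch of CustomFn, assumed to be a static nested class in StarterPipeline so it can reuse LOG:

static class CustomFn extends DoFn<String, String> {
    // Store the ValueProvider itself, not its value; it is resolved
    // at runtime on the workers rather than at template build time.
    private final ValueProvider<String> orgId;

    CustomFn(ValueProvider<String> orgId) {
        this.orgId = orgId;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("Hello? ");
        LOG.info("ORG ID: " + orgId.get());
    }
}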
Now you can stage the template and call it with:
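For example, with gcloud, using the template location from your build parameters (the job name here is just an example):

gcloud dataflow jobs run noob-pipeline-dev \
    --gcs-location=gs://staging.the-project.appspot.com/templates/NoobPipelineDev \
    --region=us-west1 \
    --parameters=orgId=jomama47

Note that --orgId is no longer needed when building the template; the @Default.String value applies if the parameter is omitted at run time.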
This should work as expected:
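Instead of null, the worker logs should now show the runtime value, along the lines of (timestamps omitted):

Hello?
ORG ID: jomama47
Hello?
ORG ID: jomama47
...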