How can I pass a flowfile attribute to a controller

Posted 2019-05-23 01:40

Question:

Hello Internet Hive Mind!

I need to query AWS Athena with NiFi; however, I need to change the staging directory (the S3 bucket and folder where the results are saved) for each query sent.

But the s3_staging_dir property has to be set on the DBCPConnectionPool controller service. How can I change the value of that property for each flow file? Apparently Expression Language alone can't fetch it from the flow file.

Thanks!

Answer 1:

I'm not sure of the nature of your flow, in which each query depends on a different staging directory, but there are a couple of things to keep in mind.

  1. The DBCPConnectionPool controller service does allow dynamic properties that evaluate Expression Language, but that evaluation is performed when the controller service is enabled, so it happens only "once" per enable/disable cycle.
  2. The dynamic properties on the controller service do not evaluate flowfile attributes.

From Apache NiFi DBCPConnectionPool documentation:

Dynamic Properties:

Dynamic Properties allow the user to specify both the name and value of a property.

...

Specifies a property name and value to be set on the JDBC connection(s). If Expression Language is used, evaluation will be performed upon the controller service being enabled. Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties. Supports Expression Language: true
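To make the two points above concrete, here is a toy Python model of the difference between a controller service property, resolved once when the service is enabled with no flowfile in scope, and a processor property, resolved again for every flowfile. This is not NiFi's API: the classes `ToyControllerService` and `ToyProcessor`, the attribute name `s3.staging.dir`, and the simplified `${name}` substitution are all hypothetical stand-ins for illustration.

```python
import re

# Matches simplified Expression Language placeholders such as ${s3.staging.dir}
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def resolve(template, context):
    """Replace each ${name} with context[name], or an empty string if absent (toy behavior)."""
    return PLACEHOLDER.sub(lambda m: context.get(m.group(1), ""), template)


class ToyControllerService:
    """Hypothetical stand-in for a controller service: properties resolve at enable time."""

    def __init__(self, properties):
        self.properties = properties
        self.resolved = {}

    def enable(self, variables):
        # Only service-level variables are in scope here; no flowfile exists yet.
        self.resolved = {k: resolve(v, variables) for k, v in self.properties.items()}

    def get_connection_settings(self):
        # Every caller sees the same values until the service is re-enabled.
        return dict(self.resolved)


class ToyProcessor:
    """Hypothetical stand-in for a processor property re-evaluated per flowfile."""

    def __init__(self, template):
        self.template = template

    def on_trigger(self, flowfile_attributes):
        return resolve(self.template, flowfile_attributes)


if __name__ == "__main__":
    pool = ToyControllerService({"s3_staging_dir": "${s3.staging.dir}"})
    pool.enable(variables={})  # no flowfile attributes are available at this point
    proc = ToyProcessor("${s3.staging.dir}")
    for attrs in ({"s3.staging.dir": "s3://bucket/a/"}, {"s3.staging.dir": "s3://bucket/b/"}):
        print(repr(pool.get_connection_settings()["s3_staging_dir"]), proc.on_trigger(attrs))
```

In this model the pool's value stays empty (or whatever it was at enable time) for every flowfile, while the processor-side value changes per flowfile; re-enabling the service with a different variable would change the value for all subsequent queries at once, not for one query.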

Because your S3 staging directory must be different on every request, I think you would need to pursue one of the following options:

  1. File a Jira requesting native Athena support in NiFi (thoroughly explain why the existing DBCPConnectionPool doesn't support your use case)
  2. Extend the DBCPConnectionPool controller service with your own AthenaConnectionPool controller service. There are many tutorials on building your own NiFi components, but the NiFi Developer Guide > Developing Controller Services is the best place to start. You can write a controller service that does evaluate incoming flowfile attributes when executing Expression Language, but you will need to trigger that evaluation manually, because controller services have no onTrigger phase in their lifecycle. If you also write a custom processor, its onTrigger() method can invoke a "re-evaluate" method on the controller service, but existing processors will not call it. Alternatively, you could theoretically put a high-frequency refresher in the controller service itself using executors, but this will definitely hurt performance.
  3. Create a separate DBCPConnectionPool instance and SQL processor for each staging directory (feasible for roughly 1 to 3 directories; beyond that, abysmal).
  4. Use the ExecuteStreamCommand processor to run the queries with the AWS CLI (awscli). This gives up NiFi's native SQL tools but allows a custom query on every invocation, because ExecuteStreamCommand can evaluate flowfile-specific attributes and use them in the query.
  5. Re-evaluate your flow design and see whether the queries can be performed without allowing an arbitrary S3 staging directory for each individual query execution.
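Option 4 could look roughly like the sketch below: a small Python script that ExecuteStreamCommand runs, which receives the output location as a command argument built from a flowfile attribute and reads the SQL from stdin (ExecuteStreamCommand sends the flowfile content to the command's standard input). The script name `athena_query.py`, the attributes `s3.staging.dir` and `athena.database`, and the argument wiring are assumptions; the `aws athena start-query-execution` subcommand and its `--query-string`, `--result-configuration OutputLocation=...`, and `--query-execution-context Database=...` options are part of the AWS CLI.

```python
import subprocess
import sys


def build_athena_command(query, output_location, database=None):
    """Build the argument list for one `aws athena start-query-execution` call."""
    cmd = [
        "aws", "athena", "start-query-execution",
        "--query-string", query,
        # Per-query staging directory: where Athena writes this query's result files.
        "--result-configuration", f"OutputLocation={output_location}",
    ]
    if database:
        cmd += ["--query-execution-context", f"Database={database}"]
    return cmd


def main(argv):
    # Hypothetical wiring: Command Path = python3,
    # Command Arguments = athena_query.py;${s3.staging.dir};${athena.database}
    if len(argv) < 2:
        sys.stderr.write("usage: athena_query.py OUTPUT_LOCATION [DATABASE]\n")
        return 2
    output_location = argv[1]
    database = argv[2] if len(argv) > 2 else None  # empty string is treated as "none"
    query = sys.stdin.read()  # the flowfile content arrives on stdin
    result = subprocess.run(
        build_athena_command(query, output_location, database),
        capture_output=True,
        text=True,
    )
    # Forward the CLI's JSON response (it contains the QueryExecutionId) as new content.
    sys.stdout.write(result.stdout)
    sys.stderr.write(result.stderr)
    return result.returncode


if __name__ == "__main__":
    sys.exit(main(sys.argv))
```

Keep in mind that `start-query-execution` returns as soon as the query is submitted; the results appear under `OutputLocation` only once the query finishes, so a later step (for example polling `aws athena get-query-execution`) would be needed if the flow must wait for completion.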


Answer 2:

You don't have to set the property in DBCPConnectionPool. The SQL processor running your query will output the results from Athena as flowfiles. You can connect the SQL processor to a PutS3Object processor and specify the bucket name and other necessary properties. This writes the result of your SQL query to whichever S3 staging directory you specify.
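As a rough illustration of how the destination could vary per query with this approach, the toy Python function below mimics how PutS3Object's Bucket and Object Key properties, which accept Expression Language evaluated against each flowfile, would send different result flowfiles to different S3 locations. The `${...}` substitution is simplified, and the attribute names `results.bucket` and `results.prefix` are hypothetical; they would have to be set upstream, for example with UpdateAttribute.

```python
import re

# Simplified Expression Language placeholder, e.g. ${results.prefix}
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def s3_destination(bucket_template, key_template, attributes):
    """Return the s3:// URI a result flowfile would be written to (simplified EL)."""

    def fill(template):
        # Missing attributes become empty strings in this toy version.
        return PLACEHOLDER.sub(lambda m: attributes.get(m.group(1), ""), template)

    return f"s3://{fill(bucket_template)}/{fill(key_template)}"


if __name__ == "__main__":
    flowfiles = [
        {"results.bucket": "analytics", "results.prefix": "team-a", "filename": "q1.avro"},
        {"results.bucket": "analytics", "results.prefix": "team-b", "filename": "q2.avro"},
    ]
    for attrs in flowfiles:
        print(s3_destination("${results.bucket}", "${results.prefix}/${filename}", attrs))
```

Note that this only controls where NiFi writes its copy of the result set; Athena itself still writes query results to the staging directory configured on the connection.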



Tags: apache-nifi