Hello Internet Hive Mind!
I need to query AWS Athena from NiFi, but I need to change the staging directory (the S3 bucket and folder where the results are saved) for each query sent.
However, the `s3_staging_dir` property has to be set on the `DBCPConnectionPool` controller service.
How can I change the value of that property for each different flowfile?
Apparently it can't be set per flowfile with Expression Language alone.
Thanks!
I'm not sure of the nature of your flow where each query depends on a different staging directory, but there are a couple of things to keep in mind.
- The `DBCPConnectionPool` controller service does allow dynamic properties which evaluate Expression Language, but that evaluation is performed when the controller service is enabled, so "once" per start/stop.
- The dynamic properties on the controller service do not evaluate flowfile attributes.
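For illustration (the attribute name `s3.prefix` here is hypothetical), suppose you added a dynamic property like this to the controller service:

```
s3_staging_dir = s3://query-results/${s3.prefix}/
```

The Expression Language is evaluated only once, when the service is enabled, and with no flowfile in scope, so `${s3.prefix}` would never resolve to a per-flowfile value.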
From the Apache NiFi `DBCPConnectionPool` documentation:

> Dynamic Properties:
> Dynamic Properties allow the user to specify both the name and value of a property.
> ...
> Specifies a property name and value to be set on the JDBC connection(s). If Expression Language is used, evaluation will be performed upon the controller service being enabled. Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties. Supports Expression Language: true
Because of your requirement that the S3 staging directory be different on every request, I think you would need to pursue one of the following options:
- File a Jira requesting native Athena support in NiFi (thoroughly explaining why the existing `DBCPConnectionPool` doesn't support your use case)
- Extend the `DBCPConnectionPool` controller service with your own `AthenaConnectionPool` controller service. There are many tutorials for building your own NiFi components, but the NiFi Developer Guide > Developing Controller Services is the best place to start. You can make a controller service which does evaluate incoming flowfile attributes when performing Expression Language execution, but you will need to trigger this manually, as controller services do not have an `@OnTrigger` phase in their lifecycle. If you also write a custom processor, you can invoke some "re-evaluate" method on the controller service from the processor's `onTrigger()` method, but existing processors will not call this. Alternatively, you could theoretically put a high-frequency refresher in the controller service itself using executors, but this will definitely affect performance
- Create multiple `DBCPConnectionPool` instances and SQL processors, one per staging directory (feasible on the order of 1 - 3 directories, otherwise abysmal)
- Use the `ExecuteStreamCommand` processor with `awscli` to execute the queries using the command-line tool. This deprives you of the NiFi-native SQL tools, but it allows custom queries on every invocation because `ExecuteStreamCommand` can interpret the flowfile-specific attributes and use them in the query
- Re-evaluate your flow design and see if there is a way to perform the queries without allowing for arbitrary S3 staging directories on individual query execution
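As a sketch of the `ExecuteStreamCommand` route: the `awscli` call for this is `aws athena start-query-execution`, and its `--result-configuration OutputLocation=...` flag is exactly the per-query staging directory you're after. The function below only assembles that command so the shape is visible without AWS credentials; the query and staging directory it accepts stand in for flowfile attributes (names are illustrative, e.g. `athena.query` and `s3.staging.dir`) that the processor would substitute via Expression Language.

```shell
# Assemble the awscli call that ExecuteStreamCommand would run for one
# flowfile. The two positional arguments stand in for per-flowfile
# attributes substituted by the processor (attribute names are illustrative).
build_athena_cmd() {
  query="$1"        # the SQL to run
  staging_dir="$2"  # per-flowfile S3 staging directory
  printf 'aws athena start-query-execution --query-string "%s" --result-configuration OutputLocation=%s' \
    "$query" "$staging_dir"
}

# Example: each invocation gets its own staging directory.
build_athena_cmd 'SELECT 1' 's3://my-results-bucket/run-42/'
```

In an actual flow you would execute the command rather than print it, and then poll `aws athena get-query-execution` for completion before fetching the results.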
You don't have to set the property in `DBCPConnectionPool`. The query that you set in the SQL processor will output the results from Athena as flowfiles. You can connect the SQL processor to a `PutS3Object` processor and specify the bucket name and other necessary properties. This will write the result of your SQL query to the S3 staging directory.