NiFi 1.1.1 tested on both Windows 7 and RHEL 7.
The background thread is here.
I have created a DBCPConnectionPool controller service pointing to a SQL Server database. I am able to fetch data from a table and write it to the local disk (ExecuteSQL -> ConvertAvroToJSON -> PutFile).
My code:
public byte[] getMaxLSN(Connection connection, String containerDB) {
    String dbMaxLSN = "{? = CALL sys.fn_cdc_get_max_lsn()}";
    byte[] maxLSN = null;
    try (final CallableStatement cstmt = connection.prepareCall(dbMaxLSN)) {
        cstmt.registerOutParameter(1, java.sql.JDBCType.BINARY);
        cstmt.execute();
        if (cstmt.getBytes(1) == null || cstmt.getBytes(1).length <= 0) {
            System.out.println("Coudln't retrieve the max lsn for the db "
                    + containerDB);
        } else {
            maxLSN = cstmt.getBytes(1);
        }
    } catch (SQLException sqlException) {
        System.out.println("sqlException !!!");
        sqlException.printStackTrace();
    }
    return maxLSN;
}
The challenge arises when I use the pool as a property in my custom processor. In the processor's code, I need to invoke a function in the database, but this fails with a SQLException pointing to the JDBC driver. Note that the same driver works properly in standalone Java code (provided in the background thread to avoid cluttering this post), where I get the return value from the function. I suspect that the controller service is not configured properly: it can execute SELECT queries, but when code invokes a function, it throws an exception. What am I missing?
Process or SQL exception in <configure logger template to pick the code location>
2017-03-17 09:25:30,717 ERROR [Timer-Driven Process Thread-6] c.s.d.processors.SQLServerCDCProcessor
org.apache.nifi.processor.exception.ProcessException: Coudln't retrieve the max lsn for the db test
at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:692) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at com.datalake.processors.SQLServerCDCProcessor.getChangedTableQueries(SQLServerCDCProcessor.java:602) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at com.datalake.processors.SQLServerCDCProcessor.onTrigger(SQLServerCDCProcessor.java:249) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_71]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: java.sql.SQLFeatureNotSupportedException: registerOutParameter not implemented
at java.sql.CallableStatement.registerOutParameter(CallableStatement.java:2613) ~[na:1.8.0_71]
at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:677) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
... 14 common frames omitted
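One thing the trace suggests: the "Caused by" frame is java.sql.CallableStatement.registerOutParameter itself, which is the JDBC 4.2 default interface method for the SQLType overload. That default throws SQLFeatureNotSupportedException unless the statement class overrides it. So the pooled statement wrapper handed out by the controller service may simply not implement the SQLType overload, even though the raw driver statement in the standalone test does. A minimal sketch of a workaround, assuming that is the cause, is to register the out parameter with the older int-typed overload and java.sql.Types.BINARY. The class name MaxLsnSketch and the method name getMaxLsn are hypothetical, used here only for illustration.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;

// Hypothetical helper class; the name is illustrative only.
final class MaxLsnSketch {

    // Same call string as in the post.
    private static final String MAX_LSN_CALL = "{? = CALL sys.fn_cdc_get_max_lsn()}";

    private MaxLsnSketch() {
    }

    /**
     * Returns the max LSN, or null if the function returned nothing.
     * Assumption: the pooled statement supports the int-typed
     * registerOutParameter overload, which predates JDBC 4.2.
     */
    static byte[] getMaxLsn(Connection connection) throws SQLException {
        try (CallableStatement cstmt = connection.prepareCall(MAX_LSN_CALL)) {
            // java.sql.Types.BINARY (an int) instead of java.sql.JDBCType.BINARY (a SQLType)
            cstmt.registerOutParameter(1, Types.BINARY);
            cstmt.execute();
            // Read the out parameter once and reuse the value.
            byte[] lsn = cstmt.getBytes(1);
            return (lsn == null || lsn.length == 0) ? null : lsn;
        }
    }
}
```

In the processor, the connection would come from the controller service (DBCPService.getConnection()), and a null result could be turned into a ProcessException as before. If the int-typed overload also fails, the cause is something else and this sketch does not apply.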