DBCPConnectionPool controller service for SQL Serv

Posted 2019-05-07 17:59

Question:

NiFi 1.1.1 tested on both Windows 7 and RHEL 7.

The background thread is here.

I have created a DBCPConnectionPool controller service pointing to a SQL Server database. I am able to fetch data from a table and write it to the local disk (ExecuteSQL -> ConvertAvroToJSON -> PutFile).

My code:

public byte[] getMaxLSN(Connection connection, String containerDB) {
    String dbMaxLSN = "{? = CALL sys.fn_cdc_get_max_lsn()}";
    byte[] maxLSN = null;

    try (final CallableStatement cstmt = connection.prepareCall(dbMaxLSN);) {

        cstmt.registerOutParameter(1, java.sql.JDBCType.BINARY);
        cstmt.execute();

        if (cstmt.getBytes(1) == null || cstmt.getBytes(1).length <= 0) {

            System.out.println("Coudln't retrieve the max lsn for the db "
                    + containerDB);

        } else {
            maxLSN = cstmt.getBytes(1);

        }
    } catch (SQLException sqlException) {
        System.out.println("sqlException !!!");
        sqlException.printStackTrace();
    }

    return maxLSN;
}

The problem arises when I use the pool as a property in my custom processor. In the processor's code I need to invoke a function in the database, but the call fails with a SQLException pointing to the JDBC driver. Note that the same driver works properly in standalone Java code (provided in the background thread to avoid cluttering this post), where I do get the return value from the function. I suspect that the controller service is not configured properly: it can execute select queries, but when code invokes a function it throws an exception. What am I missing?

Process or SQL exception in <configure logger template to pick the code location>
2017-03-17 09:25:30,717 ERROR [Timer-Driven Process Thread-6] c.s.d.processors.SQLServerCDCProcessor 
org.apache.nifi.processor.exception.ProcessException: Coudln't retrieve the max lsn for the db test
    at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:692) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    at com.datalake.processors.SQLServerCDCProcessor.getChangedTableQueries(SQLServerCDCProcessor.java:602) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    at com.datalake.processors.SQLServerCDCProcessor.onTrigger(SQLServerCDCProcessor.java:249) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.1.jar:1.1.1]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.1.jar:1.1.1]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_71]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: java.sql.SQLFeatureNotSupportedException: registerOutParameter not implemented
    at java.sql.CallableStatement.registerOutParameter(CallableStatement.java:2613) ~[na:1.8.0_71]
    at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:677) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
    ... 14 common frames omitted

Answer 1:

"java.sql.SQLFeatureNotSupportedException: registerOutParameter not implemented" means your code is using a feature that is not available in the driver.

Specifically, based on the stack trace, your code is calling registerOutParameter(int parameterIndex, SQLType sqlType), which was introduced in JDBC 4.2 (Java 8). This method has the following default implementation in the java.sql.CallableStatement interface:

default void registerOutParameter(int parameterIndex, SQLType sqlType)
    throws SQLException {
    throw new SQLFeatureNotSupportedException("registerOutParameter not implemented");
}

The throw is at line 2613 in java.sql.CallableStatement, which matches the stack trace.

As this method is implemented in the latest version of the Microsoft SQL Server JDBC driver, you are either using an older version, or the objects of the driver are wrapped in a proxy that doesn't support JDBC 4.2.
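To tell those two cases apart, it can help to log what the processor actually receives from the controller service. The following is only a minimal sketch: the class name JdbcDiagnostics and the method describe are hypothetical, and it uses nothing beyond standard java.sql calls. The driver metadata shows which driver version and JDBC level are reported, while the runtime class names show whether the connection and statement are the driver's own classes or wrappers from a pooling library.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;

// Hypothetical helper, not part of NiFi or the SQL Server driver.
final class JdbcDiagnostics {

    private JdbcDiagnostics() {
    }

    // Builds a short report about the driver and the runtime classes behind a connection.
    static String describe(Connection connection, String call) throws SQLException {
        DatabaseMetaData meta = connection.getMetaData();
        StringBuilder report = new StringBuilder();

        // Driver name/version and the JDBC level the driver claims to support.
        report.append("driver=").append(meta.getDriverName())
              .append(' ').append(meta.getDriverVersion()).append('\n');
        report.append("jdbc=").append(meta.getJDBCMajorVersion())
              .append('.').append(meta.getJDBCMinorVersion()).append('\n');

        // A class name from outside the driver's package suggests a pool wrapper or proxy.
        report.append("connectionClass=").append(connection.getClass().getName()).append('\n');
        try (CallableStatement cstmt = connection.prepareCall(call)) {
            report.append("statementClass=").append(cstmt.getClass().getName());
        }
        return report.toString();
    }
}
```

Calling JdbcDiagnostics.describe(connection, "{? = CALL sys.fn_cdc_get_max_lsn()}") once inside the processor and once in the standalone program should make any difference between the two environments visible. Note that a wrapper may pass the metadata through from the underlying driver, so the statement class name is the more telling line.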

I suggest you upgrade to the latest version of the driver, currently v6.1.0. As another question of yours suggests you use Maven, you should make sure you use the new Maven coordinates for the driver (they changed when the driver was open-sourced):

<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <version>6.1.0.jre8</version>
</dependency>

If the problem persists, then since com.datalake.processors.SQLServerCDCProcessor is your own code (as shown in your other question), you should change it to call the older registerOutParameter(int parameterIndex, int sqlType) instead of registerOutParameter(int parameterIndex, SQLType sqlType).
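As a rough sketch of that change, the method from the question could be rewritten along these lines. The class name MaxLsnReader is hypothetical, and throwing the SQLException to the caller instead of printing it is a simplification of mine, not something taken from the original code. The only essential difference is java.sql.Types.BINARY (an int constant) in place of java.sql.JDBCType.BINARY (a SQLType), which selects the int overload that has been part of CallableStatement since long before JDBC 4.2.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;

// Hypothetical helper class; only the registerOutParameter call differs in substance
// from the code in the question.
final class MaxLsnReader {

    // Same call string as in the question.
    private static final String MAX_LSN_CALL = "{? = CALL sys.fn_cdc_get_max_lsn()}";

    private MaxLsnReader() {
    }

    // Returns the max LSN, or null if the function returned nothing.
    static byte[] getMaxLSN(Connection connection) throws SQLException {
        try (CallableStatement cstmt = connection.prepareCall(MAX_LSN_CALL)) {
            // int overload: registerOutParameter(int parameterIndex, int sqlType).
            // It does not rely on the JDBC 4.2 default method that throws
            // SQLFeatureNotSupportedException.
            cstmt.registerOutParameter(1, Types.BINARY);
            cstmt.execute();

            // Read the out parameter once and reuse the value.
            byte[] maxLSN = cstmt.getBytes(1);
            return (maxLSN == null || maxLSN.length == 0) ? null : maxLSN;
        }
    }
}
```

Inside the processor, the connection would come from the DBCPConnectionPool controller service as before; a null return can then be turned into the same ProcessException the processor already throws.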