Kafka JDBC Connect query causes ORA-00933: SQL com

Posted 2019-06-02 22:25

I have this Oracle SQL query:

SELECT * FROM 
    (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,
    DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK 
    FROM TSY940) 
WHERE ORDER_RANK=1;

When I run it in SQL Developer, it returns the desired result.

For some reason, when I use this query in the kafka-connect-jdbc properties, I get:

ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1', topicPrefix='TSY940', timestampColumn='SYS_NO', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
        at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:53)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:774)
        at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:925)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1111)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:4798)
        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:4845)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1501)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Here is my properties file:

name=poc-oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ********
connection.url = jdbc:oracle:thin:@***.***.***.**:****/******
connection.user = ***********
table.types=TABLE
query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1
mode=timestamp
timestamp.column.name=SYS_NO
topic.prefix=TSY940
batch.max.rows = 500
poll.interval.ms=60000

transforms=createKey,extract
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=SO_ORDER_KEY
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extract.field=SO_ORDER_KEY

I use the ojdbc7 driver.

The WHERE clause seems to be the issue, because I don't get the exception when I replace the query property with

query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940)

3 Answers
爱情/是我丢掉的垃圾
#2 · 2019-06-02 22:34

You may then try this query which completely eliminates the ORDER_RANK

SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO
FROM (
 SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO 
 FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO,
      DENSE_RANK() OVER(PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK 
      FROM TSY940) sub
 WHERE sub.ORDER_RANK=1
)

The best way to see what is causing the problem is to enable 10046 trace and look at the exact query that is sent to the database and triggers the ORA-00933.

太酷不给撩
#3 · 2019-06-02 22:34

You could try to add aliases and remove TO_CHAR:

SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO 
FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO,
      DENSE_RANK() OVER(PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK 
      FROM TSY940) sub
WHERE sub.ORDER_RANK=1;
在下西门庆
#4 · 2019-06-02 22:41

I know this question has an accepted answer, but here is the problem you were having, and why wrapping the query in another query solved it. When a Connect job runs in timestamp mode, Connect appends a WHERE clause to the query using the specified timestamp field. That means Connect was passing this invalid query to Oracle:

SELECT *
    FROM (SELECT
            SO_ORDER_KEY,
            QUEUE_TYPE,
            SYS_NO,
            DENSE_RANK()
            OVER (
              PARTITION BY SO_ORDER_KEY
              ORDER BY SYS_NO DESC ) ORDER_RANK
          FROM TSY940)
    WHERE ORDER_RANK = 1
    WHERE SYS_NO >= {last_run_timestamp}

See the last line? That was being added by Connect. When you wrap your query in another SELECT, that appended clause no longer causes a problem.
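As a rough sketch, the relevant properties might look like this with the wrapped query from the first answer. The exact clause Connect appends depends on the connector version, so the comment describes it only loosely. All other properties are assumed to stay as in the question.

```properties
# Wrapped query: the outermost SELECT has no WHERE of its own, so the
# WHERE clause that Connect appends in timestamp mode attaches to it
# instead of colliding with "WHERE sub.ORDER_RANK=1" in the inner query.
# Note there is no trailing semicolon.
query=SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO, DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK FROM TSY940) sub WHERE sub.ORDER_RANK=1)
mode=timestamp
timestamp.column.name=SYS_NO
```

The outer SELECT must expose the timestamp column (here SYS_NO), since the appended clause filters on it.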

When I first had this problem, I found no mention of it anywhere in the documentation, and it was pretty irritating.
