How to import MS Sql Server tables to KSQL with Ka

2019-02-20 04:53发布

Hi I am trying to import all tables present on remote SQL Server to KSQL topics this is my file properties

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true

than i do

confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties

and in the log

confluent log connect -f

it shows

[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)

it run correctly but it doesn't import anything, the topic remain empty

confluent status sqlservertest 
{
  "name": "sqlservertest",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.132.0.2:8083"
  },
  "tasks": [],
  "type": "source"
}

I have chenaged also the properties

name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

but i am getting this error

[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
        at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
        at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
        at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
        at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
        at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
        at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
        ... 11 more

1条回答
放我归山
2楼-- · 2019-02-20 05:34

I found the real cause of this error, Kafka connector are using the functions that is present just in MS Sql server 2012, in particular IFF and boolan comparison in function

select IFF(1>2,'OK','KO');
select (1>2) as bool;

that are NOT working on MS Sql 2008

The real cause is that Conflunet MSSQL Connector is made just for MS SQL Server 2012 and above and I am runnning version 2008

I decompiled the library kafka-connect-cdc-mssql and adjusted the sql code to be compliant with sqlserver 2008 and now it's working.

Maybe I will push it to github to make it available for everybody

查看更多
登录 后发表回答