I am trying to use Kafka Connect JDBC sink connector to insert data into Oracle but it is throwing an error . I have tried with all the possible configurations of the schema. Below is the examples .
Please suggest if I am missing anything below are my configurations files and errors.
Case 1- First Configuration
internal.value.converter.schemas.enable=false .
so I am getting the
[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2nd Configuration -
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
Log:
[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
Oracle connector.properties looks like
name=oracle_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=DJ
connection.url=jdbc:oracle:thin:@hostname:port:sid
connection.user=username
connection.password=password
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
auto.create=true
auto.evolve=true
Connect-Standalone.properties
My JSON looks like -
{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}
Per the documentation
So if your data is JSON you would have the following configuration:
The error you see in the second instance is pertinent --
JsonConverter with schemas.enable requires "schema" and "payload" fields
- the JSON you share does not meet this required format.Here's a simple example of a valid JSON message with
schema
andpayload
:What's your source for the data that you're trying to land to Oracle? If it's Kafka Connect inbound then you simply use the same
converter
configuration (Avro + Confluent Schema Registry) would be easier and more efficient. If it's a custom application, you'll need to get it to either (a) use the Confluent Avro serialiser or (b) write the JSON in the required format above, providing the schema of the payload inline with the message.