I am using the Confluent Community edition for a simple setup consisting of a REST client calling the Kafka REST Proxy, which then pushes that data into an Oracle database using the provided JDBC sink connector.
I noticed that if there is an SQL exception, for instance when the data is longer than the column's defined length, the task stops. If I restart it, the same thing happens: it tries to insert the erroneous entry again and stops. It does not insert the other entries.
Is there a way I can log the erroneous entry and let the task continue inserting the other data?
For sink connectors, the Kafka Connect framework can only skip problematic records when an exception is thrown during:
- Conversion of keys or values (Converter::toConnectData(...))
- Transformation (Transformation::apply)
For that, you can use the errors.tolerance property:
"errors.tolerance": "all"
There are some additional properties for logging details about the errors: errors.log.enable and errors.log.include.messages.
Original answer: Apache Kafka JDBC Connector - SerializationException: Unknown magic byte
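As a rough sketch, the error-handling properties above could be combined in a JDBC sink connector config like the one below. The topic name and Oracle connection URL are hypothetical placeholders; the printed JSON is the kind of body you might send to the Kafka Connect REST API (for example PUT /connectors/<name>/config). Keep in mind that these settings only cover conversion and transformation failures, not SQL errors raised while writing to the database.

```python
import json


def jdbc_sink_config(topic: str, connection_url: str) -> dict:
    """Build a JDBC sink config that tolerates converter/transform errors.

    The topic and connection URL passed in are hypothetical placeholders.
    """
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": topic,
        "connection.url": connection_url,
        # Skip records that fail in Converter::toConnectData or Transformation::apply.
        # This does NOT skip records whose INSERT fails with an SQLException.
        "errors.tolerance": "all",
        # Log every tolerated error, including details of the failing record.
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
    }


config = jdbc_sink_config("orders", "jdbc:oracle:thin:@db-host:1521/ORCLPDB1")
print(json.dumps(config, indent=2))
```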
If an exception is thrown while delivering messages, the sink task is killed.
If you need to handle communication errors (or other errors) with an external system, you have to add support for that to your connector.
When an SQLException is thrown, the JDBC connector retries, but it doesn't skip any records.
The number of retries and the interval between them are managed by the following properties:
- max.retries (default value 10)
- retry.backoff.ms (default value 3000)
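A value that is too long for its column fails the same way on every attempt, so retries only postpone the task's failure. The sketch below estimates that delay as max.retries × retry.backoff.ms, using the defaults quoted above (about 30 s). It is an approximation: it ignores the time each failed attempt itself takes, and the helper name is hypothetical.

```python
# Defaults quoted above for the JDBC sink connector.
DEFAULT_MAX_RETRIES = 10
DEFAULT_RETRY_BACKOFF_MS = 3000


def backoff_budget_ms(config: dict) -> int:
    """Approximate total wait between retries before the sink task fails.

    Connector config values are strings, so they are converted with int().
    Time spent on the failed attempts themselves is not included.
    """
    retries = int(config.get("max.retries", DEFAULT_MAX_RETRIES))
    backoff = int(config.get("retry.backoff.ms", DEFAULT_RETRY_BACKOFF_MS))
    return retries * backoff


print(backoff_budget_ms({}))
print(backoff_budget_ms({"max.retries": "3", "retry.backoff.ms": "1000"}))
```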
The sink cannot currently ignore bad records, but you can skip them manually by moving the connector's consumer group offset past the bad record with the kafka-consumer-groups tool:
kafka-consumer-groups \
--bootstrap-server kafka:29092 \
--group connect-sink_postgres_foo_00 \
--reset-offsets \
--topic foo \
--to-offset 2 \
--execute
For more info see here.
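To skip a single bad record, the offset to reset to is the bad record's offset plus one. The hypothetical helper below builds that command. It assumes the default Kafka Connect naming for a sink connector's consumer group ("connect-" followed by the connector name), which matches the group in the example above but can be overridden. It also optionally restricts the reset to one partition with the topic:partition form, since without it --to-offset applies to every partition of the topic. Note that the tool only resets offsets for a group with no active members, so the connector's tasks must not be running while you execute it.

```python
import shlex
from typing import List, Optional


def skip_record_command(bootstrap: str, connector: str, topic: str,
                        bad_offset: int,
                        partition: Optional[int] = None) -> List[str]:
    """Build a kafka-consumer-groups command that skips the record at bad_offset.

    Assumes the sink's consumer group uses the default "connect-<connector>" name.
    """
    group = f"connect-{connector}"
    # Limit the reset to one partition if given; otherwise all partitions move.
    topic_arg = topic if partition is None else f"{topic}:{partition}"
    return [
        "kafka-consumer-groups",
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--reset-offsets",
        "--topic", topic_arg,
        # Resume at the record right after the bad one.
        "--to-offset", str(bad_offset + 1),
        "--execute",
    ]


cmd = skip_record_command("kafka:29092", "sink_postgres_foo_00", "foo", bad_offset=1)
print(shlex.join(cmd))
```

With bad_offset=1 this reproduces the command shown above (--to-offset 2).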