Kafka Connect - JDBC sink SQL exception

Published 2020-04-14 08:02

Question:

I am using the Confluent community edition for a simple setup consisting of a REST client that calls the Kafka REST Proxy, with the data then pushed into an Oracle database by the provided JDBC sink connector.

I noticed that if there is an SQL exception, for instance when a value is longer than the defined column length, the task stops. If I restart it, the same thing happens: it tries to insert the erroneous entry again and stops. It does not insert the other entries.

Is there a way to log the erroneous entry and let the task continue inserting the other data?

Answer 1:

For sink connectors, the Kafka Connect framework can only skip problematic records when the exception is thrown during:

  • Conversion of keys or values (Converter::toConnectData(...))
  • Transformation (Transformation::apply)

For that you can use the errors.tolerance property:

"errors.tolerance": "all"

There are some additional properties for logging details about errors: errors.log.enable and errors.log.include.messages.

Original answer: Apache Kafka JDBC Connector - SerializationException: Unknown magic byte
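One possible way to put these properties together is sketched below. It writes a connector configuration to a file that could then be submitted to the Kafka Connect REST API. The connector name jdbc-sink-oracle, the topic foo, the connection URL, and the credentials are placeholders rather than values from the question, and the worker address localhost:8083 is only the usual default. Keep in mind that these settings cover conversion and transformation failures only; they would not make the sink skip a record that the database rejects.

```shell
# Hypothetical connector config: every value except the three errors.*
# settings is a placeholder to replace with your own.
cat > jdbc-sink-oracle.json <<'EOF'
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics": "foo",
  "connection.url": "jdbc:oracle:thin:@//db-host:1521/ORCLPDB1",
  "connection.user": "connect_user",
  "connection.password": "change-me",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
EOF

# To apply it (assumes a Connect worker listening on localhost:8083):
# curl -X PUT -H "Content-Type: application/json" \
#      --data @jdbc-sink-oracle.json \
#      http://localhost:8083/connectors/jdbc-sink-oracle/config
```

The PUT request replaces the whole connector configuration, which is why the sketch carries the full set of required properties and not just the error-handling ones.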

If an exception is thrown while delivering messages, the sink task is killed. If you need to handle communication errors (or others) with an external system, you have to add that support to your connector.

When an SQLException is thrown, the JDBC connector retries but does not skip any records.

The number of retries and the interval between them are controlled by the following properties:

  • max.retries default value 10
  • retry.backoff.ms default 3000
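As a rough illustration of what these defaults mean: assuming the connector waits retry.backoff.ms between attempts, a record that fails every time keeps the task retrying for about max.retries × retry.backoff.ms before it gives up. The variable names below are made up for the example.

```shell
# Defaults quoted in the list above
MAX_RETRIES=10
RETRY_BACKOFF_MS=3000

# Approximate time spent waiting between retries before the task fails
TOTAL_WAIT_MS=$((MAX_RETRIES * RETRY_BACKOFF_MS))
echo "about $((TOTAL_WAIT_MS / 1000)) s of retrying before the task fails"
```

For a permanent data error such as an oversized value, retrying cannot succeed, so raising these values would only delay the failure.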


Answer 2:

The sink cannot currently ignore bad records, but you can manually skip them, using the kafka-consumer-groups tool:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

For more info see here.
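Before running the reset with --execute, it can help to look at where the group currently stands and to preview the change. The sketch below wraps two other modes of the same tool: --describe, which lists the committed offset, log end offset, and lag per partition, and --dry-run, which prints the planned offsets without committing them. The function names show_offsets and preview_skip are hypothetical, and the broker address and group name are simply the ones from the command above. The value passed to --to-offset is the offset of the next record the group will read, so skipping a bad record at offset N means passing N+1. The tool only resets offsets while the group has no active members, which in practice means the sink connector must not be running.

```shell
BOOTSTRAP="kafka:29092"
GROUP="connect-sink_postgres_foo_00"

# Show committed offset, log end offset and lag for each partition.
show_offsets() {
    kafka-consumer-groups \
        --bootstrap-server "$BOOTSTRAP" \
        --group "$GROUP" \
        --describe
}

# Preview a reset of a single partition without committing anything.
# $1 = topic, $2 = partition, $3 = offset of the next record to read
preview_skip() {
    kafka-consumer-groups \
        --bootstrap-server "$BOOTSTRAP" \
        --group "$GROUP" \
        --reset-offsets \
        --topic "$1:$2" \
        --to-offset "$3" \
        --dry-run
}

# Example (not executed here): preview_skip foo 0 2
```

Restricting the reset to one partition with the topic:partition form avoids moving the offsets of partitions that do not contain the bad record.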