Kafka Connect sink tasks ignore tolerance limits

Published: 2019-08-07 14:27

Question:

I am trying to ignore bad messages in a sink connector using the errors.tolerance: all option. Full connector configuration:

{
    "name": "crm_data-sink_pandora",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account_detail,crm_account_on_competitors,crm_event,crm_event_participation",
        "connection.url": "jdbc:postgresql://dburl/service?prepareThreshold=0",
        "connection.user": "pandora.app",
        "connection.password": "*******",
        "dialect.name": "PostgreSqlDatabaseDialect",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",
        "table.name.format": "pandora.${topic}",
        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,
     "errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable":true
    }
}

Target table DDL:

create table crm_event_participation
(
  guid              char(36) not null
    constraint crm_event_participation_pkey
      primary key,
  created_on        timestamp,
  created_by_guid   char(36),
  modified_on       timestamp,
  modified_by_guid  char(36),
  process_listeners integer,
  event_guid        char(36),
  event_response    varchar(250),
  note              varchar(500),
  is_from_group     boolean,
  contact_guid      char(36),
  target_item       integer,
  account_guid      char(36),
  employer_id       integer
);

The connector starts successfully, but its tasks fail as soon as an error occurs (e.g. a missing field).

Output of curl -X GET http://kafka-connect:9092/connectors/crm_data-sink_pandora/status:

{
    "name": "crm_data-sink_pandora",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.2.254:10900"
    },
    "tasks": [
        {
            "state": "FAILED",
            "trace": 
              "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
                 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                 at java.lang.Thread.run(Thread.java:748)
              Caused by: org.apache.kafka.connect.errors.ConnectException: Table \"pandora\".\"crm_event_participation\" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='event_id', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='event_response_guid', isPrimaryKey=false}]) and auto-evolution is disabled
                 at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
                 at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
                 at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
                 at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
                 at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
                 ... 10 more",
            "id": 0,
            "worker_id": "192.168.2.254:10900"
        }
        ...
    ]
}

Log output with the exception:

[2019-03-29 16:59:30,924] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}] among column names [employer_id, modified_on, modified_by_guid, contact_guid, target_item, guid, created_on, process_listeners, event_guid, created_by_guid, is_from_group, account_guid, event_response, note] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-03-29 16:59:30,924] ERROR WorkerSinkTask{id=crm_data-sink_pandora-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Table "pandora"."crm_event_participation" is missing fields ([SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}]) and auto-evolution is disabled
  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Could someone explain what might be wrong in the connector configuration? I am using Kafka 2.0.0 and JdbcSinkConnector 5.1.0.

Answer 1:

Your Kafka message contains a field named process_listners, but no column with that name exists in your table.

I think you have a typo: the table column is process_listeners, while the message field is process_listners. The best fix is to correct the field name on the producing side; a workaround is sketched below.
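If the producer cannot be fixed right away, one option (a sketch, not something from your original setup) is to rename the field with a ReplaceField single message transform before the JDBC writer sees it. The transform alias RenameField here is arbitrary, and this assumes the record value is a structured record:

    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "process_listners:process_listeners"

Adding these three lines to the "config" section of your connector would map the misspelled message field onto the existing process_listeners column.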

The errors.tolerance property applies only to errors that occur while converting and transforming messages; it does not cover exceptions thrown inside the sink task's put(), such as this JDBC schema mismatch. More regarding errors.tolerance you can read here: kafka connect - jdbc sink sql exception
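Because the tolerance machinery stops before put(), schema mismatches like the one in your trace have to be solved at the table or connector level. One option, hinted at by the "auto-evolution is disabled" message in the stack trace, is to let the connector alter the table itself (a sketch; only do this if you accept the connector issuing ALTER TABLE against pandora.crm_event_participation):

    "auto.evolve": "true"

With auto.evolve enabled, genuinely new fields such as event_id and event_response_guid from the trace would be added as columns automatically. Note that it would also create a second, misspelled process_listners column unless the typo is fixed first, so the rename above is still the right fix.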