Kafka JDBC Sink Connector: no tasks assigned

2020-07-22 09:50发布

问题:

I try to start JDBC sink connector with following configuration:

{
    "name": "crm_data-sink_hh",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account,crm_competitor,crm_event,crm_event_participation",

        "connection.url": "jdbc:postgresql://db_host/hh?prepareThreshold=0",
        "connection.user": "db_user",
        "connection.password": "${file:db_hh_kafka_connect_pass}",
        "dialect.name": "PostgreSqlDatabaseDialect",

        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",

        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,

        "errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable":true
    }
}

But no tasks are running while connector is in running state:

curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/status
{"name":"crm_data-sink_hh","connector":{"state":"RUNNING","worker_id":"172.16.24.14:10900"},"tasks":[],"type":"sink"}

I faced this issue many times, but I'm very confused because it happens randomly. My question is very similar with this question. I would appreciate any help!


Update. 11/04/2019 (unfortunately, now I have only INFO level log)

Finally, after few attempts I started connector with running tasks by updating config of the existing connector crm_data-sink_db_hh:

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X PUT -d @new_config.json http://docker21:10900/connectors/crm_data-sink_db_hh/config -H 'Content-Type: application/json'

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"UNASSIGNED","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"172.16.36.11:10900"},"tasks":[{"state":"UNASSIGNED","id":0,"worker_id":"172.16.32.11:10900"},{"state":"UNASSIGNED","id":1,"worker_id":"172.16.32.11:10900"},{"state":"RUNNING","id":2,"worker_id":"192.168.2.243:10900"},{"state":"UNASSIGNED","id":3,"worker_id":"172.16.32.11:10900"},{"state":"UNASSIGNED","id":4,"worker_id":"172.16.32.11:10900"}],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":1,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":2,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":3,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":4,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":5,"worker_id":"192.168.1.198:10900"}],"type":"sink"}

Log:

[2019-04-11 16:02:15,167] INFO Connector crm_data-sink_db_hh config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:15,668] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:15,668] INFO Stopping connector crm_data-source (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping task crm_data-source-0 (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping connector crm_data-sink_pandora (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-04-11 16:02:15,668] INFO Stopping table monitoring thread (io.confluent.connect.jdbc.JdbcSourceConnector)
...
Stopping connectors and tasks 
...
[2019-04-11 16:02:17,373] INFO 192.168.1.91 - - [11/Apr/2019:13:02:14 +0000] "POST /connectors HTTP/1.1" 201 768  2468 (org.apache.kafka.connect.runtime.rest.RestServer)
[2019-04-11 16:02:20,668] ERROR Graceful stop of task crm_data-source-1 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-3 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-2 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,669] INFO [Worker clientId=connect-1, groupId=21] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-04-11 16:02:20,681] INFO Tasks [crm_data-sink_hhru-0, crm_data-sink_hhru-3, crm_data-sink_hhru-4, crm_data-sink_hhru-1, crm_data-sink_hhru-2, crm_data-sink_hhru-5, crm_data-pandora_sink-0, crm_data-pandora_sink-2, crm_data-pandora_sink-1, crm_data-pandora_sink-4, crm_data-pandora_sink-3, crm_data-pandora_sink-03-0, crm_data-pandora_sink-03-2, crm_data-pandora_sink-03-1, crm_data-pandora_sink-00-1, crm_data-pandora_sink-00-0, crm_data-pandora_sink-00-3, crm_data-pandora_sink-00-2, crcrm_data-pandora_sink-00-4, crm_data-sink_hh-00-0, crm_data-sink_hh-00-1, crm_data-sink_hh-00-2, crm_data-pandora_sink-test-3, crm_data-pandora_sink-test-2, crm_data-pandora_sink-test-4,crm_data-pandora_sink-01-2, crm_data-pandora_sink-01-1, crm_data-pandora_sink-01-0, crm_data-source-3, crm_data-source-2, crm_data-source-1, crm_data-source-0, crm_data-sink_db_hh-0, crm_data-sink_db_hh-1, crm_data-sink_db_hh-2, crm_data-sink_hh-01-0, crm_data-sink_hh-01-1, crm_data-sink_hh-01-2, crm_data-sink_hh-01-3, crm_data-sink_hh-00-3, crm_data-sink_hh-00-4, crm_data-sink_hh-00-5, crm_data-sink_hh-1, crm_data-sink_hh-0, crm_data-sink_hh-3, crm_data-sink_hh-2, crm_data-sink_hh-5, crm_data-sink_hh-4, crm_data-sink_pandora-5, crm_data-sink_pandora-0, crm_data-sink_pandora-1, crm_data-sink_pandora-2, crm_data-sink_pandora-3, crm_data_account_on_competitors-source-0, crm_data-sink_pandora-4] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,681] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,682] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,683] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,684] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO [Worker clientId=connect-1, groupId=21] Successfully joined group with generation 2206465 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-04-11 16:02:20,685] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-57140c1d-3b19-4fc0-b4ca-e6ce272e1924', leaderUrl='http://192.168.1.198:10900/', offset=1168, connectorIds=[crm_data-sink_db_hh, crm_data-source, crm_data-sink_pandora], taskIds=[crm_data-source-0, crm_data-source-1, crm_data-source-2, crm_data-source-3, crm_data-sink_pandora-0, crm_data-sink_pandora-1, crm_data-sink_pandora-2, crm_data-sink_pandora-3, crm_data-sink_pandora-4, crm_data-sink_pandora-5]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connectors and tasks using config offset 1168 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-sink_db_hh (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-source (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-sink_pandora (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting task crm_data-source-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
...
Starting connectors and tasks
...

Update. 12/04/2019

I increased log level and reproduced the issue. I see a lot of records for different tasks (tasks of already deleted connectors or not yet running tasks) like this:

 [2019-04-12 15:14:32,360] DEBUG Storing new config for task crm_data-sink_hh-3 this will wait for a commit message before the new config will take effect. New config: {...} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)

There are tasks of deleted connectors in the task list - is it ok? The same situation in the internal topics of Kafka Connect.

My main question: why the connector did not fail if there are no tasks are running for any reason? Since the connector does not work actually in this situation.

回答1:

It looks like a bug of Kafka Connect itself. There is a Kafka Jira ticket about this issue.