kafka-connect : Getting error in distributed confi

2019-08-22 07:24发布

I get task error for a distributed configuration for a connector sink cassandra. I was running the command :

curl -s localhost:8083/connectors/cassandraSinkConnector2/status | jq

to get the status

{
  "name": "cassandraSinkConnector2",
  "connector": {
    "state": "RUNNING",
    "worker_id": localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "localhost:8083",
      "trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)\n\t... 12 more\nCaused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)\n\tat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)\n\t... 15 more\n"
    }
  ],
  "type": "sink"

Stack trace:

"trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
    ... 15 more

You can find below the configuration of the connector.


{
  "name": "cassandraSinkConnector2",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "appartenance_de",
    "cassandra.contact.points": "localhost",
    "cassandra.kcql": "INSERT INTO app_test SELECT * FROM app_de",
    "cassandra.port": "9042",
    "cassandra.keyspace": "dev_dkks",
    "cassandra.username": "superuser",
    "cassandra.password": "password",
    "cassandra.write.mode": "insert",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "name": "cassandraSinkConnector2"
  },
  "tasks": [
    {
      "connector": "cassandraSinkConnector2",
      "task": 0
    }
  ],
  "type": "sink"
}

New error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
    at io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
"

1条回答
等我变得足够好
2楼-- · 2019-08-22 07:54

The root error is

java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

The Monitoring Interceptors are part of Confluent Platform. You can either disable their use in your Kafka Connect worker config, or better, make sure that the /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar JAR is available to your Kafka Connect worker.


The new error you're seeing is

org.apache.kafka.connect.errors.DataException: 
Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. 
Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

I'd suggest using a Single Message Transform as suggested in the error to correctly key your data. You can see an example of doing this here and the documentation for the transform here.

查看更多
登录 后发表回答