Kafka Connect implementation errors

Posted 2019-06-17 04:30

Question:

I was running through the tutorial here: http://kafka.apache.org/documentation.html#introduction

When I get to "Step 7: Use Kafka Connect to import/export data" and attempt to start two connectors, I get the following errors:

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left

ERROR Failed to commit offsets for WorkerSourceTask

Here is the portion of the tutorial:

Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

I have spent some time looking for a solution, but was unable to find anything useful. Any help is appreciated.

Thanks!

Answer 1:

The reason I was getting this error was that the first server I created using config/server.properties was not running. I am assuming that because it is the leader for the topic, the messages could not be flushed and the offsets could not be committed.

Once I started the Kafka server using that properties file (config/server.properties), the issue was resolved.
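Before launching Connect, it can help to confirm that something is actually listening on the broker port. A minimal sketch, assuming the default localhost:9092 and a bash shell (the /dev/tcp probe is a bash built-in, not a Kafka tool):

```shell
# Probe the default broker port before starting Kafka Connect.
# Assumes localhost:9092 - adjust to match bootstrap.servers in
# config/connect-standalone.properties.
if timeout 1 bash -c 'exec 3<>/dev/tcp/localhost/9092' 2>/dev/null; then
  status="broker port reachable on localhost:9092"
else
  status="broker port NOT reachable on localhost:9092 - start the broker with bin/kafka-server-start.sh config/server.properties first"
fi
echo "$status"
```

If the port is not reachable, starting Connect will produce exactly the flush/commit timeouts described in the question.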



Answer 2:

You need to start the Kafka server and ZooKeeper before running Kafka Connect. Run the commands from "Step 2: Start the server" first:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

from here: https://mail-archives.apache.org/mod_mbox/kafka-users/201601.mbox/%3CCAK0BMEpgWmL93wgm2jVCKbUT5rAZiawzOroTFc_A6Q=GaXQgfQ@mail.gmail.com%3E



Answer 3:

You need to start ZooKeeper and the Kafka server before running that command.

Start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server(s) (the extra brokers are only needed if you followed the multi-broker step of the tutorial):

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
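The file source connector reads from test.txt in the directory where Connect is started (assuming the default file=test.txt setting in config/connect-file-source.properties), so seed it first, as the quickstart does:

```shell
# Seed the input file read by the FileStreamSource connector.
# Assumes the default file=test.txt in config/connect-file-source.properties.
echo "foo" > test.txt
echo "bar" >> test.txt
```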

Start the connectors:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Then you will see the lines written into test.sink.txt:

foo
bar

And you can start the console consumer to check the topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}


Answer 4:

If you configured your Kafka broker with a hostname such as my.sandbox.com, make sure that you modify config/connect-standalone.properties accordingly:

bootstrap.servers=my.sandbox.com:9092

On Hortonworks HDP the default port is 6667, hence the setting is

bootstrap.servers=my.sandbox.com:6667

If Kerberos is enabled (without SSL), you will need the following settings as well:

security.protocol=PLAINTEXTSASL
producer.security.protocol=PLAINTEXTSASL
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=PLAINTEXTSASL
consumer.sasl.kerberos.service.name=kafka
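Putting the pieces of this answer together, a minimal connect-standalone.properties for a Kerberized HDP broker might look like the sketch below. The hostname and port are placeholders from the answer above; the converter and offset-file lines are the stock defaults shipped in Kafka's example config, included here only so the fragment is complete:

```properties
# Broker address - must match the broker's advertised hostname
# (HDP's default broker port is 6667, not 9092)
bootstrap.servers=my.sandbox.com:6667

# Stock converters from Kafka's example connect-standalone.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Kerberos without SSL
security.protocol=PLAINTEXTSASL
producer.security.protocol=PLAINTEXTSASL
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=PLAINTEXTSASL
consumer.sasl.kerberos.service.name=kafka

# Where the standalone worker stores source offsets
offset.storage.file.filename=/tmp/connect.offsets
```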