Apache Kafka example error: Failed to send message

2019-04-04 03:47发布

I am running this kafka producer example mentioned in its site

The code:

public class TestProducer {

    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", "host.broker-1:9093, host.broker-2:9093, host.broker-3:9095");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "test.app.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = "192.168.2." + rnd.nextInt(255); 
               String msg = runtime + ",www.example.com," + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

public class SimplePartitioner implements Partitioner{
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }

}

More details:

I am running this application on a host(call is producer) which is remote to host-broker[1-3]

  • I can ping and ssh the broker host from producer host.

  • Provided the advertised.host.name in the server.properties (they are named as server[1-3].properties in the brokers respectively

The properties:

broker.id=1
port=9093
host.name=host.broker.internal.name
advertised.host.name=host-broker1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/1/kafka-logs-1,/data/2/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000

Any idea on how to fix this error?

8条回答
神经病院院长
2楼-- · 2019-04-04 03:59

I got these errors when running a Kafka producer:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Found a solution:

On my Mac box, after I download the scala-2.10 and kafka_2.10-0.8.1, in the kafka_2.10-0.8.1 directory, every thing is fine when I start zookeeper, kafka server, and create a test topic. Then I need to start a producer for the test topic. but there is an error:

yhuangMac:kafka_2.10-0.8.1 yhuang$ ./bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

The reason is that in the kafka libs directory, the kafka release zip file only included jar file of slf4j-api, they missed a jar file: slf4j-nop.jar, so we have to go to http://www.slf4j.org, download slf4j-1.7.7.zip, and then unzip it, copy the slf4j-api-1.7.7, slf4j-nop-1.7.7.jar into kafka’s libs directory.

Restart kafka producer again, now no error is reported.

Source: SOLUTION

查看更多
Ridiculous、
3楼-- · 2019-04-04 04:03

This can happen if the client cannot reach BOTH hostname and IP of the kafka broker.

Make an entry to the clients \etc\hosts or C:\Windows\System32\drivers\etc\hosts and that resolved this issue for me.

查看更多
一纸荒年 Trace。
4楼-- · 2019-04-04 04:04

You need to add the SLF4j logging implementation. if you are using maven as the build tool try adding this following to your pom.xml and see if it works ..

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
查看更多
爷的心禁止访问
5楼-- · 2019-04-04 04:13

I got the error from apache kafka:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details

My Setup:

OS: Ubuntu 14.04
sbt: sbt launcher version 0.13.5
scala: Scala code runner version 2.9.2

Was able to fix it with these commands:

cd /home/el/apachekafka/kafka_2.10-0.8.1.1/libs
wget http://www.slf4j.org/dist/slf4j-1.7.7.tar.gz
tar -xvf slf4j-1.7.7.tar.gz
cd /home/el/apachekafka/kafka_2.10-0.8.1.1/libs/slf4j-1.7.7
cp slf4j-api-1.7.7.jar ..
cp slf4j-nop-1.7.7.jar ..

Then re-run the command and the producer doesn't throw any error.

查看更多
Anthone
6楼-- · 2019-04-04 04:16

ForHDP kafka use broker port: 6667

For Standalone kafka use broker port: 9092

The error was because of the port no which we were using (HDP uses 6667 but we were using 9092)

bin/kafka-console-producer.sh --broker-list broker-ip:9092 --topic test //not working

bin/kafka-console-producer.sh --broker-list broker-ip:6667 --topic test //working

link: Kafka console producer Error in Hortonworks HDP 2.3 Sandbox

查看更多
Animai°情兽
7楼-- · 2019-04-04 04:19

This is a solution to the exception in the original question asked by Krish: "kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries."

The FAQ here and here says that your hostname should be set correctly. I have not experienced that condition. But I found another condition when a kafka producer gives this error message: when your partition key in the producer is wrong. That is, if you have a topic with one partition, then partition key in the producer can be either null(message is sent to a random partition) or 0(partitions in kafka are numbered starting from 0). If you try to use a partition key of 1, this exception is thrown in the producer. Or if you have 3 partitions in the topic, and you use a partition key of 3(key of 3 is invalid because valid partition numbers are 0,1,2), this exception is thrown. This error is consistent when the partition number in the producer's send() method does not match with the range of partitions in the topic. I used kafka version 0.8.2. The client API I used was the package kafka.javaapi.producer.Producer.

查看更多
登录 后发表回答