I am running this kafka producer example mentioned in its site
The code:
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "host.broker-1:9093, host.broker-2:9093, host.broker-3:9095");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "test.app.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
public class SimplePartitioner implements Partitioner{
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
More details:
I am running this application on a host(call is producer) which is remote to host-broker[1-3]
I can ping and ssh the broker host from producer host.
Provided the advertised.host.name in the server.properties (they are named as server[1-3].properties in the brokers respectively
The properties:
broker.id=1
port=9093
host.name=host.broker.internal.name
advertised.host.name=host-broker1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/1/kafka-logs-1,/data/2/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
Any idea on how to fix this error?
I got these errors when running a Kafka producer:
Found a solution:
On my Mac box, after I download the
scala-2.10
andkafka_2.10-0.8.1
, in the kafka_2.10-0.8.1 directory, every thing is fine when I start zookeeper, kafka server, and create a test topic. Then I need to start a producer for the test topic. but there is an error:The reason is that in the kafka libs directory, the kafka release zip file only included jar file of slf4j-api, they missed a jar file: slf4j-nop.jar, so we have to go to http://www.slf4j.org, download
slf4j-1.7.7.zip
, and then unzip it, copy the slf4j-api-1.7.7, slf4j-nop-1.7.7.jar into kafka’s libs directory.Restart kafka producer again, now no error is reported.
Source: SOLUTION
This can happen if the client cannot reach BOTH hostname and IP of the kafka broker.
Make an entry to the clients \etc\hosts or C:\Windows\System32\drivers\etc\hosts and that resolved this issue for me.
You need to add the SLF4j logging implementation. if you are using maven as the build tool try adding this following to your pom.xml and see if it works ..
I got the error from apache kafka:
My Setup:
Was able to fix it with these commands:
Then re-run the command and the producer doesn't throw any error.
ForHDP kafka use broker port: 6667
For Standalone kafka use broker port: 9092
The error was because of the port no which we were using (HDP uses 6667 but we were using 9092)
bin/kafka-console-producer.sh --broker-list broker-ip:9092 --topic test //not working
bin/kafka-console-producer.sh --broker-list broker-ip:6667 --topic test //working
link: Kafka console producer Error in Hortonworks HDP 2.3 Sandbox
This is a solution to the exception in the original question asked by Krish: "kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries."
The FAQ here and here says that your hostname should be set correctly. I have not experienced that condition. But I found another condition when a kafka producer gives this error message: when your partition key in the producer is wrong. That is, if you have a topic with one partition, then partition key in the producer can be either null(message is sent to a random partition) or 0(partitions in kafka are numbered starting from 0). If you try to use a partition key of 1, this exception is thrown in the producer. Or if you have 3 partitions in the topic, and you use a partition key of 3(key of 3 is invalid because valid partition numbers are 0,1,2), this exception is thrown. This error is consistent when the partition number in the producer's send() method does not match with the range of partitions in the topic. I used kafka version 0.8.2. The client API I used was the package kafka.javaapi.producer.Producer.