Assert Kafka send worked

Posted 2019-03-06 08:02

Question:

I'm writing an application with Spring Boot, so to write to Kafka I do:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

and then inside my method:

kafkaTemplate.send(topic, data);

But I feel like I'm just relying on this to work. How can I know whether it has worked? If it's asynchronous, is it good practice to return a 200 code and hope it worked? I'm confused. If Kafka isn't available, won't this fail? Shouldn't I be prompted to catch an exception?

Answer 1:

Yes, if Kafka is not available, that .send() call will fail, but because the send is asynchronous, nobody is notified of the failure unless you check the result. You can specify a callback to be executed when the future finally completes. Full interface spec here: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Callback.html

From the official Kafka javadoc here: https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

  ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
  producer.send(record,
      new Callback() {
          // Invoked once the broker has answered or the send has failed.
          public void onCompletion(RecordMetadata metadata, Exception e) {
              if (e != null) {
                  // The send failed; e describes why.
                  e.printStackTrace();
              } else {
                  System.out.println("The offset of the record we just sent is: " + metadata.offset());
              }
          }
      });
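
If the handler must not return a 200 until the record has been acknowledged, one option is to block on the future that the send returns, with a timeout. The following is a minimal sketch using only the JDK, not Spring or Kafka code. It assumes the send yields a CompletableFuture, as KafkaTemplate.send does in Spring for Apache Kafka 3.x (2.x versions return a ListenableFuture instead). The class SendConfirmation, the method sendAndConfirm, and the stand-in futures are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: turns an asynchronous send into a yes/no answer.
class SendConfirmation {

    // Starts the send and waits up to timeoutMs for it to complete.
    // Returns true only if the future completed without an exception.
    static boolean sendAndConfirm(Supplier<CompletableFuture<String>> send, long timeoutMs) {
        try {
            send.get().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            // The send failed, or no acknowledgement arrived in time.
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the send as unconfirmed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that the broker acknowledges.
        Supplier<CompletableFuture<String>> ok =
                () -> CompletableFuture.completedFuture("acknowledged");

        // Stand-in for a send that fails, e.g. because the broker is unreachable.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));

        System.out.println("ok send confirmed: " + sendAndConfirm(ok, 1000));
        System.out.println("failed send confirmed: " + sendAndConfirm(() -> failed, 1000));
    }
}
```

In a controller, the boolean could decide between returning a 200 and an error status. The trade-off is that each request then waits for the broker, so the timeout should be chosen with that latency in mind.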


Answer 2:

In addition to what @mjuarez has mentioned, you can try tuning two Kafka producer properties. One is ProducerConfig.ACKS_CONFIG, which lets you set the level of acknowledgement that you consider safe for your use case. It has three possible values. From the Kafka documentation:

  • acks=0: The producer does not wait for any acknowledgement from the server and considers the record sent immediately.
  • acks=1: The leader writes the record to its local log but responds without awaiting full acknowledgement from all followers.
  • acks=all: The leader waits for the full set of in-sync replicas to acknowledge the record.

The other property is ProducerConfig.RETRIES_CONFIG. Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
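
As a rough illustration of the two settings, the sketch below builds producer properties with plain string keys. "acks" and "retries" are the string values behind ProducerConfig.ACKS_CONFIG and ProducerConfig.RETRIES_CONFIG. The class name ProducerSettings, the method name, and the chosen values (all, 3) are assumptions made for the example, not recommendations from the answer.

```java
import java.util.Properties;

// Hypothetical helper that collects the producer settings discussed above.
class ProducerSettings {

    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        // Resend records whose send fails with a potentially transient error.
        // The value 3 is an arbitrary example.
        props.put("retries", "3");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps("localhost:9092");
        System.out.println(props.getProperty("acks"));
    }
}
```

In a Spring Boot application the same values can usually be set in application.properties through spring.kafka.producer.acks and spring.kafka.producer.retries instead of building the properties by hand.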



Answer 3:

You can run the command below while sending messages to Kafka:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name

While the above command is running, run your code. If the messages are sent successfully, they will be printed on the console.

Furthermore, as with a connection to any other resource, if the connection cannot be established, any operation on it will raise an exception.