Is there any way to check if kafka is up and running

Posted 2019-06-24 07:08

Question:

I am using the kafka-net client to send messages to Kafka. Is there any way to check whether the Kafka server is up and able to receive messages? When I shut Kafka down, the producer is still created successfully, and SendMessageAsync just hangs for quite a long time. I've tried passing a timeout, but it doesn't change anything. I'm using kafka-net 0.9. Everything works fine when the Kafka server is up and running.

Answer 1:

Each broker registers its id in ZooKeeper as an ephemeral node at /brokers/ids/[brokerId], which allows other brokers and consumers to detect failures. (Right now the definition of health is fairly naive: if the broker is registered under /brokers/ids/[brokerId] in ZooKeeper it is considered healthy, otherwise it is considered dead.)

A ZooKeeper ephemeral node exists only as long as the broker's session is active.

You can check whether a broker is up via ZkUtils.getSortedBrokerList(zkClient), which returns the ids of all active brokers under /brokers/ids.

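import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;

// Connect to ZooKeeper, then list the broker ids currently registered under /brokers/ids.
ZkClient zkClient = new ZkClient(properties.getProperty("zkQuorum"), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
ZkUtils.getSortedBrokerList(zkClient);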

Reference
Kafka data structures in Zookeeper



Answer 2:

Try this.

In your constructor, put

options = new KafkaOptions(uri);
var endpoint = new DefaultKafkaConnectionFactory().Resolve(options.KafkaServerUri.First(), options.Log);
client = new KafkaTcpSocket(new DefaultTraceLog(), endpoint);

and then before you send each message,

// test if the broker is alive
var request = new MetadataRequest { Topics = new List<string>() { Topic } };
var task1 = client.WriteAsync(request.Encode()).ConfigureAwait(false);
Task<KafkaDataPayload> task2 = Task.Factory.StartNew(() =>  task1.GetAwaiter().GetResult());
if (task2.Wait(30000) == false)
{
    throw new TimeoutException("Timeout while sending message to kafka broker!");
}

If you have a high volume of messages, this is going to be a performance hit, but with a low volume of messages it shouldn't matter.
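If a full metadata round trip before every send is too expensive, a cheaper but weaker check is to test whether the broker's TCP port accepts a connection within a timeout. The following is a minimal sketch in Java using only the standard library. The class name BrokerProbe, the method isReachable, and the default address localhost:9092 are assumptions made for illustration and are not part of kafka-net or of any Kafka client library. An open port only shows that something is listening there, not that the broker can actually accept messages.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of kafka-net or the Kafka clients.
class BrokerProbe {

    // Returns true if a TCP connection to host:port is established within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host, or timeout: treat the broker as down.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed address (9092 is Kafka's default listener port).
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean up = isReachable(host, port, 3000);
        System.out.println(up ? "broker port reachable" : "broker port unreachable");
    }
}
```

Because the probe fails fast on a refused connection and otherwise gives up after the timeout, it avoids the long hang described in the question. A metadata request like the one above remains the stronger check.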