Kafka consumer fetching metadata for topics failed

2020-02-20 07:01发布

问题:

I am attempting to write a Java client for a third party's Kafka and ZooKeeper servers. I am able to list and describe topics, but when I attempt to read any, a ClosedChannelException is raised. I reproduce them here with the command line client.

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         

Alternate commands succeed:

$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic   PartitionCount:2        ReplicationFactor:1     Configs:
    Topic: eventbustopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    Topic: eventbustopic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic

(The ips were redacted and replaced with 255.255.255.255)

When I google this exception, I see issues on the producer side -- indeed, the source for ClientUtils.fetchTopicMetadata suggests this is mainly used by producers.

One concern that I have is that this might be a product of the network layout: the packets are mangled by Haproxy and sent over a VPN.

What exactly is at work here?

回答1:

The broker tells the client which hostname should be used to produce/consume messages. By default Kafka uses the hostname of the system it runs on. If this hostname can not be resolved by the client side you get this exception.

You can try setting advertised.host.name in the Kafka configuration to an hostname/address which the clients should use.



回答2:

Here is my way to solve this problem:

  1. run bin/kafka-server-stop.sh to stop running kafka server.
  2. modify the properties file config/server.properties by adding a line: listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
  3. restart kafka server.

Since without the lisener setting, kafka will use java.net.InetAddress.getCanonicalHostName() to get the address which the socket server listens on.



回答3:

You have a problem with Zookeeper. 255.255.255.255:2181 is not a valid Zookeeper address; this is a broadcast address on your network or a subnet mask. To make the things work, find the IP Address or hostname of the machine running Zookeeper.



回答4:

Ran into this error on AWS. Problem was I was overly restrictive with the security group and set ports 2181 and 9092 to "my IP". This meant the kafka instance couldn't find the ZK running on the same box.

Solution - open it up - a little.