For particular reasons I need to use both ConsumerGroup (a.k.a. high-level consumer) and SimpleConsumer (a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup I use a ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer requires a list of seed brokers to be instantiated.
I don't want to maintain two lists - ZooKeeper hosts and broker hosts. Thus, I'm looking for a way to automatically discover the brokers for a particular topic from ZooKeeper.
Based on some indirect information, I believe this data is stored in ZooKeeper under one of the following paths:
- /brokers/topics/<topic>/partitions/<partition-id>/state
- /brokers/ids/
However, when I try to read data from these nodes, I get a serialization error (I'm using com.101tec.zkclient for this):

    org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
        at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37)
        at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
        ... 64 elided
    Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
        at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.<init>(TcclAwareObjectIputStream.java:30)
        at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31)
        ... 69 more
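(Editorial note: the "invalid stream header" bytes 7B226A6D are simply the ASCII characters `{"jm` - the start of a JSON document - which suggests the znode holds plain UTF-8 JSON text rather than a java.io.Serializable stream. A broker registration node under /brokers/ids typically holds something along these lines; exact fields vary by Kafka version:

```json
{"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9092}
```

)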
I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a client issue, but rather an unusual encoding. Thus, I want to know:
- If this is the right way to go, how to read these nodes properly?
- If the whole approach is wrong, what is the right one?
This is what one of my colleagues did to get a list of Kafka brokers. I think it's the right approach when you want to fetch the broker list dynamically.
Here is example code that shows how to get the list.
Running the code against a cluster consisting of three brokers results in:
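(Neither the original example code nor its output survived the import. As a minimal reconstruction of the idea - all class and method names here are my own, not from the original post - each child of /brokers/ids holds a JSON registration payload, and extracting "host:port" endpoints from those payloads is plain string handling. The ZooKeeper calls you would use to obtain the payloads are indicated in the comments.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of dynamic broker discovery (identifiers are illustrative).
// With a properly configured ZkClient you would fetch the payloads roughly like this:
//   List<String> ids = zkClient.getChildren("/brokers/ids");
//   for (String id : ids) payloads.add(zkClient.readData("/brokers/ids/" + id));
// Each payload is a JSON string such as:
//   {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9092}
class BrokerListParser {
    private static final Pattern HOST = Pattern.compile("\"host\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");

    /** Turns raw broker-registration JSON payloads into "host:port" strings. */
    static List<String> toEndpoints(List<String> jsonPayloads) {
        List<String> endpoints = new ArrayList<>();
        for (String json : jsonPayloads) {
            Matcher h = HOST.matcher(json);
            Matcher p = PORT.matcher(json);
            if (h.find() && p.find()) {
                endpoints.add(h.group(1) + ":" + p.group(1));
            }
        }
        return endpoints;
    }
}
```

The regex-based extraction keeps the sketch dependency-free; in production code a real JSON parser would be the safer choice.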
It turns out that Kafka uses ZKStringSerializer to read and write data into znodes. So, to fix the error I only had to add it as the last parameter of the ZkClient constructor:
Using it, I wrote several useful functions for discovering broker ids, their addresses and other stuff:
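(The constructor snippet itself was lost in the import. As a sketch of the fix, assuming the zkclient API described above: the serializer just has to treat znode payloads as UTF-8 strings instead of java.io.Serializable objects. The class name Utf8StringSerializer below is mine; Kafka's own ZKStringSerializer does the equivalent.)

```java
import java.nio.charset.StandardCharsets;

// Minimal ZKStringSerializer-style codec. Kafka writes znodes as UTF-8 JSON text,
// so serialization is just a string <-> byte[] conversion.
// In real code these two methods would implement
// org.I0Itec.zkclient.serialize.ZkSerializer, and an instance would be passed as
// the last argument of the ZkClient constructor, e.g.:
//   ZkClient zk = new ZkClient("zk-host:2181", 10000, 10000, new Utf8StringSerializer());
class Utf8StringSerializer {
    public byte[] serialize(Object data) {
        // Znode payloads are plain UTF-8 text.
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }

    public String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```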
Actually, there is ZkUtils within Kafka (at least for the 0.8.x line) that you can use, with one small caveat: you'll need to re-implement ZkStringSerializer so that it converts strings to and from UTF-8 encoded byte arrays. If you'd like to use Java 8's streaming APIs, you can iterate over the Scala collections through scala.collection.JavaConversions. This is the thing that helped my case.
To do this using the shell:
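(The shell command itself was cut off. A plausible session - host and broker id are placeholders, and this assumes a reachable ZooKeeper - uses the zookeeper-shell.sh wrapper that ships in Kafka's bin/ directory:

```shell
# Placeholder host; lists registered broker ids, then dumps one registration node.
bin/zookeeper-shell.sh zk-host:2181 <<'EOF'
ls /brokers/ids
get /brokers/ids/0
EOF
```

)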