I'm trying to find out which offsets my current High-Level consumers are working off. I use Kafka 0.8.2.1, with no "offset.storage" set in the server.properties of Kafka, which, I think, means that offsets are stored in Kafka. (I also verified that no offsets are stored in Zookeeper by checking this path in the Zk shell: /consumers/consumer_group_name/offsets/topic_name/partition_number)
I tried to listen to the __consumer_offsets topic to see which consumer saves which offset values, but it did not work...
Here is what I tried. First, I created a config file for the console consumer:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
Then I tried two different consumer scripts:
#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
#2:
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Neither worked: the script just sits there and does not print anything, even though the consumers are actively consuming and saving offsets.
Am I missing some other configuration or properties?
thanks!
Marina
I came across this question while also trying to consume from the __consumer_offsets topic.
I managed to figure it out for different Kafka versions and thought I'd share what I found:
For Kafka 0.8.2.x
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
For Kafka 0.9.x.x and 0.10.0.0
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
For Kafka 0.11.0.0
#Create consumer config
echo "exclude.internal.topics=false" > /tmp/consumer.config
#Consume all offsets
./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
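If you script this against clusters running different versions, the only thing that changes between the three commands above is the formatter class. A minimal Java sketch, assuming you launch the console consumer from Java (for example via ProcessBuilder); the class and method names are hypothetical, and only the versions listed above are handled:

```java
import java.util.Arrays;
import java.util.List;

final class OffsetsFormatterPicker {

    // Hypothetical helper: maps a broker version to the --formatter class from the
    // table above. Unlisted versions are rejected instead of guessed.
    static String formatterFor(String kafkaVersion) {
        if (kafkaVersion.startsWith("0.8.2.")) {
            return "kafka.server.OffsetManager$OffsetsMessageFormatter";
        }
        if (kafkaVersion.startsWith("0.9.") || kafkaVersion.equals("0.10.0.0")) {
            return "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter";
        }
        if (kafkaVersion.equals("0.11.0.0")) {
            return "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter";
        }
        throw new IllegalArgumentException("Version not covered: " + kafkaVersion);
    }

    // Builds the same arguments as the shell commands above. No shell is involved
    // when these are passed to ProcessBuilder, so the '$' needs no backslash escape.
    static List<String> consoleConsumerArgs(String kafkaVersion, String configPath, String zookeeper) {
        return Arrays.asList(
                "./kafka-console-consumer.sh",
                "--consumer.config", configPath,
                "--formatter", formatterFor(kafkaVersion),
                "--zookeeper", zookeeper,
                "--topic", "__consumer_offsets",
                "--from-beginning");
    }

    public static void main(String[] args) {
        List<String> cmd = consoleConsumerArgs("0.9.0.1", "/tmp/consumer.config", "localhost:2181");
        System.out.println(String.join(" ", cmd));
    }
}
```

The escaping note matters in practice: in the shell commands above, `\$` keeps the shell from expanding `$OffsetsMessageFormatter` as a variable, whereas an argument list handed straight to the process must contain a literal `$`.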
OK, I figured out what the problem was. My Kafka setup was actually using Zookeeper as the offset storage, not Kafka. I did not detect that right away because I was checking the ZK content incorrectly:
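I was doing
ls /consumers/consumer_group_name/offsets/topic_name/partition_number
and seeing nothing there. That is expected: 'ls' only lists a znode's children, and the partition znode has none (numChildren = 0); the offset is stored as the znode's own data. Instead I had to 'get' the content, which did show the correct offsets for my consumers, like the one below: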
get /consumers/consumer_group_name/offsets/topic_name/partition_number
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
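To read this programmatically, note that the first line printed by 'get' is the znode data: the old high-level consumer writes the offset as a decimal string, which is why dataLength is 9 for 185530404. A minimal Java sketch, assuming you already fetched the raw bytes with a ZooKeeper client (for example ZooKeeper#getData); the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;

final class ZkOffsetPath {

    // Path layout checked above for the old (0.8.x) high-level consumer.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // The offset is the znode's data (not a child node), stored as a decimal string.
    static long parseOffset(byte[] znodeData) {
        return Long.parseLong(new String(znodeData, StandardCharsets.UTF_8).trim());
    }

    public static void main(String[] args) {
        byte[] data = "185530404".getBytes(StandardCharsets.UTF_8);
        // prints /consumers/consumer_group_name/offsets/topic_name/0
        System.out.println(offsetPath("consumer_group_name", "topic_name", 0));
        // prints 185530404 (dataLength = 9)
        System.out.println(parseOffset(data) + " (dataLength = " + data.length + ")");
    }
}
```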
If you add --from-beginning, it should most likely give you some results; at least it did when I tried it myself. Alternatively, if you don't provide that argument but consume more messages (and trigger offset commits) while that console consumer is listening, those commits should also show up there.
As of Kafka 0.11, the (Scala) source code can be found here
For those who need a Java translation: in any consumer process, say you get a ConsumerRecord<byte[], byte[]> consumerRecord from __consumer_offsets.
Get the key (check that it is not null first) and decode it with GroupMetadataManager.readMessageKey(ByteBuffer.wrap(consumerRecord.key())). That can return different types, so check whether the result is an instanceof OffsetKey, then cast it and read the values you need from it.
To get the offset value, use String.valueOf(GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(consumerRecord.value()))), after checking that the value is not null.
A minimal Java example translated from the Scala code...
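import java.nio.ByteBuffer;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.OffsetKey;

// consumerRecord is a ConsumerRecord<byte[], byte[]> read from __consumer_offsets
byte[] key = consumerRecord.key();
if (key != null) {
    // In 0.11 readMessageKey takes a ByteBuffer (see the Scala code below)
    Object o = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key));
    // Only offset commits decode to OffsetKey; group metadata records are skipped
    if (o instanceof OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key();
        byte[] value = consumerRecord.value();
        // A null value is a tombstone, printed as "NULL" just like the Scala formatter
        String formattedValue = (value == null)
                ? "NULL"
                : String.valueOf(GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)));
        // TODO: Print, store, or compute results with the new key and value
    }
}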
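If you would rather not put the Kafka broker jar on your consumer's classpath, the offset-commit key can also be decoded by hand. The sketch below assumes the key layout used by offset-commit keys of schema versions 0 and 1 (an int16 version, then group and topic as Kafka protocol strings, then an int32 partition); version 2 keys hold group metadata and are skipped, which mirrors the instanceof OffsetKey check. All class and method names are hypothetical, and encode exists only to build test input:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetKeyDecoder {

    // Hypothetical holder for a decoded offset-commit key.
    static final class GroupTopicPartition {
        final String group;
        final String topic;
        final int partition;

        GroupTopicPartition(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "[" + group + "," + topic + "," + partition + "]";
        }
    }

    // Kafka protocol STRING: int16 length followed by that many UTF-8 bytes.
    private static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            return null; // a negative length marks a null string
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns the decoded key for offset commits (key versions 0 and 1), or null for
    // null keys and other record types such as group metadata (version 2).
    static GroupTopicPartition decode(byte[] key) {
        if (key == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(key); // big-endian by default, as Kafka writes it
        short version = buf.getShort();
        if (version != 0 && version != 1) {
            return null;
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return new GroupTopicPartition(group, topic, partition);
    }

    // Builds a key with the same assumed layout, for testing only.
    static byte[] encode(short version, String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        buf.putShort(version).putShort((short) g.length).put(g)
           .putShort((short) t.length).put(t).putInt(partition);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] key = encode((short) 1, "consumer_group_name", "topic_name", 0);
        System.out.println(decode(key)); // prints [consumer_group_name,topic_name,0]
    }
}
```

The trade-off is that a hand-written decoder silently depends on the internal schema, so it needs revisiting whenever a newer broker adds a key version; the GroupMetadataManager route keeps that knowledge inside Kafka.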
Note that it's also possible to use the AdminClient API (in Kafka 2.0 and later) to describe groups instead of consuming these raw messages:
- listConsumerGroupOffsets(): to find all offsets for a specific group
- describeConsumerGroups(): to find details about the members of a group
Scala source code extract
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }
}
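The value side of writeTo can be sketched the same way without Kafka classes. This Java sketch assumes the offset-commit value layouts of schema versions 0 and 1 (v0: int64 offset, string metadata, int64 timestamp; v1: int64 offset, string metadata, int64 commit timestamp, int64 expire timestamp) and rejects other versions. It keeps the "key::value" line shape and the "NULL" tombstone handling of writeTo, but the key=value text is a hypothetical format, not Kafka's own toString output, and encodeV1 exists only to build test input:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OffsetValueFormatter {

    // Kafka protocol STRING: int16 length followed by that many UTF-8 bytes.
    private static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            return null;
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Decodes an offset-commit value of schema version 0 or 1 (assumed layouts above).
    static String formatValue(byte[] value) {
        if (value == null) {
            return "NULL"; // tombstone, same as the Scala writeTo
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        short version = buf.getShort();
        if (version != 0 && version != 1) {
            throw new IllegalArgumentException("Unsupported value version " + version);
        }
        StringBuilder sb = new StringBuilder()
                .append("offset=").append(buf.getLong())
                .append(",metadata=").append(readString(buf))
                .append(",commitTimestamp=").append(buf.getLong());
        if (version == 1) {
            sb.append(",expireTimestamp=").append(buf.getLong());
        }
        return sb.toString();
    }

    // Same "key::value" shape as the lines writeTo prints.
    static String formatLine(String groupTopicPartition, byte[] value) {
        return groupTopicPartition + "::" + formatValue(value);
    }

    // Builds a version 1 value with the assumed layout, for testing only.
    static byte[] encodeV1(long offset, String metadata, long commitTs, long expireTs) {
        byte[] m = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 2 + m.length + 8 + 8);
        buf.putShort((short) 1).putLong(offset).putShort((short) m.length).put(m)
           .putLong(commitTs).putLong(expireTs);
        return buf.array();
    }

    public static void main(String[] args) {
        String key = "[consumer_group_name,topic_name,0]";
        System.out.println(formatLine(key, encodeV1(42L, "", 1000L, 2000L)));
        System.out.println(formatLine(key, null));
    }
}
```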