How to find the number of commits and current offset in each partition of a known kafka topic. I am using kafka v0.8.1.1
It is not clear from your question, what kind of offset you're interested in. There are actually three types of offsets:
- The offset of the first available message in topic's partition. Use -2 (earliest) as --time parameter for GetOffsetShell tool
- The offset of the last available message in topic's partition. Use -1(latest) as --time parameter.
- The last read/processed message offset maintained by kafka consumer. High level consumer stores this information in Zookeeper (separately for every consumer group) and takes care about keeping it up to date when you call commit() or when auto-commit setting is set to true. For simple consumer, your code have to take care about managing offsets.
In addition to command line utility, the offset information for #1 and #2 is also available via SimpleConsumer.earliestOrLatestOffset().
If the number of messages is not too large, you can specify a large --offsets parameter to GetOffsetShell and then count number of lines returned by the tool. Otherwise, you can write a simple loop in scala/java that would iterate all available offsets starting from the earliest.
From Kafka documentation:
Get Offset Shell
get offsets for a topic
required argument [broker-list], [topic]
Option Description
------ -----------
--broker-list <hostname:port,..., REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
--max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count> number of offsets returned (default: 1)
--partitions <partition ids> comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1(latest) / -2 (earliest) timestamp; offsets will come before this timestamp, as in getOffsetsBefore >
--topic <topic> REQUIRED: The topic to get offsets from.
Regarding the offset of the topic and partition you can use For example using these command (I have topic games
bin/ --broker-list localhost:9092 --topic games --time -1
I will get games:0:47841
which means that for topic games
and 0
partition I have latest not used offset 47841
(latest available message).
You can use -2
to see the first available message.
Starting version 0.9.0.x you should start to use the kafka.admin.ConsumerGroupCommand tool. Below are the arguments that the tool take
List all consumer groups, describe a consumer group, or delete consumer group info.
Option Description
------ -----------
--bootstrap-server <server to connect REQUIRED (only when using new-
to> consumer): The server to connect to.
--command-config <command config Property file containing configs to be
property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
Pass in groups with a single topic to
just delete the given topic's
partition offsets and ownership
information for the given consumer
groups. For instance --group g1 --
group g2 --topic t1
Pass in just a topic to delete the
given topic's partition offsets and
ownership information for every
consumer group. For instance --topic
WARNING: Group deletion only works for
old ZK-based consumer groups, and
one has to use it carefully to only
delete groups that are not active.
--describe Describe consumer group and list
offset lag related to given group.
--group <consumer group> The consumer group we wish to act on.
--list List all consumer groups.
--new-consumer Use new consumer.
--topic <topic> The topic whose consumer group
information should be deleted.
--zookeeper <urls> REQUIRED (unless new-consumer is
used): The connection string for the
zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
To get offsets for a Topic_X for a consumerGroup_Y use the command as below
bin/ kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y
Response would look like
consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none
consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none
consumerGroup, Topic_X, 2, 801564, 939540, 137976, none
consumerGroup, Topic_X, 3, 737290, 875262, 137972, none
consumerGroup, Topic_X, 4, 737288, 875254, 137966, none
consumerGroup, Topic_X, 5, 737276, 875241, 137965, none
consumerGroup, Topic_X, 6, 737290, 875251, 137961, none
consumerGroup, Topic_X, 7, 737290, 875248, 137958, none
consumerGroup, Topic_X, 8, 737288, 875246, 137958, none
consumerGroup, Topic_X, 9, 737293, 875251, 137958, none
consumerGroup, Topic_X, 10, 737289, 875244, 137955, none
consumerGroup, Topic_X, 11, 737273, 875226, 137953, none
This information was also helpful in creating a script to view the number of messages on a partition for a topic (from the command line). While tools like Kafka-Web-Console are nice, some of us live in a non-GUI world.
Here is the script ... use and modify it at your own risk :-)
if [[ -z "${topic}" ]] ; then
echo "Usage: ${0} <topic>"
exit 1
if [[ -z "${KAFKA_HOME}" ]] ; then
# $KAFKA_HOME not set, using default /kafka
if [ ! -d ${KAFKA_HOME} ] ; then
echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]"
exit 1
echo "Topic: ${topic}: "
printf "Partition Count\n"
printf "~~~~~~~~~~ ~~~~~~~~~~~~\n"
for msg in `bin/ --topic ${topic} --broker-list localhost:9092 --time -1` ; do
name=`echo ${msg} | awk -F ":" '{print $1}'`
partition=`echo ${msg} | awk -F ":" '{print $2}'`
total=`echo ${msg} | awk -F ":" '{print $3}'`
printf "%10d %12d\n" ${partition} ${total}
idx=$((idx + 1))
if [ ${idx} -eq 0 ] ; then
echo "Topic name not found!"
exit 1
exit ${rc}