I am trying to monitor the consumer offsets of a given group with the Java API. I create one additional consumer which does not subscribe to any topic, but just calls consumer.committed(topic) to get the offset information. This kind of works, but:
For testing I use only one real consumer (i.e. one which does subscribe to the topic). When I shut it down using close() and later start a new one, it takes 27 seconds between subscribe and the first consumption of messages, despite the fact that I use poll(1000).
I am guessing this has to do with the rebalancing possibly being confused by the non-subscribing consumer. Could that be possible? Is there a better way to monitor offsets with the Java API? (I know about the command line tool, but I need to use the API.)
There are different ways to inspect the offsets of a topic, depending on what you need them for. Besides "committed", which you described above, here are two more options:
1) If you want to know the offset of the next record the consumer will fetch from the broker, use "position" (the partition must be assigned to this consumer):
long offsetPosition;
TopicPartition tPartition = new TopicPartition(topic, partitionToReview);
offsetPosition = kafkaConsumer.position(tPartition);
System.out.println("offset of the next record to fetch is : " + offsetPosition);
2) Call the "offset()" method on each ConsumerRecord object after performing a poll with kafkaConsumer:
Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(1000).iterator();
while (it.hasNext()) {
    ConsumerRecord<byte[], byte[]> record = it.next();
    System.out.println("offset : " + record.offset());
}
Found it: the monitoring consumer added to the confusion but was not the culprit. In the end it is easy to understand though slightly unexpected (for me at least):
The default for session.timeout.ms is 30 seconds. When a consumer disappears it takes up to 30 seconds before it is declared dead and the work is rebalanced. For testing, I had stopped the single consumer I had, waited three seconds and started a new one. This then took 27 seconds before it started consuming, which fills out the 30-second time-out.
I would have expected that a single, lone consumer starting up does not wait for the time-out to expire, but starts to "rebalance", i.e. grab the work immediately. It seems though that the time-out has to expire before work is rebalanced, even if there is only one consumer.
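The 27 seconds can be reproduced with simple arithmetic. The following is only a sketch of that reasoning, assuming the delay is exactly the remainder of the session time-out as observed above; the class and method names (RebalanceDelay, remainingWait) are made up for illustration and are not part of the Kafka API.

```java
import java.time.Duration;

public class RebalanceDelay {
    // Time left until the old member's session expires, given how long
    // the old consumer has already been gone. Never negative.
    static Duration remainingWait(Duration sessionTimeout, Duration downtime) {
        Duration left = sessionTimeout.minus(downtime);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        // Values from the test described above: 30 s time-out, 3 s pause.
        Duration wait = remainingWait(Duration.ofSeconds(30), Duration.ofSeconds(3));
        System.out.println("expected delay: " + wait.getSeconds() + " s"); // expected delay: 27 s
    }
}
```

If the pause between stopping and restarting is longer than the time-out, the sketch yields zero, which matches the expectation that a new consumer then starts without the extra wait.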
To get through the testing faster, I changed the configuration to use a lower session.timeout.ms for the consumer as well as a lower group.min.session.timeout.ms for the broker.
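A minimal sketch of what the consumer side of that change could look like. The concrete values, the broker address and the group name are assumptions chosen for illustration, not the ones I actually used. The sketch only builds the Properties object; a real KafkaConsumer additionally needs key and value deserializers.

```java
import java.util.Properties;

public class FastTimeoutConfig {
    // Consumer settings with a short session time-out for quick test cycles.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "test-group");              // hypothetical group name
        // Must lie between the broker's group.min.session.timeout.ms and
        // group.max.session.timeout.ms, so the broker minimum has to be
        // lowered to at most this value (in server.properties).
        props.setProperty("session.timeout.ms", "3000");
        // The heartbeat interval has to be lower than the session time-out.
        props.setProperty("heartbeat.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("session.timeout.ms"));
    }
}
```

Such low values are only sensible for tests, since a short time-out makes the broker declare a slow consumer dead more quickly.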
To conclude: using a consumer that does not subscribe to any topic for monitoring the offsets works just fine and does not seem to interfere with the rebalancing process.