I have 3 Spring Kafka consumers (same group) reading from 3 partitions. I want to detect when one of these consumers stops reading from its partition while the other 2 consumers continue reading from the other 2 partitions. This has happened twice so far; once detected, it is easy to fix by restarting all consumers, which causes a rebalance. The problem is that on both occasions it would have been good to know earlier. So I tried using ListenerContainerIdleEvent like so -
    @EventListener
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("idle event fired! listenerId=" + event.getListenerId());
        Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
        partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
    }
Here are my test results -
1) 1 consumer reading from 1 partition, this event works well.
2) 1 consumer reading from 3 partitions: this event gets called only when there are no messages on all 3 partitions. If there are no messages on 1 or 2 partitions but there are messages on the 3rd partition, this event does not get called.
Is there a way to get notified when messages are not being read from 1 partition (for whatever reason: a consumer issue, or no messages available on that partition) while the consumer is assigned to multiple partitions?
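To make the requirement concrete, here is a minimal sketch of the per-partition tracking being asked about. It is plain Java with no Spring dependency, and `PartitionIdleTracker` with its methods is a hypothetical helper invented for illustration, not part of Spring Kafka. It assumes the listener can report the topic and partition of each record it consumes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not a Spring Kafka class): remembers when each partition last delivered a record. */
class PartitionIdleTracker {

    // Key is "topic-partition"; value is the time in ms at which a record was last seen.
    private final Map<String, Long> lastSeenMs = new ConcurrentHashMap<>();

    /** Start tracking a partition (e.g. when it is assigned) without overwriting an existing entry. */
    void track(String topic, int partition, long nowMs) {
        lastSeenMs.putIfAbsent(key(topic, partition), nowMs);
    }

    /** Record that a message from this partition was just consumed. */
    void recordMessage(String topic, int partition, long nowMs) {
        lastSeenMs.put(key(topic, partition), nowMs);
    }

    /** Returns the tracked partitions that have had no record for at least thresholdMs, sorted by key. */
    List<String> idlePartitions(long nowMs, long thresholdMs) {
        List<String> idle = new ArrayList<>();
        lastSeenMs.forEach((k, seen) -> {
            if (nowMs - seen >= thresholdMs) {
                idle.add(k);
            }
        });
        Collections.sort(idle);
        return idle;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }
}
```

In a real listener, `recordMessage` would presumably be called with `record.topic()` and `record.partition()` from each `ConsumerRecord`, and `idlePartitions` checked on a schedule. Note that this only detects that a partition has gone quiet; like the idle event, it cannot by itself distinguish a stuck consumer from a partition that simply has no new messages.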
Update: 03/27/2018
I am not sure whether I should ask a new question for this, so I am extending this question first. I have 1 consumer consuming from 1 topic with 3 partitions. I have set idleEventInterval to 30 seconds. Every 30 seconds, I get the following log messages.
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
    12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic

    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds
    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds
    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
    12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
    12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds
    12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
    12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds
    12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
The event listener code is -
    @EventListener
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("No messages received for " + event.getIdleTime() + " milliseconds");
        Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
        partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
    }
1) Why does this event get called 4 times every 30 seconds?
2) Why is the partition information not consistent across sets of messages? Sometimes there is no partition information, sometimes partitions are repeated in the same set, etc.