How to detect when a spring kafka consumer stops g

Posted 2019-08-22 02:45

I have 3 Spring Kafka consumers (same group) reading messages from 3 partitions. I want to detect when one of these consumers stops reading from its partition while the other 2 consumers continue reading from the other 2 partitions. This has happened twice so far, and once detected it is easy to fix by restarting all consumers, which triggers a rebalance. The problem is that on both occasions it would have been good to know earlier. So I tried using ListenerContainerIdleEvent like so -

@EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("idle event fired! listenerId=" + event.getListenerId());

    Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
    partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
}

Here are my test results -

1) 1 consumer reading from 1 partition, this event works well.

2) 1 consumer reading from 3 partitions, this event gets called only when there are no messages on all 3 partitions. If there are no messages on 1 or 2 partitions but there are messages on the 3rd partition, this event does not get called.

Is there a way I can get notified when messages are not being read (for whatever reason...consumer issue or no messages available to be read from the partition) from 1 partition when a consumer is assigned to multiple partitions?

Update: 03/27/2018

I am not sure whether I should ask a new question about this, so I am extending this question first. I have 1 consumer consuming from 1 topic with 3 partitions. I have set idleEventInterval to 30 seconds. Every 30 seconds, I get the following log messages.

12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30855 milliseconds
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 30845 milliseconds
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
12:12:51.517 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic

12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds
12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60977 milliseconds
12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 1 topic:test-topic
12:13:21.630 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 0 topic:test-topic
12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds
12:13:21.632 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic
12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - No messages received for 60975 milliseconds
12:13:21.633 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.t.m.e.s.e.p.k.InboundMessageHandler - partition: 2 topic:test-topic

The event listener code is -

@EventListener
public void eventHandler(ListenerContainerIdleEvent event) {
    LOG.info("No messages received for " + event.getIdleTime() + " milliseconds");

    Collection<org.apache.kafka.common.TopicPartition> partitions = event.getTopicPartitions();
    partitions.forEach(p ->
            LOG.info("partition: " + p.partition() + " topic:" + p.topic()));
}

1) Why does this event get called 4 times every 30 sec?

2) Why is the partition information not consistent for every set of messages? Sometimes there is no partition information, sometimes partitions are repeated in the same set, etc.

1 answer
Root(大扎)
Reply #2 · 2019-08-22 03:27

... no messages available to be read from the partition

If the concurrency is 1 and there are 3 partitions, all three partitions will be handled by the same consumer. There is currently nothing in the framework that will publish an event if a consumer is assigned more than one partition and has not received a message from a particular partition for some period of time.
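Since the framework does not do this for you, one possible workaround is to track it yourself in the listener. The following is only a minimal sketch under assumptions: PartitionIdleTracker and its methods are hypothetical names, not part of Spring Kafka, and timestamps are passed in explicitly so the logic can be exercised without a broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring Kafka class): remembers when each
// partition last delivered a record and reports the ones that went quiet.
class PartitionIdleTracker {

    private final long idleThresholdMillis;
    // Key is "topic-partition", value is the time the last record was seen.
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    PartitionIdleTracker(long idleThresholdMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
    }

    // Call when a partition is assigned, so a partition that never delivers
    // anything is still reported once the threshold has passed.
    void register(String topic, int partition, long nowMillis) {
        lastSeen.putIfAbsent(key(topic, partition), nowMillis);
    }

    // Call from the message listener for every record received.
    void recordMessage(String topic, int partition, long nowMillis) {
        lastSeen.put(key(topic, partition), nowMillis);
    }

    // Sorted keys of partitions with no record for longer than the threshold.
    List<String> idlePartitions(long nowMillis) {
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > idleThresholdMillis) {
                idle.add(entry.getKey());
            }
        }
        Collections.sort(idle);
        return idle;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        PartitionIdleTracker tracker = new PartitionIdleTracker(30_000L);
        tracker.register("test-topic", 0, 0L);
        tracker.register("test-topic", 1, 0L);
        tracker.register("test-topic", 2, 0L);
        // Partitions 0 and 2 keep receiving records; partition 1 stays silent.
        tracker.recordMessage("test-topic", 0, 25_000L);
        tracker.recordMessage("test-topic", 2, 28_000L);
        System.out.println(tracker.idlePartitions(40_000L)); // prints [test-topic-1]
    }
}
```

In a real listener you would presumably call recordMessage with the topic and partition of each ConsumerRecord and System.currentTimeMillis(), and check idlePartitions from a scheduled task. Note that this only tells you a partition has been quiet; it cannot tell a stuck consumer apart from a partition that simply has no new messages.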

If you increase the container concurrency to 3, you will have 3 consumers - one per partition. The event will be published by each consumer if it goes idle. The listener must be thread-safe since there will be 3 threads calling it and often concurrently.
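To illustrate the thread-safety point, here is a small sketch of listener state that can be updated safely from several consumer threads. IdleEventRecorder and the listener ids used below are hypothetical and only for illustration; in the real handler the id would come from event.getListenerId().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical shared state for an idle-event handler that is called
// concurrently by several consumer threads.
class IdleEventRecorder {

    // ConcurrentHashMap plus LongAdder avoids lost updates without explicit locks.
    private final Map<String, LongAdder> idleCounts = new ConcurrentHashMap<>();

    // Would be called from the @EventListener method with the listener id.
    void onIdle(String listenerId) {
        idleCounts.computeIfAbsent(listenerId, id -> new LongAdder()).increment();
    }

    long idleCount(String listenerId) {
        LongAdder adder = idleCounts.get(listenerId);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        IdleEventRecorder recorder = new IdleEventRecorder();
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            String listenerId = "listener-" + i;
            threads[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    recorder.onIdle(listenerId);
                    recorder.onIdle("shared");
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        // All 3 threads incremented "shared" 1000 times each.
        System.out.println(recorder.idleCount("shared")); // prints 3000
    }
}
```

A plain HashMap with long values would risk lost increments here, which is the kind of problem to watch for once the concurrency is greater than 1.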

Obviously, this won't scale well if you have a large number of partitions.

for whatever reason...consumer issue...

It would be rather strange if there were messages in a partition that were not received by the consumer to which that partition is assigned, while that consumer was actively receiving messages from other partitions. That would require help from the Kafka folks.
