Consumer Stuck in Re-join

2019-04-12 16:58发布

问题:

I've read other threads and I've gotten around the problem by using a new group ID, however I'd like to understand what could cause this.

I have a topic with 16 partitions, I've set session.timeout.ms=30000, and max.poll.interval.ms=30000000.

I run my program, and ctrl+c it, so it's not closing properly. After I guess, 16 times, I get stuck in this re-join issue. session.timeout.ms is the heartbeat timeout, so after 30 seconds it should kick my consumer right and my partitions should "free up" right? Or is it only listening to my max.poll.interval.ms?

EDIT: I still get this error intermittently, and when it happens i have to restart all my consumers. This happens even when my consumers were running fine and then they start all getting stuck at rejoining (no consumers were added/removed). Here's an error log from when I try to connect to it after with a new consumer when it's stuck in that state :

https://pastebin.com/AXJeSHkp

2017-06-29 17:28:16,215 DEBUG [AbstractCoordinator] - [scheduler-1] - Sending JoinGroup ((type: JoinGroupRequest, groupId=ingestion-matching-kafka-consumer-group-dev1, sessionTimeout=30000, rebalanceTimeout=43200000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@b45e5583)) to coordinator kafka04-prod01.messagehub.services.us-south.bluemix.net:9093 (id: 2147483644 rack: null)

2017-06-29 17:37:21,261 DEBUG [NetworkClient] - [scheduler-1] - Node 2147483644 disconnected.
2017-06-29 17:37:21,263 DEBUG [ConsumerNetworkClient] - [scheduler-1] - Cancelled JOIN_GROUP request {api_key=11,api_version=1,correlation_id=19,client_id=ingestion-matching-kafka-consumer-dev1} with correlation id 19 due to node 2147483644 being disconnected

Those are the first and last messages I think are relevant. Here are the relevant timeouts I've set:

session.timeout.ms=30000
max.poll.interval.ms=43200000    
request.timeout.ms=43205000 # the docs said to keep this higher than max.poll.interval.ms
enable.auto.commit=false

Should I set heartbeat.interval.ms too? Is this the interval that heartbeats are sent by the consumer to the broker automatically in some background thread (I have read the docs but for some reason I can't quite wrap my head around it)?

回答1:

If your client does not disconnect properly (crash or SIGINT), it will take session.timeout.ms (30 seconds in your case) for the server to kick it from the group. During this time, the server will still think the consumer is part of the group, so it will not do any reassignments. Once this delay is over, assigned partitions will be reassigned to other consumers (if any).

This of course does not happen if you use a new group ID. While it's tempting to use a new group everytime when developing (as you don't have to wait) you lose any committed offsets by the previous group and this might not represent the state your app will be in while running in production.

Regarding max.poll.interval.ms, it's the maximum delay allowed between 2 calls to poll() in your consumer logic. I don't think this setting is relevant to this question.



回答2:

I know it's a quite old question but I had similar issue and finally I understood the reason of this situation and want to share.

When rebalance starts Kafka waits all consumers in the group to poll() and send joinGroup request. Rebalance timeout is equal to max.poll.interval.ms. So Kafka waits until rebalance timeout or end of the process for each consumer.

In your case you set max.poll.interval.ms to 12 hours. Only sensible reason to that you must have a long process. So when rebalance starts Kafka will wait until your process is finished or 12 hours is passed. That's why your consumer seems stuck.