Kafka Streams app is always REBALANCING and getting errors

Published 2019-08-20 04:19

Question:

I have a Kafka Streams app that consumes one source topic with 20 partitions. The traffic load is about 2K records/sec. I deployed the app to 63 instances and it is working fine, but I noticed that the partition assignment keeps changing. When I check the KafkaStreams#localThreadsMetadata output for each instance, the reported thread state is usually PARTITIONS_REVOKED or PARTITIONS_ASSIGNED, and only sometimes RUNNING.
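
For context, the per-instance check looks roughly like the sketch below (a minimal sketch; the class name, logging, and how it is scheduled are illustrative, not the actual production code):

```java
import java.util.Set;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class StreamsHealthCheck {

    // Prints the overall client state plus each stream thread's state
    // (e.g. RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED) and its active tasks.
    public static void report(KafkaStreams streams) {
        System.out.println("KafkaStreams state: " + streams.state());
        Set<ThreadMetadata> threads = streams.localThreadsMetadata();
        for (ThreadMetadata t : threads) {
            System.out.println(t.threadName()
                    + " state=" + t.threadState()
                    + " activeTasks=" + t.activeTasks());
        }
    }
}
```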

From the log, I saw two different errors:

  1. Offset commit failed on partition production-smoke-KSTREAM-REDUCE-STATE-STORE-0000000015-repartition-13 at offset 25010: The coordinator is not aware of this member.

  2. org.apache.kafka.streams.errors.StreamsException: task [2_13] Abort sending since an error caught with a previous record (key 264344038933 value [B@359a4692 timestamp 1563412782970) to topic production-smoke-KTABLE-SUPPRESS-STATE-STORE-0000000021-changelog due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
    

The app is still running and sending messages to the downstream topic. My understanding is that, since my app doesn't need 63 nodes, some nodes are idle, and once one node dies due to the above errors, a rebalance is triggered. Is that correct? (Some nodes return "KafkaStreams is not running. State is ERROR." when I call KafkaStreams#localThreadsMetadata.) Will the app be dead entirely once all nodes are dead?
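
To make these per-node state transitions easier to observe, a state listener can be registered before start(); the sketch below only logs transitions, and the reaction to ERROR is a placeholder assumption, not what the app currently does:

```java
import org.apache.kafka.streams.KafkaStreams;

public class StateWatcher {

    // Register before streams.start(). Logs every state transition; ERROR here
    // corresponds to the "KafkaStreams is not running. State is ERROR" responses above.
    public static void attach(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            System.out.println("State change: " + oldState + " -> " + newState);
            if (newState == KafkaStreams.State.ERROR) {
                System.err.println("Instance entered ERROR state"); // placeholder: alert or restart here
            }
        });
    }
}
```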

Can anyone help me understand the right way to resolve the above errors? Will increasing `retries` and `retry.backoff.ms` just mask a bigger issue?
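
For reference, this is roughly how the producer-side settings could be raised through StreamsConfig (a sketch only; the application id, bootstrap server, and values are illustrative assumptions, not the production config):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RetryConfigSketch {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "production-smoke");  // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");  // illustrative
        // Target the embedded producer explicitly via the producer prefix.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 20);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        // Allow the producer to block longer waiting for metadata before timing out.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
        return props;
    }
}
```

The "Failed to update metadata after 60000 ms" part of error 2 is bounded by the producer's `max.block.ms` (default 60000), which is why it appears in the sketch alongside `retries` and `retry.backoff.ms`.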

Here is the config I have.

Streams config:

buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 60000
retries = 20
retry.backoff.ms = 60000
rocksdb.config.setter = null
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

Consumer config:

auto.commit.interval.ms = 5000
auto.offset.reset = none
check.crcs = true
client.dns.lookup = default
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 60000
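
If the first error turns out to be caused by missed heartbeats or long gaps between polls (the coordinator evicting the member), the consumer settings above can also be overridden through the consumer prefix; the values below are illustrative assumptions, not a recommendation:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ConsumerTuningSketch {

    public static Properties build() {
        Properties props = new Properties();
        // Give each member more time before the group coordinator kicks it out.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000);
        // Fewer records per poll shortens the processing time between poll() calls.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
        return props;
    }
}
```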

Thanks a lot!