Kafka Streams app is always REBALANCING and getting error: The coordinator is not aware of this member

Published 2019-10-29 04:49

I have a Kafka Streams app consuming from one source topic with 20 partitions. The traffic is about 2K records/sec. I deployed 63 instances of the app and it works fine. But I noticed that the partition assignment keeps changing. I checked the KafkaStreams#localThreadMetadata output for each instance: the state is almost always PARTITIONS_REVOKED or PARTITIONS_ASSIGNED, and only sometimes RUNNING.
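For reference, a minimal sketch of that check, assuming a Kafka Streams 2.x client where the method is spelled localThreadsMetadata() (the wrapper class and method names are only illustrative):

```java
import java.util.Set;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class ThreadStateProbe {
    // Prints the state of every stream thread of this instance,
    // e.g. RUNNING, PARTITIONS_REVOKED or PARTITIONS_ASSIGNED.
    static void printThreadStates(final KafkaStreams streams) {
        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
        for (final ThreadMetadata t : threads) {
            System.out.printf("thread=%s state=%s activeTasks=%s%n",
                    t.threadName(), t.threadState(), t.activeTasks());
        }
    }
}
```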

From the logs, I see two different errors:

  1. Offset commit failed on partition production-smoke-KSTREAM-REDUCE-STATE-STORE-0000000015-repartition-13 at offset 25010: The coordinator is not aware of this member.

  2. org.apache.kafka.streams.errors.StreamsException: task [2_13] Abort sending since an error caught with a previous record (key 264344038933 value [B@359a4692 timestamp 1563412782970) to topic production-smoke-KTABLE-SUPPRESS-STATE-STORE-0000000021-changelog due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.

The app is still running and sending messages to the downstream topic. My understanding is that because my app doesn't actually need 63 nodes, some nodes sit idle, and once a node dies from the errors above, a rebalance is triggered. Is that correct? (Some nodes return KafkaStreams is not running. State is ERROR. when KafkaStreams#localThreadMetadata is called.) Will the app die completely once all nodes are dead?
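A state listener makes those transitions visible without polling; a minimal sketch, assuming it is registered before KafkaStreams#start() (the wrapper class is illustrative):

```java
import org.apache.kafka.streams.KafkaStreams;

public class StateWatcher {
    // Logs every state transition. ERROR is a terminal state: an instance
    // that reaches it will not recover on its own and has to be restarted.
    static void watch(final KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            System.out.printf("state change: %s -> %s%n", oldState, newState);
            if (newState == KafkaStreams.State.ERROR) {
                System.err.println("instance entered ERROR and is effectively dead");
            }
        });
    }
}
```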

Can anyone help me understand the right way to resolve the errors above? Would increasing retries and retry.backoff.ms just mask a bigger problem?
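For context, the config posted below shows retries = 20 with retry.backoff.ms = 60000, so a single send can back off for up to 20 minutes before giving up. If I were to change these two producer settings, my understanding is that they are overridden through StreamsConfig's producer prefix; a minimal sketch with hypothetical values:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RetrySettings {
    // Hypothetical values, for illustration only: more retries but a much
    // shorter backoff, so transient metadata failures are retried quickly
    // instead of stalling for a full minute per attempt.
    static Properties withProducerRetries(final Properties props) {
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 50);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
        return props;
    }
}
```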

Here are the configs I have. Streams config:

buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
commit.interval.ms = 30000
connections.max.idle.ms = 540000
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 60000
retries = 20
retry.backoff.ms = 60000
rocksdb.config.setter = null
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

Consumer config:

auto.commit.interval.ms = 5000
auto.offset.reset = none
check.crcs = true
client.dns.lookup = default
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = 
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 60000
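For completeness: "The coordinator is not aware of this member" usually means the consumer was evicted from the group after missed heartbeats, and session.timeout.ms is not shown in the dump above. A minimal sketch of overriding the session settings through the consumer prefix, with hypothetical values:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class SessionSettings {
    // Hypothetical values, for illustration only: a longer session timeout
    // gives the broker more slack before it evicts a member; the heartbeat
    // interval is conventionally kept at no more than a third of it.
    static Properties withLongerSession(final Properties props) {
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000);
        return props;
    }
}
```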

Thanks a lot!
