Kafka Streams error - Offset commit failed on part

Posted 2019-05-29 09:27

Question:

We use Kafka Streams to consume, process, and produce messages. In our PROD environment we see errors like the following on multiple topics:

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] 
Offset commit failed on partition xxx-1 at offset 13920: 
The request timed out.[]

These errors are rare on topics with low load, but on topics with high load (and spikes) they occur dozens of times a day per topic. The topics have multiple partitions (e.g. 10). The issue does not seem to affect data processing (apart from performance): after the exception is thrown (sometimes several times for the same offset), the consumer later re-reads the message and processes it successfully.

I see that this error message appeared in kafka-clients version 1.0.0 as the result of a PR. In earlier kafka-clients versions, the same case (Errors.REQUEST_TIMED_OUT on the consumer) was logged at debug level with a similar message (Offset commit for group {} failed: {}). In my opinion, warning would be a more logical log level for this case.

How can we fix this issue? What could be the root cause? Perhaps changing the consumer properties or the partition setup would help get rid of it.

We create the Kafka Streams instance as follows:

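// Build a topology that reads the topic and handles each record.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
// consumerSettings holds the properties listed below.
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
streams.start(); // start the stream threads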

Our Kafka consumer settings:

bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000       # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor

Kafka broker version: kafka_2.11-0.11.0.2. The error occurs with both Kafka Streams versions we tried: 1.0.1 and 1.1.0.

Answer 1:

It looks like there is an issue with your Kafka cluster, and the Kafka consumer is timing out while trying to commit offsets. You can try increasing the connection-related configs for the Kafka consumer:

  1. request.timeout.ms (305000 ms by default)

The configuration controls the maximum amount of time the client will wait for the response to a request.

  2. connections.max.idle.ms (540000 ms by default)

Close idle connections after the number of milliseconds specified by this config.
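As a rough sketch of how these two settings could be passed through Kafka Streams: keys prefixed with "consumer." (the value of StreamsConfig.CONSUMER_PREFIX, also available via StreamsConfig.consumerPrefix(...)) are forwarded to the embedded consumer. The class name, the helper method, and the timeout values below are hypothetical and chosen only for illustration; suitable values depend on your cluster.

```java
import java.util.Properties;

public class StreamsTimeoutSettings {

    // Kafka Streams forwards keys with this prefix to its embedded consumer
    // (same value as StreamsConfig.CONSUMER_PREFIX).
    static final String CONSUMER_PREFIX = "consumer.";

    // Copies the base settings and adds the two consumer overrides.
    // The values passed in are illustrative assumptions, not recommendations.
    static Properties withConsumerTimeouts(Properties base,
                                           long requestTimeoutMs,
                                           long maxIdleMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(CONSUMER_PREFIX + "request.timeout.ms",
                  Long.toString(requestTimeoutMs));
        props.put(CONSUMER_PREFIX + "connections.max.idle.ms",
                  Long.toString(maxIdleMs));
        return props;
    }

    public static void main(String[] args) {
        // A subset of the settings from the question; hosts are placeholders.
        Properties base = new Properties();
        base.put("application.id", "app");
        base.put("bootstrap.servers", "xxx1:9092");
        base.put("commit.interval.ms", "5000");

        Properties tuned = withConsumerTimeouts(base, 400_000L, 600_000L);
        tuned.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be handed to new StreamsConfig(...) in place of consumerSettings from the question. If the timeouts persist after raising these values, the broker side (load, network, offsets topic replication) is the more likely place to look.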