Kafka Streams stops listening from topic and processing stops

Published 2019-08-28 04:45

Question:

I have two Kafka Streams applications. One listens to, say, topic1 and produces to topic2, and the other listens to topic2 and produces to topic3. The applications were working fine until the Kafka broker went down. The broker came back up, but the streams applications have stopped.
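For reference, a minimal sketch of the kind of pass-through topology described, using the Kafka 1.x StreamsBuilder API (the topic names match the question; the no-op transform is illustrative):

    // props holds the usual application.id, bootstrap.servers and default serdes.
    StreamsBuilder builder = new StreamsBuilder();
    // First app: consume topic1, transform, produce to topic2.
    builder.stream("topic1").mapValues(value -> value).to("topic2");
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();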

Following is the exception from the first streams app:

Exception in thread "streams-collection-7cda47bc-a1db-4ad5-a3d4-bd8f8dc85bf4-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=o365_activity_contenturl, partition=0, offset=2151
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"RecordType":6,"ListId":"affd3b1e-5d16-4e36-b97a-871b755b2b40","Version":1,"SourceFileName":"9617","ClientIP":"94.245.89.59","Workload":"OneDrive","UserType":0} timestamp 1527845926991) to topic o365_user_activity due to org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time

Exception from the second streams app:

Exception in thread "streams-distribution-bf0d8698-f198-4d91-ad66-f0833b4ef398-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"item_type":"File","workload":"OneDrive","current_date":"2018-06-01","client_ip":"94.245.89.59"} timestamp 1527845926986) to topic topic3 due to org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time

Why does the streams application fail to recover?

UPDATE 1:

After updating from Kafka 1.0.0 to 1.1.0, I get an additional message in the log:

You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
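Applying that hint through the Streams producer prefix might look like the sketch below; the values are illustrative, not a tested fix:

    // Give the embedded producer a larger retry budget so batches
    // survive a broker restart instead of expiring with a TimeoutException.
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);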

The streams app still does not continue processing after I stop and restart the broker.

UPDATE 2:

However, when I restart the streams app itself after stopping and starting the Kafka broker, it starts consuming again.
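Since a manual restart fixes it, one workaround is a watchdog that replaces the KafkaStreams instance once it reaches the ERROR state, which a client enters when all of its stream threads have died. A closed instance cannot be restarted, so a fresh one has to be built each time. This is only a sketch; buildStreams() is a hypothetical factory wrapping the topology and the props from the question:

    KafkaStreams streams = buildStreams(props);
    streams.start();
    while (true) {
        if (streams.state() == KafkaStreams.State.ERROR) {
            streams.close();                 // dead instance; its threads won't come back
            streams = buildStreams(props);   // build and start a replacement
            streams.start();
        }
        Thread.sleep(10_000);                // poll the client state every 10 s
    }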

Configs:

// Back off broker reconnect attempts (100-200 s between tries).
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
// Fail client requests that get no response within 60 s.
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
// Wait 60 s between retries of failed requests.
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
// Embedded producer: idempotent writes, acknowledged by all in-sync replicas.
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
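To at least surface why the stream threads die after the broker bounce, an uncaught exception handler can be attached before start(); the logging body here is only illustrative:

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.setUncaughtExceptionHandler((thread, throwable) ->
        System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
    streams.start();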