I have two Kafka Streams applications. One listens to, say, topic1 and produces to topic2, and the other listens to topic2 and produces to topic3. The applications were working fine until the Kafka broker went down; the broker came back up, but the Streams applications have stopped.
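For context, each app is basically a consume-transform-produce topology. A simplified sketch of the first one (the transformation logic is omitted, the topic names are the placeholders above, the application id is taken from the thread name in the log below, and the bootstrap server is just a placeholder):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Simplified: the real app transforms the records before forwarding them.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
builder.stream("topic1").to("topic2"); // the second app does the same for topic2 -> topic3

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();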
Following is the exception from the first streams app:
Exception in thread "streams-collection-7cda47bc-a1db-4ad5-a3d4-bd8f8dc85bf4-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=o365_activity_contenturl, partition=0, offset=2151
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"RecordType":6,"ListId":"affd3b1e-5d16-4e36-b97a-871b755b2b40","Version":1,"SourceFileName":"9617","ClientIP":"94.245.89.59","Workload":"OneDrive","UserType":0} timestamp 1527845926991) to topic o365_user_activity due to org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 15 record(s) for topic1-0: 32551 ms has passed since batch creation plus linger time
Exception from the second streams app:
Exception in thread "streams-distribution-bf0d8698-f198-4d91-ad66-f0833b4ef398-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value {"item_type":"File","workload":"OneDrive","current_date":"2018-06-01","client_ip":"94.245.89.59"} timestamp 1527845926986) to topic topic3 due to org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for topic3-0: 34706 ms has passed since last attempt plus backoff time
Why does the streams application fail to recover?
UPDATE 1:
After updating from Kafka 1.0.0 to 1.1.0, I see additional information in the log:
You can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.
The streams app still doesn't continue processing after I stop the broker and restart it.
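If I read that hint correctly, it maps to producer overrides along these lines (example values only, not something I have verified as a fix):

// Per the log hint: raise retries and retry backoff on the embedded producer
// so it keeps retrying instead of expiring batches while the broker is down.
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);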
UPDATE 2:
However, when I restart the streams app itself after stopping and starting the Kafka broker, it starts consuming again.
Configs:
// backoff before (re)attempting a connection to a broker
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
// how long the client waits for a response to a request
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
// backoff before retrying a failed request
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
// producer overrides: idempotent writes, acknowledged by all in-sync replicas
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");