I have a Kafka Streams app with the driver code below for real-time message transformation.
String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);
source.transform(() -> new MyTransformer()).to(...);
KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
After a few minutes of execution, the app throws the exception below and then stops progressing through the stream.
[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
... 13 more
I tried clearing out the /tmp/kafka-streams/TRANSFORMATION-APP directory and restarting the app, but it throws the same exception again. One thing I noticed is that the app works fine while it transforms the backlog of messages, but throws the exception after processing some of the new messages.
Sometimes it also throws the uncaught exceptions below.
[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created
[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance
After throwing one of these exceptions, the app keeps running but no longer progresses through the stream.
What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? The app runs under monit. In the worst case, I would prefer to terminate the app cleanly (without any message loss) so that monit can restart it.
The input topic has 100 partitions and I have set num.stream.threads to 100 in the app configuration. The app is on Kafka 0.10.1.1-cp1.
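For reference, the app configuration described above looks roughly like the sketch below. It uses plain string keys rather than the StreamsConfig constants so that it compiles without the Kafka jars. The class name AppConfigSketch and the bootstrap.servers value are placeholders for illustration, not my real values. The application.id is taken from the group name in the logs, and state.dir is the default directory that appears in the stack trace path.

```java
import java.util.Properties;

public class AppConfigSketch {
    // Builds the Streams settings mentioned in the question.
    static Properties build() {
        Properties appConfig = new Properties();
        // Matches the consumer group name and state directory seen in the logs.
        appConfig.put("application.id", "TRANSFORMATION-APP");
        // Placeholder broker address (assumption, not the real cluster).
        appConfig.put("bootstrap.servers", "localhost:9092");
        // One stream thread per input partition (the topic has 100 partitions).
        appConfig.put("num.stream.threads", "100");
        // Default state directory, as it appears in the stack trace path.
        appConfig.put("state.dir", "/tmp/kafka-streams");
        return appConfig;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the running app.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object corresponds to the appConfig passed to the KafkaStreams constructor in the driver code. All 100 threads run in a single process and share one state directory.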