Kafka Streams: Proper way to exit on error

Posted 2019-04-29 03:17

Question:

I have a Kafka Streams app that successfully consumes, transforms, and produces data, but I've noticed that periodically the streams processor transitions to the ERROR state and the process sits there without exiting.

The logs show messages like:

All stream threads have died. The instance will be in error state and should be closed.

Is there a way to tell the Streams app to exit once it has reached the ERROR state? Maybe a monitor thread of sorts?

Comments in the Kafka Streams code say that the user needs to close the application once it has reached this state; however, I haven't been able to find any mention of this task in the documentation.

Is there a simple way to do this shutdown step?


A probably incorrect way to close on errors

My intention was to set an UncaughtExceptionHandler on the KafkaStreams object (via setUncaughtExceptionHandler) that does the following:

  • log the error
  • shut down the stream by calling the close method on the original KafkaStreams object

The result is:

  • message for the exception is logged
  • INFO org.apache.kafka.streams.KafkaStreams ... State transition from ERROR to PENDING_SHUTDOWN
  • INFO org.apache.kafka.streams.processor.internals.StreamThread ... Informed to shut down

And then, unfortunately, the process seems to hang without exiting.

FWIW, I feel like this is probably a misuse of setUncaughtExceptionHandler.

Answer 1:

Using an UncaughtExceptionHandler is correct. However, if you call KafkaStreams#close() inside the handler callback you can run into a deadlock, because close() waits for the stream threads to stop while the handler is itself running on one of those threads. You should therefore either only set a flag in the handler and call #close() outside of the callback, or use close() with a timeout. If the timeout expires, a shutdown is forced.
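
The "set a flag, close outside the callback" option could look roughly like the sketch below. It uses only the JDK so that it runs on its own: FakeStreams is a hypothetical stand-in for the KafkaStreams object, and a plain thread named stream-thread-1 plays the role of a dying stream thread. In a real app you would presumably register the same handler with KafkaStreams#setUncaughtExceptionHandler and call KafkaStreams#close from the main thread, ideally the overload that takes a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ShutdownOnError {

    /** Hypothetical stand-in for KafkaStreams; it only records which thread called close(). */
    static final class FakeStreams {
        private final AtomicReference<String> closedBy = new AtomicReference<>();

        void close() {
            closedBy.set(Thread.currentThread().getName());
        }

        String closedBy() {
            return closedBy.get();
        }
    }

    /** Returns the name of the thread that called close(), or null if no failure was seen in time. */
    static String runDemo() {
        FakeStreams streams = new FakeStreams();
        CountDownLatch failed = new CountDownLatch(1);

        // The handler only logs and signals; it never calls close() itself,
        // because it runs on the thread that is dying.
        Thread.UncaughtExceptionHandler handler = (thread, error) -> {
            System.err.println("Thread " + thread.getName() + " died: " + error);
            failed.countDown();
        };

        // Simulated stream thread that fails right away.
        // With real Kafka Streams: streams.setUncaughtExceptionHandler(handler); streams.start();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated processing failure");
        }, "stream-thread-1");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();

        try {
            // The calling thread blocks here until the handler signals a failure.
            if (!failed.await(5, TimeUnit.SECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }

        // close() is called outside the callback, on the calling thread.
        streams.close();
        return streams.closedBy();
    }

    public static void main(String[] args) {
        System.out.println("close() was called from: " + runDemo());
        // A real app would typically follow this with System.exit(1).
    }
}
```

Because close() is called from the thread that was waiting on the latch, it never has to wait for the thread it is running on. For the second option, the change would be to pass a timeout to close(), for example streams.close(Duration.ofSeconds(10)) on Kafka 2.0 or later.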