I'm facing an issue with high level kafka consumer (0.8.2.0) - after consuming some amount of data one of our consumers stops. After restart it consumes some messages and stops again with no error/exception or warning.
After some investigation I found that the problem with consumer was this exception:
ERROR c.u.u.e.impl.kafka.KafkaConsumer - Error consuming message stream:
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)
Any ideas how can I simple skip such messages at all?
So, answering my own question. After some debugging of Kafka Consumer, I found one possible solution:
- Create a subclass of
kafka.consumer.ConsumerIterator
- Override
makeNext
-method. In this method catch InvalidMessageException
and return some dummy-placeholder.
- In your
while
-loop you have to convert the kafka.consumer.ConsumerIterator
to your implementation. Unfortunately all fields of kafka.consumer.ConsumerIterator
are private, so you have to use reflection.
So this is the code example:
val skipIt = createKafkaSkippingIterator(ks.iterator())
while(skipIt.hasNext()) {
val messageAndTopic = skipIt.next()
if (messageNotCorrupt(messageAndTopic)) {
consumeFn(messageAndTopic)
}
}
The messageNotCorrupt
-method simply checks if the argument is equal to the dummy-message.
another solution, possibly easier, using Kafka 0.8.2 client.
try {
val m = it.next()
//...
} catch {
case e: kafka.message.InvalidMessageException ⇒
log.warn("Corrupted message. Skipping.", e)
resetIteratorState(it)
}
//...
def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = {
val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState")
method.setAccessible(true)
method.invoke(it)
}