Apache Kafka with High Level Consumer: Skip corrup

Posted 2019-04-02 02:26

Question:

I'm facing an issue with the high-level Kafka consumer (0.8.2.0): after consuming some amount of data, one of our consumers stops. After a restart it consumes some messages and stops again, with no error, exception, or warning.

After some investigation I found that the consumer was failing with this exception:

ERROR c.u.u.e.impl.kafka.KafkaConsumer  - Error consuming message stream:
 kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)

Any ideas how I can simply skip such messages?

Answer 1:

Answering my own question: after some debugging of the Kafka consumer, I found one possible solution:

  1. Create a subclass of kafka.consumer.ConsumerIterator.
  2. Override the makeNext method. In it, catch InvalidMessageException and return a dummy placeholder.
  3. In your while loop, convert the kafka.consumer.ConsumerIterator into your implementation. Unfortunately, all fields of kafka.consumer.ConsumerIterator are private, so you have to use reflection to read them.

Here is the code example:

val skipIt = createKafkaSkippingIterator(ks.iterator())

while(skipIt.hasNext()) {
  val messageAndTopic = skipIt.next()

  if (messageNotCorrupt(messageAndTopic)) {
    consumeFn(messageAndTopic)
  }
}

The messageNotCorrupt method simply checks that its argument is not the dummy message.
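
The createKafkaSkippingIterator and messageNotCorrupt helpers are not shown above. As a rough, Kafka-free sketch of the placeholder idea only: the hypothetical SkippingIterator below wraps any iterator, catches a stand-in CorruptMessageException (playing the role of kafka.message.InvalidMessageException), and returns a sentinel that the loop filters out by reference. It does not subclass kafka.consumer.ConsumerIterator or use reflection, so treat it as an illustration of the pattern, not the actual implementation. All names in it are assumptions.

```scala
// Stand-in for kafka.message.InvalidMessageException (hypothetical, illustration only).
final class CorruptMessageException(msg: String) extends RuntimeException(msg)

// Wraps an iterator and replaces any element whose retrieval throws
// CorruptMessageException with a fixed placeholder object.
final class SkippingIterator[A <: AnyRef](underlying: Iterator[A], placeholder: A)
    extends Iterator[A] {
  def hasNext: Boolean = underlying.hasNext
  def next(): A =
    try underlying.next()
    catch { case _: CorruptMessageException => placeholder }
}

object SkippingIteratorDemo {
  // Sentinel compared by reference, so a real message with the same text is never dropped.
  val Dummy: String = new String("<corrupt>")

  def messageNotCorrupt(m: String): Boolean = m ne Dummy

  // Toy source: the element "bad" simulates a message that fails its CRC check.
  def source(items: List[String]): Iterator[String] =
    items.iterator.map { s =>
      if (s == "bad") throw new CorruptMessageException("Message is corrupt") else s
    }

  // Same loop shape as the example above: consume everything except the placeholder.
  def consumeAll(items: List[String]): List[String] = {
    val skipIt = new SkippingIterator(source(items), Dummy)
    val out = List.newBuilder[String]
    while (skipIt.hasNext) {
      val m = skipIt.next()
      if (messageNotCorrupt(m)) out += m
    }
    out.result()
  }
}
```

Comparing the sentinel by reference (ne) rather than by value keeps a genuine message that happens to equal the placeholder text from being discarded.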



Answer 2:

Another solution, possibly easier, using the Kafka 0.8.2 client:

try {
  val m = it.next()
  //...
} catch {
  case e: kafka.message.InvalidMessageException ⇒
    log.warn("Corrupted message. Skipping.", e)
    resetIteratorState(it)
}

//...

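import kafka.consumer.ConsumerIterator
import kafka.utils.IteratorTemplate

// Calls IteratorTemplate.resetState via reflection, because it is not accessible from here.
def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = {
  val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState")
  method.setAccessible(true)
  method.invoke(it)
}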
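
To show why the reset matters, here is a minimal, Kafka-free sketch. It assumes (from my reading of the 0.8.x source, so treat it as an assumption) that IteratorTemplate marks itself as failed before computing the next item and then refuses further calls until its state is reset. TemplateIterator, ChecksumException and the other names below are hypothetical stand-ins, not Kafka classes.

```scala
object IterState extends Enumeration { val NotReady, Ready, Done, Failed = Value }

// Simplified model of an iterator template with a failure state (NOT Kafka's class).
abstract class TemplateIterator[T] extends Iterator[T] {
  private var state = IterState.NotReady
  private var nextItem: Option[T] = None

  protected def makeNext(): Option[T] // None signals end of stream

  def hasNext: Boolean = state match {
    case IterState.Failed => throw new IllegalStateException("Iterator is in failed state")
    case IterState.Done   => false
    case IterState.Ready  => true
    case _ =>
      state = IterState.Failed // stays Failed if makeNext throws
      nextItem = makeNext()
      state = if (nextItem.isEmpty) IterState.Done else IterState.Ready
      nextItem.isDefined
  }

  def next(): T = {
    if (!hasNext) throw new NoSuchElementException
    state = IterState.NotReady
    nextItem.get
  }

  // Public here for brevity; the answer above reaches the real one via reflection.
  def resetState(): Unit = { state = IterState.NotReady }
}

object ResetStateDemo {
  final class ChecksumException(msg: String) extends RuntimeException(msg)

  // Toy iterator: the element "bad" simulates a corrupt message.
  def iteratorOver(items: List[String]): TemplateIterator[String] =
    new TemplateIterator[String] {
      private val src = items.iterator
      protected def makeNext(): Option[String] =
        if (!src.hasNext) None
        else {
          val s = src.next() // the source has already moved past the bad item
          if (s == "bad") throw new ChecksumException("Message is corrupt")
          Some(s)
        }
    }

  // Catch the corruption error, reset the state, and keep consuming.
  def consumeSkippingCorrupt(items: List[String]): List[String] = {
    val it = iteratorOver(items)
    val out = List.newBuilder[String]
    var more = true
    while (more) {
      try {
        if (it.hasNext) { out += it.next() } else { more = false }
      } catch {
        case _: ChecksumException => it.resetState()
      }
    }
    out.result()
  }
}
```

In this model, skipping works because the underlying source has already advanced past the bad element by the time the exception is thrown; without the reset, the next hasNext call fails with IllegalStateException instead of moving on.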