I'm using Spark 2.2.0 with the Kafka 0.10 spark-streaming library to read from a topic that is populated by a Kafka Streams Scala application. The Kafka broker version is 0.11 and the Kafka Streams version is 0.11.0.2.
When I set the EXACTLY_ONCE guarantee in the Kafka Streams app:
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
I get this error in Spark:
java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-<group.id> <topic> 0 even after seeking to offset 24
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.to(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toBuffer(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toArray(KafkaRDD.scala:189)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
If the EXACTLY_ONCE property is not set, it works just fine.
EDIT 1: The topic populated by the Kafka Streams app (with exactly-once enabled) has an unexpected ending offset. When I run kafka.tools.GetOffsetShell, it reports an ending offset of 18, but the topic contains only 12 messages (retention is disabled). When the exactly-once guarantee is disabled, the ending offset matches the number of messages. I tried to reset the Kafka Streams app according to this, but the problem remains.
EDIT 2: When I run SimpleConsumerShell with the --print-offsets option, the output is the following:
next offset = 1
{"timestamp": 149583551238149, "data": {...}}
next offset = 2
{"timestamp": 149583551238149, "data": {...}}
next offset = 4
{"timestamp": 149583551238149, "data": {...}}
next offset = 5
{"timestamp": 149583551238149, "data": {...}}
next offset = 7
{"timestamp": 149583551238149, "data": {...}}
next offset = 8
{"timestamp": 149583551238149, "data": {...}}
...
Some offsets are apparently skipped when the exactly-once delivery guarantee is enabled.
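To make the gaps explicit, here is a minimal sketch that parses the console output above and lists the offsets that never show up. It assumes that each "next offset = N" line refers to a record stored at offset N - 1; the object and method names (OffsetGaps, recordOffsets, missingOffsets) are made up for illustration and are not part of Kafka or Spark.

```scala
object OffsetGaps {
  // Matches lines such as "next offset = 4" from SimpleConsumerShell --print-offsets.
  private val NextOffset = """next offset = (\d+)""".r

  /** Record offsets seen in the output, ASSUMING record offset = next offset - 1. */
  def recordOffsets(lines: Seq[String]): Seq[Long] =
    lines.collect { case NextOffset(n) => n.toLong - 1 }

  /** Offsets that lie strictly between two consecutive observed record offsets. */
  def missingOffsets(offsets: Seq[Long]): Seq[Long] =
    offsets.zip(offsets.drop(1)).flatMap { case (a, b) => (a + 1) until b }

  def main(args: Array[String]): Unit = {
    // The sample output from EDIT 2; payload lines are ignored by the parser.
    val output = Seq(
      "next offset = 1",
      """{"timestamp": 149583551238149, "data": {...}}""",
      "next offset = 2",
      """{"timestamp": 149583551238149, "data": {...}}""",
      "next offset = 4",
      """{"timestamp": 149583551238149, "data": {...}}""",
      "next offset = 5",
      """{"timestamp": 149583551238149, "data": {...}}""",
      "next offset = 7",
      """{"timestamp": 149583551238149, "data": {...}}""",
      "next offset = 8",
      """{"timestamp": 149583551238149, "data": {...}}"""
    )

    val offsets = recordOffsets(output)
    println(s"record offsets:  ${offsets.mkString(", ")}")
    println(s"missing offsets: ${missingOffsets(offsets).mkString(", ")}")
  }
}
```

Under that assumption, the sample contains records at offsets 0, 1, 3, 4, 6 and 7, and offsets 2 and 5 are never returned, i.e. one offset is skipped after every two records in this excerpt.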
Any thoughts? What can cause this? Thanks!