I am using a custom sink implementation to save a Spark (2.3) Structured Streaming DataFrame into a Hive table.
The code is as follows:
val df = spark.readStream.format("socket").option("host", "localhost").option("port", 19191).load().as[String]
val query = df.map { s =>
  val records = s.split(",")
  assert(records.length >= 4)
  (records(0).toInt, records(1), records(2), records(3))
}
query.selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation").
  writeStream.
  format("hive-streaming").
  option("metastore", ".....").
  option("db", "test").
  option("table", "test_employee").
  option("checkpointLocation", "/checkpoints/employee/checkpoint").
  queryName("socket-hive-streaming").
  start()
This leads to the following runtime error:
ERROR streaming.MicroBatchExecution: Query socket-hive-streaming [id = ......, runId = ......] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 1 followed by 0
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:146)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:356)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:355)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:355)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
There are two ways to solve your problem:

1. Delete/clear your checkpoint directory, /checkpoints/employee/checkpoint, on your machine.
2. Use another source that maintains offsets, such as Kafka (see the sketch at the end of this answer).
The reason you hit this issue is that the socket source does not maintain any offset information of its own.

When you restart your job, which reads its input from the socket source (localhost:19191 in your code), the first thing the job does is try to recover its state from /checkpoints/employee/checkpoint, where it finds that the last committed offset is 1. Then, when you send new messages to the socket, the job sees that the offset reported by the socket source is 0. Since 1 followed by 0 is out of order, it throws this exception.
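For the second option, here is a minimal sketch of reading the same comma-separated records from Kafka instead of a socket. The broker address (localhost:9092), the topic name (employee), and the new query name are assumptions for illustration; the Hive sink options are copied unchanged from the question, and the spark-sql-kafka-0-10 connector must be on the classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-hive-streaming").getOrCreate()
import spark.implicits._

// Kafka tracks offsets per partition, so the state stored in the checkpoint
// directory can be recovered consistently after a restart (unlike the socket source).
val df = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092"). // assumed broker address
  option("subscribe", "employee").                     // assumed topic name
  load().
  selectExpr("CAST(value AS STRING)").                 // Kafka payloads arrive as bytes
  as[String]

val query = df.map { s =>
    val records = s.split(",")
    assert(records.length >= 4)
    (records(0).toInt, records(1), records(2), records(3))
  }.
  selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation").
  writeStream.
  format("hive-streaming").
  option("metastore", ".....").
  option("db", "test").
  option("table", "test_employee").
  option("checkpointLocation", "/checkpoints/employee/checkpoint").
  queryName("kafka-hive-streaming").
  start()

Note that you should still clear the old checkpoint (or use a new checkpoint location) when switching sources, because the offsets recorded by the socket source are not compatible with Kafka's offsets.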