Structured Streaming won't write DF to file sink

Posted 2019-04-24 00:33

I'm building a Kafka ingest module in EMR 5.11.1, Spark 2.2.1. My intention is to use Structured Streaming to consume from a Kafka topic, do some processing, and store to EMRFS/S3 in parquet format.

The console sink works as expected; the file sink does not.

In spark-shell:

import org.apache.spark.sql.functions.from_json             // not auto-imported in spark-shell
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._                          // for 10.seconds below

// readSchema: the StructType of the JSON payload (defined elsewhere)
val event = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

val eventdf = event.select($"value" cast "string" as "json")
  .select(from_json($"json", readSchema) as "data")
  .select("data.*")

val outputdf = <some processing on eventdf>

This works:

val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start 

This doesn't:

val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails

Things I tried that didn't work:

  1. sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
  2. Change format to CSV instead of parquet
  3. Change output mode to Complete (the file sink only supports Append)
  4. Different Trigger intervals
  5. .option("failOnDataLoss", false) on readStream (items 1 and 5 are sketched just after this list)

Some digging into the source code took me here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala where it says that absence of the .compact file should trigger defaults.

Hence I tried spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", 60000) to make sure that an old batch's metadata isn't removed before the new batch creates a combined metadata file.
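
For context: with the default compactInterval of 10 (matching the "compactInterval: 10" in the error), batches 9, 19, 29, ... are written as .compact files under _spark_metadata, and compacting batch 19 needs to read 9.compact; if that file was cleaned up or never became visible, the IllegalStateException in the stack trace below is thrown. The relevant internal settings and their Spark 2.2 defaults look roughly like this (a sketch; config names as found in SQLConf):

// File-sink metadata log settings (internal; Spark 2.2 defaults shown in comments).
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")       // delete expired log files (default: true)
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10")  // compact every N batches -> 9.compact, 19.compact, ... (default: 10)
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "600000") // how long (ms) old log files are kept after a compaction (default: 10 minutes)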

What makes this error annoying is that it isn't consistently reproducible: without changing a single character in the code, writing to parquet sometimes works and sometimes doesn't. I have tried cleaning the checkpoint location, Spark/HDFS logs, etc., in case stale internal state was causing the problem.

Here's the error stacktrace:

query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@56122c1

18/04/09 20:20:04 ERROR FileFormatWriter: Aborting job null.
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:207)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/04/09 20:20:04 ERROR StreamExecution: Query [id = 5251fe93-2b6b-4dff-bec3-7801dc7e6417, runId = 083547c1-69b7-40e7-8bf9-3c3af11d4c31] terminated with error
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:207)
        ... 20 more

2 Answers

Answer 1 · 我想做一个坏孩纸 · 2019-04-24 01:14

It turns out that S3 does not support the read-after-write semantics needed by Spark checkpointing.

This article suggests using AWS EFS for checkpointing.

S3 remains a good place to ingest data from, or egest data to.
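
A minimal sketch of that layout (paths are hypothetical): keep the checkpoint on a consistent file system such as HDFS or an EFS mount, and write the data itself to S3.

val query = outputdf.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/events/")                    // hypothetical S3 output location
  .option("checkpointLocation", "hdfs:///checkpoints/ingest")  // or e.g. "file:///mnt/efs/checkpoints/ingest" on an EFS mount
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start()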

Answer 2 · 叼着烟拽天下 · 2019-04-24 01:28

I solved this issue by clearing my checkpoint path (a spark-shell variant of the cleanup is sketched after the steps):

  1. remove your checkpoint path:

    sudo -u hdfs hdfs dfs -rmr ${your_checkpoint_path}

  2. resubmit your spark job.
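
If you prefer to do the cleanup from the same spark-shell session instead of the hdfs CLI, here is a rough equivalent using the Hadoop FileSystem API (assuming the /tmp/ingestcheckpoint location from the question):

import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the checkpoint directory on the cluster's default file system.
val checkpointPath = new Path("/tmp/ingestcheckpoint")   // location used in the question
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointPath)) fs.delete(checkpointPath, true)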
