Why does using cache on streaming Datasets fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

Published 2020-02-05 03:30

Question:

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination

While executing this sample in Spark 2.1.0 I got an error. Without the .cache option it worked as intended, but with the .cache option I got:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)

Any idea?

Answer 1:

Your (very interesting) case boils down to the following line (that you can execute in spark-shell):

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.readStream.text("files").cache
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
  at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
  at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
  at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613)
  ... 48 elided

The reason for this turned out to be quite simple to explain (no pun on Spark SQL's explain intended).

spark.readStream.text("files") creates a so-called streaming Dataset.

scala> val files = spark.readStream.text("files")
files: org.apache.spark.sql.DataFrame = [value: string]

scala> files.isStreaming
res2: Boolean = true
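
Since isStreaming tells the two kinds of Datasets apart, one way to defend against this at the call site is to simply skip cache for streaming sources. A minimal sketch (the helper name cacheIfBatch is purely illustrative, not a Spark API):

import org.apache.spark.sql.Dataset

// Cache only when the Dataset is a batch one; calling cache on a streaming
// Dataset would throw the AnalysisException shown above.
def cacheIfBatch[T](ds: Dataset[T]): Dataset[T] =
  if (ds.isStreaming) ds else ds.cache()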

Streaming Datasets are the foundation of Spark SQL's Structured Streaming.

As you may have read in Structured Streaming's Quick Example:

And then start the streaming computation using start().

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So, you have to use start (or foreach) to start the execution of the streaming query. You knew it already.
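
For reference, here is the sample from the question rewritten so that the streaming query actually gets started and the cache call is dropped. This is only a sketch: the console sink is an assumption for illustration, and schema stands in for whatever schema the original code defined.

// Sketch of the question's example without .cache (console sink chosen purely for illustration).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder
  .master("local[*]")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate()

val schema: StructType = ???  // the schema is not shown in the question

spark.readStream
  .schema(schema)
  .json("src/test/data")
  .writeStream
  .format("console")  // illustrative sink
  .start()
  .awaitTermination()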

But...there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".

That looks familiar, doesn't it?
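
For example, an eager action such as count on the streaming Dataset from above trips the very same check (a spark-shell sketch, output abbreviated):

scala> files.count()
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  ...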

cache is not in the list of the unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should have been in the list as it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deeper into the depths of Spark SQL...hold your breath...

cache simply delegates to persist, and persist requests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this)

While caching a query, CacheManager does execute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan

which, as we know, is not allowed for a streaming Dataset, since only start (or foreach) is allowed to trigger execution.
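
In other words, that eager executedPlan is exactly what the stack trace shows: withCachedData calls assertSupported, which runs UnsupportedOperationChecker.checkForBatch on a plan that still contains a streaming source. You can trip the same check without cache at all by forcing the executed plan yourself (a spark-shell sketch, output abbreviated):

scala> files.queryExecution.executedPlan
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[files]
  ...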

Problem solved!
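
As a follow-up: if you actually need caching in a streaming job, Spark 2.4+ offers foreachBatch, which hands you each micro-batch as an ordinary batch Dataset that you are free to cache. A minimal sketch, reusing spark and schema from the sketch above (the output path is illustrative only):

import org.apache.spark.sql.DataFrame

// Assumes Spark 2.4+ (foreachBatch does not exist in 2.1).
// Each micro-batch is a regular batch Dataset, so cache is legal inside the function.
val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.cache()
  batch.write.mode("append").json(s"C:/tmp/out/batch-$batchId")  // illustrative output path
  batch.unpersist()
}

spark.readStream
  .schema(schema)
  .json("src/test/data")
  .writeStream
  .foreachBatch(writeBatch)
  .start()
  .awaitTermination()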