Our Spark environment: DataBricks 4.2 (includes Apache Spark 2.3.1, Scala 2.11)
What we try to achieve: We want to enrich streaming data with some reference data, which is updated regularly. The enrichment is done by joining the stream with the reference data.
What we implemented: We implemented two spark jobs (jars): The first one is updating a Spark table TEST_TABLE every hour (let’s call it ‘reference data’) by using .write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE") And afterwards calling spark.catalog.refreshTable("TEST_TABLE")
The second job (let’s call it streaming data) is using Spark Structured Streaming to stream reading some data, joining it using DataFrame.transform() with table TEST_TABLE and writing it to another system. We are reading the reference data using spark.read.table(“TEST_TABLE”) in the function called by .transform() so we get the latest values in the table. Unfortunately, the second app crashes every time the first app updates the table. The following message is shown in Log4j output:
18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
We also tried to invalidate the cache before we read the table but that decreased the performance and the app crashed nevertheless. We suspect the root course is the lazy evaluation of the reference dataset (which still ‘points’ to the old data, which is not present anymore).
Do you have any suggestions what we could do to prevent this problem or what the best approach to join a stream with dynamic reference data is?
Join to the reference data; do not cache it, this ensures you go to source. Look for latest version data which is signified by a primary key + a counter, where this counter closest to or equal to a counter you maintain in Streaming application. Every hour write, append all the ref data still current, again but with incremented counter; i.e. a new version. Use parquet here.
Instead of joining the table and stream. You can take advantage of a new feature available in spark 2.3.1 i.e joining of two streams data. Create a stream instead of a table with the watermark.
Refer databricks blog