Spark Streaming textFileStream: watching the output of another job's RDD

Posted 2019-09-20 23:00

Question:

Running Spark 1.6.2 (YARN mode)

Firstly, I have some code from this post to get filenames within Spark Streaming, so that could be the issue, but hopefully not.

Basically, I have this first job.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
    // fileStream yields (key, value) pairs; keep only the text value as a String
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir).map(_._2.toString)
}

val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

val inputDir = "hdfs:///tmp/input"
val outputDir = "hdfs:///tmp/output1"

val stream1 = getStream(ssc, inputDir)
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))

ssc.start()
ssc.awaitTermination()
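
As an aside, rdd.saveAsTextFile refuses to write into a directory that already exists, so the fixed outputDir above only survives the first non-empty batch. A minimal sketch of a common workaround (not what the code above does): write each non-empty batch to its own time-stamped directory. Note this changes the directory layout that a downstream watcher would have to monitor.

// Sketch: one output directory per batch; empty batches are skipped, so no
// directory containing only a _SUCCESS marker is written for an interval
// with no input.
stream1.foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty()) rdd.saveAsTextFile(s"$outputDir-${time.milliseconds}")
}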

I also have a second job that, for this example, looks practically identical: it just swaps the directories around, using the first job's outputDir as its inputDir and a new outputDir = "hdfs:///tmp/output2".
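
For reference, a sketch of how that second job's wiring might look, assuming it runs as its own application with its own StreamingContext and reuses the getStream helper above:

// Second job (sketch): same shape as the first, with the directories swapped.
val ssc2 = new StreamingContext(SparkContext.getOrCreate, Seconds(5))

val stream2 = getStream(ssc2, "hdfs:///tmp/output1")   // watch the first job's output
stream2.foreachRDD(rdd => rdd.saveAsTextFile("hdfs:///tmp/output2"))

ssc2.start()
ssc2.awaitTermination()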

Anyway, I have to start the second streaming job before the first one, since it needs to already be watching when new files arrive. Makes sense...

Then I start the first job and hadoop fs -copyFromLocal some files into the input folder, since per the API:

Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
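
(A minimal sketch of one way to satisfy that "move" requirement programmatically, using the Hadoop FileSystem API; the moveIntoWatchedDir helper and the paths are hypothetical. Within a single HDFS file system, rename is atomic, so the file appears in the watched directory all at once with a fresh modification time.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: stage the file somewhere on the same file system,
// then rename it into the watched directory.
def moveIntoWatchedDir(fs: FileSystem, staged: Path, watchedDir: Path): Boolean =
    fs.rename(staged, new Path(watchedDir, staged.getName))

val fs = FileSystem.get(new Configuration())
moveIntoWatchedDir(fs, new Path("/tmp/staging/data.txt"), new Path("/tmp/input"))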

When I try to run this, it eventually crashes with a stacktrace that contains this

17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms:
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB)
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB)
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB)
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/02/01 11:48:35 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
17/02/01 11:48:35 INFO JobGenerator: Stopping JobGenerator immediately
17/02/01 11:48:35 INFO RecurringTimer: Stopped timer for JobGenerator after time 1485949715000
17/02/01 11:48:35 INFO JobGenerator: Stopped JobGenerator
17/02/01 11:48:35 INFO JobScheduler: Stopped JobScheduler
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/streaming,null}
17/02/01 11:48:35 INFO StreamingContext: StreamingContext stopped successfully
17/02/01 11:48:35 INFO SparkContext: Invoking stop() from shutdown hook
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
17/02/01 11:48:35 INFO SparkUI: Stopped Spark web UI at http://172.17.0.2:4040
17/02/01 11:48:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/02/01 11:48:35 INFO MemoryStore: MemoryStore cleared
17/02/01 11:48:35 INFO BlockManager: BlockManager stopped
17/02/01 11:48:35 INFO BlockManagerMaster: BlockManagerMaster stopped
17/02/01 11:48:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/02/01 11:48:35 INFO SparkContext: Successfully stopped SparkContext
17/02/01 11:48:35 INFO ShutdownHookManager: Shutdown hook called
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5/httpd-65e6e9f0-dcb8-4b66-86f6-f775e2e497c0
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

I know the _SUCCESS file is being written by rdd.saveAsTextFile, so where it comes from is not the mystery; my issues are as follows:

  1. The file does exist; I can see it using hadoop fs -ls.
  2. Even if the file didn't exist, the API is designed to pick up new files. Why is that file being read at all?
  3. That file is empty, so why should it be processed anyway?
  4. Is this even possible? Can Spark Streaming watch the output of another Spark job?

Answer 1:

To explicitly enforce that only new files are processed, and to ensure marker files like _SUCCESS are skipped, we can use the following overload of fileStream:

def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
   ssc.fileStream[LongWritable, Text, TextInputFormat](
      dir,
      // keep a file only if its name starts with neither "_" nor "."; note
      // the && here: with ||, the predicate is true for every file, since
      // no name can start with both characters
      (path: org.apache.hadoop.fs.Path) =>
         !path.getName.startsWith("_") && !path.getName.startsWith("."),
      newFilesOnly = true
   ).map(_._2.toString)
}

The newFilesOnly parameter defaults to true when not specified, as shown here, so ideally _SUCCESS should not have been processed in your setup either.
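
As a quick sanity check of that predicate (a standalone sketch using the Hadoop Path class):

import org.apache.hadoop.fs.Path

val keep = (p: Path) => !p.getName.startsWith("_") && !p.getName.startsWith(".")

assert(!keep(new Path("hdfs:///tmp/output1/_SUCCESS")))  // marker file: skipped
assert(!keep(new Path("hdfs:///tmp/output1/.staging")))  // hidden file: skipped
assert(keep(new Path("hdfs:///tmp/output1/part-00000"))) // data file: processed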