Spark Streaming: HDFS

Posted 2019-02-07 11:44

Question:

  1. I can't get my Spark job to stream "old" files from HDFS.

If my Spark job is down for some reason (e.g. a demo or a deployment) while files are continuously being written/moved to the HDFS directory, those files may be skipped once I bring the Spark Streaming job back up.

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

Output --> Number of records in this batch: 0

  2. Is there a way for Spark Streaming to move already-read files to a different folder, or do we have to program that ourselves, so that it avoids re-reading files it has already processed?

  3. Is Spark Streaming the same as running a Spark job (sc.textFile) from cron?

Answer 1:

As Dean mentioned, textFileStream uses the default of processing new files only.

  def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

So, all it is doing is calling this variant of fileStream:

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }

And, looking at the FileInputDStream class, we can see that it can indeed look for existing files, but it defaults to new files only:

newFilesOnly: Boolean = true,

So, going back to the StreamingContext code, we can see that there is an overload we can use by calling the fileStream method directly:

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
  }

So, the TL;DR is:

ssc.fileStream[LongWritable, Text, TextInputFormat]
    (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
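
Putting that together with the code from the question, a minimal sketch might look like the following. The directory path is taken from the question; the inline filter simply mirrors the usual default of skipping hidden files (handy if FileInputDStream.defaultFilter isn't accessible from your code), and logInfo comes from the question's job:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Pick up files already present in the directory as well as new ones.
    val hdfsDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs://sandbox.hortonworks.com/user/root/logs",
      (path: Path) => !path.getName.startsWith("."),  // same idea as the default filter
      newFilesOnly = false
    ).map(_._2.toString)

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )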


Answer 2:

Are you expecting Spark to read files that are already in the directory? If so, this is a common misconception, one that took me by surprise. textFileStream watches a directory for new files to appear, then reads them. It ignores files that are already in the directory when you start, as well as files it has already read.

The rationale is that you'll have some process writing files to HDFS, and then you'll want Spark to read them. Note that these files must appear atomically, e.g., they were slowly written somewhere else and then moved to the watched directory. This is because HDFS doesn't properly handle reading and writing a file simultaneously.
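
As an illustration, a common pattern is to write the file completely under a staging directory and then rename it into the watched directory once it is finished; the paths below are made up for the example:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())

    // The file is fully written under a staging directory first ...
    val staging = new Path("hdfs://sandbox.hortonworks.com/user/root/staging/log_001.txt")
    // ... then moved into the directory that textFileStream is watching.
    // rename is a metadata-only operation, so the file appears atomically.
    val watched = new Path("hdfs://sandbox.hortonworks.com/user/root/logs/log_001.txt")
    fs.rename(staging, watched)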



Answer 3:

import org.apache.hadoop.fs.Path
import scala.collection.mutable.ListBuffer

// collects the paths accepted by the filter, for later inspection
val list = ListBuffer[String]()

val filterF: Path => Boolean = (x: Path) => {
  println("looking if " + x + " is to be considered or not")
  // assumes the file name ends with "_<timestampMillis>"
  val considered = x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis
  if (considered) { println("considered " + x); list += x.toString }
  considered
}

This filter function is used to determine whether each path is one you actually want to pick up, so the logic inside it should be customized to your requirements.
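
For instance, a minimal sketch of a filter that only accepts files ending in .log (the extension is just an illustration):

    import org.apache.hadoop.fs.Path

    val onlyLogFiles: Path => Boolean = (p: Path) => p.getName.endsWith(".log")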

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/user/hdpprod/temp/spark_streaming_output", filterF, false
  ).map { case (_, text) => text.toString }

Now you have to set the third argument of the fileStream function to false, to make sure it considers not only new files but also existing files already in the streaming directory.
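
After that, the stream can be consumed like any other DStream; a minimal sketch of driving the job (the print logic is just illustrative):

    streamed_rdd.foreachRDD { rdd =>
      println("Number of records in this batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()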