Why does foreachRDD not populate DataFrame with new data?

Published 2019-08-22 15:45

Question:

My problem is that when I change my code into streaming mode and put my DataFrame inside the foreachRDD loop, the DataFrame shows an empty table! It never gets filled! I also cannot pass it to assembler.transform(). The error is:

Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U].
Unspecified value parameter mapFunc.
      val dataFrame = Train_DStream.map()

My train.csv file looks like the one below. Please help me. Here is my code:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

/**
  * Created by saeedtkh on 5/22/17.
  */
object ML_Test {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
    val sc = new SparkContext(sparkConf)
    // Create the context
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
      StructField("column0", StringType, true),
      StructField("column1", StringType, true),
      StructField("column2", StringType, true)))

    //val Test_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv").map(LabeledPoint.parse)
    val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv")
    val DStream = Train_DStream.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(6)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      Row.fromSeq(Seq(first, second, third))
    })

    DStream.foreachRDD { Test_DStream =>
      val dataFrame = sqlContext.createDataFrame(Test_DStream, customSchema)
      dataFrame.groupBy("column1", "column2").count().show()

      val numFeatures = 3
      val model = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(Vectors.zeros(numFeatures))

      val featureCol = Array("column1", "column2")
      val assembler = new VectorAssembler().setInputCols(featureCol).setOutputCol("features")
      dataFrame.show()
      val df_new = assembler.transform(dataFrame)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Answer 1:

My guess is that all the files under the /Users/saeedtkh/Desktop/sharedsaeed/train.csv directory have already been processed, so there are no new files left to pick up and hence the DataFrame is empty.

Please note that the sole input parameter of StreamingContext.textFileStream is a directory, not a file.

textFileStream(directory: String): DStream[String] — Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files.
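
So the stream should be created from a directory into which new files are dropped, not from train.csv itself. A minimal sketch, assuming a hypothetical directory /Users/saeedtkh/Desktop/sharedsaeed/train_data that only ever receives freshly-added CSV files:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// sc is the SparkContext created earlier; the directory below is a
// hypothetical one that is monitored for newly added files.
val ssc = new StreamingContext(sc, Seconds(10))

// Every *new* file moved into the directory becomes a batch of lines in the DStream.
val Train_DStream = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train_data")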

Please also note that once a file has been processed by a Spark Streaming application, it should not be changed (or appended to); the file has already been marked as processed and Spark Streaming will ignore any later modifications.

Quoting the official documentation of Spark Streaming in Basic Sources:

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported). Note that

  • The files must have the same data format.

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

For simple text files, there is an easier method streamingContext.textFileStream(dataDirectory). And file streams do not require running a receiver, hence does not require allocating cores.
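
To satisfy the bullets quoted above, write each new file somewhere outside the monitored directory first and then move it in with an atomic rename, so Spark Streaming only ever sees complete, immutable files. A minimal sketch, using hypothetical staging and monitored paths:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Stage the file outside the monitored directory, then move it in one
// atomic rename; Spark Streaming will pick it up on the next batch interval.
val staged    = Paths.get("/Users/saeedtkh/Desktop/sharedsaeed/staging/train_0001.csv")
val monitored = Paths.get("/Users/saeedtkh/Desktop/sharedsaeed/train_data/train_0001.csv")
Files.move(staged, monitored, StandardCopyOption.ATOMIC_MOVE)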


Please also replace setMaster("local") with setMaster("local[*]") to make sure your Spark Streaming application has enough threads to process incoming data (you need at least 2 threads).
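
For example, only the configuration line changes (a minimal sketch):

import org.apache.spark.SparkConf

// "local[*]" gives the application as many worker threads as there are CPU cores,
// so batch processing is not starved of threads.
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("HdfsWordCount")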