Process Spark Streaming RDD and store to a single HDFS file

Published 2020-03-06 07:52

Question:

  1. I am using Kafka with Spark Streaming to get streaming data.

    val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
    
  2. I am using this DStream and processing its RDDs:

    val output = lines.foreachRDD(rdd =>
      rdd.foreachPartition { partition =>
        partition.foreach { file => runConfigParser(file) }
      })
    
  3. runConfigParser is a Java method which parses a file and produces output that I have to save in HDFS. So multiple nodes will process the RDD and write their output into one single HDFS file, as I want to load this file into Hive.

Should I return the result of runConfigParser and use sc.parallelize(output).saveAsTextFile(path) so that all my nodes write their RDD output to a single HDFS file? Is this design efficient?

I will load this single HDFS file (which will be constantly updated, as it is streaming data) into Hive and query it using Impala.

Answer 1:

No. Because you want one HDFS file, saveAsTextFile will not satisfy your requirement: it creates one HDFS file per RDD partition.

In order to get one HDFS file, reduce/collect the output and call the HDFS Java API to create the file. This method is inefficient in that all of the output has to come to the Spark driver program at the last Spark action(s).
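
For illustration, here is a minimal sketch of that collect-and-write approach, assuming runConfigParser returns its result as a String, that HDFS append is enabled on the cluster, and that the output path is just a placeholder:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical target file; pick a real location in your cluster.
    val outputPath = new Path("/user/hive/streaming/parsed_output.txt")

    lines.foreachRDD { rdd =>
      // Parse on the executors, then collect the (hopefully small) results to the driver.
      val parsed = rdd.map(file => runConfigParser(file)).collect()
      if (parsed.nonEmpty) {
        val fs = FileSystem.get(new Configuration())
        // Append if the file already exists, otherwise create it (requires HDFS append support).
        val out = if (fs.exists(outputPath)) fs.append(outputPath) else fs.create(outputPath)
        try parsed.foreach(line => out.write((line + "\n").getBytes("UTF-8")))
        finally out.close()
      }
    }

The trade-off mentioned above is visible here: every batch's output passes through the driver before it reaches HDFS.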



Answer 2:

You can use a function to "merge" the results of saveAsTextFile, like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  // Write the RDD as the usual part-files into a temporary directory ...
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  // ... then merge those part-files into a single file in the final directory.
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // copyMerge concatenates every part-file under srcPath into one file at dstPath/fileName.
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}