I am using Spark Streaming with Kafka to get streaming data.
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
I am processing the RDDs of this DStream as follows:
val output = lines.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { file => runConfigParser(file) }
  })
runConfigParser is a Java method which parses a file and produces output that I have to save in HDFS. So multiple nodes will process the RDD, and they should write the output into one single HDFS file, because I want to load this file into Hive.
Should I return the result of runConfigParser and use sc.parallelize(output).saveAsTextFile(path) so that all my nodes write their RDD output to a single HDFS file? Is this design efficient?
I will load this single HDFS file (which will be constantly updated, since it is streaming data) into Hive and query it using Impala.
No. Because you want one HDFS file, saveAsTextFile doesn't meet your requirement: it creates many HDFS files, one per RDD partition.
To get one HDFS file, reduce/collect the output and call the HDFS Java API to create a single HDFS file. This is inefficient in that all of the output has to come to the Spark driver program at the final Spark action(s).
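For illustration, here is a minimal sketch of that collect-and-write variant, reusing the lines DStream and runConfigParser from the question. The HDFS URI, the target path, and the assumption that runConfigParser returns a String are all placeholders of mine, not part of your setup.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: parse on the executors, then collect every result to the driver
// and append it to one HDFS file. The URI and path below are placeholders, and
// runConfigParser is assumed to return a String.
lines.foreachRDD { rdd =>
  val parsed = rdd.map(file => runConfigParser(file)).collect()  // all output lands on the driver

  val fs = FileSystem.get(URI.create("hdfs://namenode:8020"), new Configuration())
  val target = new Path("/data/parsed/output.txt")

  // appending requires dfs.support.append to be enabled; otherwise you would have to rewrite the file
  val out = if (fs.exists(target)) fs.append(target) else fs.create(target)
  try {
    parsed.foreach(line => out.write((line + "\n").getBytes("UTF-8")))
  } finally {
    out.close()
  }
}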
Alternatively, you can use a helper function to "merge" the result of saveAsTextFile, like this:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD

def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  // saveAsTextFile writes one part file per partition into this directory;
  // note that it fails if the directory already exists.
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // copyMerge concatenates all part files under srcPath into a single file at dstPath/fileName
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}
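For instance, in the streaming job from the question it could be called once per micro-batch, roughly like this (the HDFS URI and the per-batch file name are assumptions of mine, and runConfigParser is again assumed to return a String):

// Sketch: parse each micro-batch and merge its part files into one output file per batch.
lines.foreachRDD { (rdd, time) =>
  val parsed = rdd.map(file => runConfigParser(file))
  saveAsTextFileAndMerge("hdfs://namenode:8020", s"parsed-${time.milliseconds}.txt", parsed)
}

Since saveAsTextFile fails if its output directory already exists, in a streaming job you would want to give each batch its own temporary directory (or delete it after merging) rather than reusing the same /tmp/ path for every batch.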