How can I save an RDD to HDFS and later read it back?

Posted 2019-03-15 13:39

I have an RDD whose elements are of type (Long, String). For some reason, I want to save the whole RDD to HDFS and later read that RDD back in a Spark program. Is it possible to do that? And if so, how?

2 Answers
乱世女痞
#2 · 2019-03-15 13:47

It is possible.

RDD offers the saveAsObjectFile and saveAsTextFile methods. With saveAsTextFile, each tuple is written as its toString form, (value1, value2), so you can parse it back later; for example, the element (42L, "foo") is stored as the line (42,foo).

Reading back can be done with the textFile function on SparkContext, followed by a .map that strips the parentheses and parses the Long and the String.

So, Version 1:

rdd.saveAsTextFile("hdfs:///test1/")
// later, in another program
val newRdd = sparkContext.textFile("hdfs:///test1/part-*").map { line =>
  // strip the surrounding "(" and ")" and split on the first comma;
  // splitting on the first comma is safe here because the Long comes first
  val body = line.stripPrefix("(").stripSuffix(")")
  val idx = body.indexOf(',')
  (body.substring(0, idx).toLong, body.substring(idx + 1))
}

Version 2:

rdd.saveAsObjectFile("hdfs:///test1/")
// later, in another program - you get the tuples back out of the box :)
val newRdd = sparkContext.objectFile[(Long, String)]("hdfs:///test1/part-*")
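For completeness, here is a minimal round-trip sketch of the object-file approach (the paths and sample data are illustrative, not from the question):

// build a small sample RDD of (Long, String) pairs
val pairs = sparkContext.parallelize(Seq((1L, "alpha"), (2L, "beta")))
pairs.saveAsObjectFile("hdfs:///test1/")

// read it back; the element type is supplied explicitly
val restored = sparkContext.objectFile[(Long, String)]("hdfs:///test1/part-*")
restored.collect().foreach(println) // prints (1,alpha) and (2,beta)

Note that saveAsObjectFile relies on Java serialization, so the program that reads the data back should run compatible Spark and Scala versions.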
我命由我不由天
#3 · 2019-03-15 14:00

I would recommend using a DataFrame if your RDD is in tabular format. A DataFrame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable and each row contains one case. Thanks to its tabular format, a DataFrame carries additional metadata, which allows Spark to run certain optimizations on the finalized query. An RDD (Resilient Distributed Dataset), by contrast, is more of a black box: a core abstraction over data that Spark cannot optimize in the same way. You can go in both directions, though: a DataFrame exposes its underlying RDD, and an RDD can be turned into a DataFrame (if it is in tabular format) via the toDF method.
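As a sketch of that conversion in both directions (the variable names and sample data are illustrative; sqlContext and sc are set up as in the example below):

import sqlContext.implicits._ // brings toDF into scope

// RDD -> DataFrame: the RDD must be tabular, e.g. one tuple per row
val pairRdd = sc.parallelize(Seq((1L, "Name1"), (2L, "Name2")))
val pairDf = pairRdd.toDF("id", "name")

// DataFrame -> RDD: df.rdd yields an RDD[Row]
val backToRdd = pairDf.rdd.map(row => (row.getLong(0), row.getString(1)))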

The following example creates a DataFrame and stores it in HDFS in both CSV and Parquet format, then reads both back:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("Spark-HDFS-Read-Write")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // required for toDF on a Seq

val hdfs = "hdfs:///"
val df = Seq((1, "Name1")).toDF("id", "name")

// Writing file in CSV format
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv")

// Writing file in PARQUET format
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details")

// Reading CSV files from HDFS
val dfInCsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv")

// Reading PARQUET files from HDFS
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")
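Side note: com.databricks.spark.csv is the external CSV package required on Spark 1.x. On Spark 2.0+ the CSV source is built in, so the same round trip can be written against the SparkSession API, for example:

// assuming a Spark 2.x SparkSession named spark
df.write.mode("overwrite").csv(hdfs + "user/hdfs/employee/details.csv")
val dfInCsv2 = spark.read.option("inferSchema", "true").csv(hdfs + "user/hdfs/employee/details.csv")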