Save Spark org.apache.spark.mllib.linalg.Matrix to

2020-02-29 10:20发布

问题:

The result of correlation in Spark MLLib is a of type org.apache.spark.mllib.linalg.Matrix. (see http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations)

val data: RDD[Vector] = ... 

val correlMatrix: Matrix = Statistics.corr(data, "pearson")

I would like to save the result into a file. How can I do this?

回答1:

Here is a simple and effective approach to save the Matrix to hdfs and specify the separator.

(The transpose is used since .toArray is in column major format.)

val localMatrix: List[Array[Double]] = correlMatrix
    .transpose  // Transpose since .toArray is column major
    .toArray
    .grouped(correlMatrix.numCols)
    .toList

val lines: List[String] = localMatrix
    .map(line => line.mkString(" "))

sc.parallelize(lines)
    .repartition(1)
    .saveAsTextFile("hdfs:///home/user/spark/correlMatrix.txt")


回答2:

As Matrix is Serializable, you can write it using normal Scala.

You can find an example here.



回答3:

The answer by Dylan Hogg was great, to enhance it slightly, add a column index. (In my use case, once I created a file and downloaded it, it was not sorted due to the nature of parallel process etc.)

ref: https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch10s12.html

substitute with this line and it will put a sequence number on the line (starting w/ 0) making it easier to sort when you go to view it

val lines: List[String] = localMatrix 
  .map(line => line.mkString(" ")) 
  .zipWithIndex.map { case(line, count) => s"$count $line" } 


回答4:

Thank you for your suggestion. I came out with this solution. Thanks to Ignacio for his suggestions

val vtsd = sd.map(x => Vectors.dense(x.toArray))
val corrMat = Statistics.corr(vtsd)
val arrayCor = corrMat.toArray.toList
val colLen = columnHeader.size
val toArr2 = sc.parallelize(arrayCor).zipWithIndex().map(
      x => {
    if ((x._2 + 1) % colLen == 0) {
      (x._2, arrayCor.slice(x._2.toInt + 1 - colLen, x._2.toInt + 1).mkString(";"))
    } else {
      (x._2, "")
    }
  }).filter(_._2.nonEmpty).sortBy(x => x._1, true, 1).map(x => x._2)


toArr2.coalesce(1, true).saveAsTextFile("/home/user/spark/cor_" + System.currentTimeMillis())