I am using Spark with Scala to calculate the cosine similarity between rows of a DataFrame.
The DataFrame schema is below:
root
|-- itemId: string (nullable = true)
|-- features: vector (nullable = true)
A sample of the DataFrame:
+-------+--------------------+
| itemId| features|
+-------+--------------------+
| ab |[4.7143,0.0,5.785...|
| cd |[5.5,0.0,6.4286,4...|
| ef |[4.7143,1.4286,6....|
........
+-------+--------------------+
Code to compute the cosine similarities:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val irm = new IndexedRowMatrix(myDataframe.rdd.zipWithIndex().map {
  // IndexedRow takes (index, vector); convert the ml Vector column
  // to the mllib Vector that IndexedRowMatrix expects
  case (row, index) =>
    IndexedRow(index, Vectors.fromML(row.getAs[org.apache.spark.ml.linalg.Vector]("features")))
}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities
In the irm matrix I have entries (i, j, score), where i and j are the indices of items i and j in my original DataFrame. What I would like instead is (itemIdA, itemIdB, score), where itemIdA and itemIdB are the ids corresponding to indices i and j respectively. Should I do this by joining irm back to the initial DataFrame, or is there a better option?
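To make the question concrete, the join I have in mind looks roughly like the sketch below. It rebuilds an (index, itemId) lookup with the same zipWithIndex call used to build the matrix (so the indices line up), then joins it twice against the similarity entries; indexToId and scored are names I made up for illustration:

```scala
// Lookup from matrix index back to itemId, using the same ordering as before
val indexToId = myDataframe.rdd.zipWithIndex().map {
  case (row, index) => (index, row.getAs[String]("itemId"))
}

// irm.entries is an RDD[MatrixEntry] with fields i, j, value
val scored = irm.entries
  .map(e => (e.i, (e.j, e.value)))                          // key by first index
  .join(indexToId)                                          // attach itemIdA
  .map { case (_, ((j, score), idA)) => (j, (idA, score)) } // re-key by second index
  .join(indexToId)                                          // attach itemIdB
  .map { case (_, ((idA, score), idB)) => (idA, idB, score) }
```

I am not sure whether two RDD joins like this is idiomatic, or whether collecting the (index, itemId) pairs into a broadcast map would be cheaper when the number of items is small.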