Spark Scala - How to group dataframe rows and appl

Posted 2020-02-24 04:31

I am trying to solve this super simple problem and I am already sick of it. I hope somebody can help me out with this. I have a dataframe shaped like this:

+------------+------------+
|  Category  | Product_ID |
+------------+------------+
| a          | product 1  |
| a          | product 2  |
| a          | product 3  |
| a          | product 1  |
| a          | product 4  |
| b          | product 5  |
| b          | product 6  |
+------------+------------+

How do I group these rows by category and apply a complicated function in Scala? Maybe something like this:

val result = df.groupBy("Category").apply(myComplexFunction)

This myComplexFunction should produce the following table for each category and upload the pairwise similarities into a Hive table or save them to HDFS:


+-----------+-----------+-----------+-----------+
|           | Product_1 | Product_2 | Product_3 |
+-----------+-----------+-----------+-----------+
| Product_1 | 1.0       | 0.1       | 0.8       |
| Product_2 | 0.1       | 1.0       | 0.5       |
| Product_3 | 0.8       | 0.5       | 1.0       |
+-----------+-----------+-----------+-----------+

Here is the function I want to apply (it just computes item-item cosine similarity within each category):

def myComplexFunction(context_data : DataFrame, country_name: String,
                               context_id: String, table_name_correlations: String,
                               context_layer: String, context_index: String) : Boolean = {
            val unique_identifier =   country_name + "_" +  context_layer + "_" + context_index
            val temp_table_vocabulary = "temp_vocabulary_" + unique_identifier
            val temp_table_similarities = "temp_similarities_" + unique_identifier
            val temp_table_correlations = "temp_correlations_" + unique_identifier


            //context.count()
            // fit a CountVectorizerModel from the corpus
            //println("Creating sparse incidence matrix")
            val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").fit(context_data)
            val incidence = cvModel.transform(context_data)

            // ========================================================================================
            // create dataframe of mapping from indices into the item id
            //println("Creating vocabulary")
            val vocabulary_rdd = sc.parallelize(cvModel.vocabulary)
            val rows_vocabulary_rdd = vocabulary_rdd.zipWithIndex.map{ case (s,i) => Row(s,i)}
            val vocabulary_field1 = StructField("Product_ID", StringType, true)
            val vocabulary_field2 = StructField("Product_Index", LongType, true)
            val schema_vocabulary = StructType(Seq(vocabulary_field1, vocabulary_field2))
            val df_vocabulary = hiveContext.createDataFrame(rows_vocabulary_rdd, schema_vocabulary)

            // ========================================================================================
            //println("Computing similarity matrix")
            val myvectors = incidence.select("features").rdd.map(r => r(0).asInstanceOf[Vector])
            val mat: RowMatrix = new RowMatrix(myvectors)
            val sims = mat.columnSimilarities(0.0)

            // ========================================================================================
            // Convert records of the Matrix Entry RDD into Rows
            //println("Extracting paired similarities")
            val rowRdd = sims.entries.map{case MatrixEntry(i, j, v) => Row(i, j, v)}

            // ========================================================================================
            // create dataframe schema
            //println("Creating similarity dataframe")
            val field1 = StructField("Product_Index", LongType, true)
            val field2 = StructField("Neighbor_Index", LongType, true)
            val field3 = StructField("Similarity_Score", DoubleType, true)
            val schema_similarities = StructType(Seq(field1, field2, field3))

            // create the dataframe
            val df_similarities = hiveContext.createDataFrame(rowRdd, schema_similarities)

            // ========================================================================================
            //println("Register vocabulary and correlations as spark temp tables")
            df_vocabulary.registerTempTable(temp_table_vocabulary)
            df_similarities.registerTempTable(temp_table_similarities)

            // ========================================================================================
            //println("Extracting Product_ID")
            val temp_corrs = hiveContext.sql(
                s"SELECT T1.Product_ID, T2.Neighbor_ID, T1.Similarity_Score " +
                    s"FROM " +
                    s"(SELECT Product_ID, Neighbor_Index, Similarity_Score " +
                    s"FROM $temp_table_similarities LEFT JOIN  $temp_table_vocabulary " +
                    s"WHERE $temp_table_similarities.Product_Index = $temp_table_vocabulary.Product_Index) AS T1 " +
                    s"LEFT JOIN " +
                    s"(SELECT Product_ID AS Neighbor_ID, Product_Index as Neighbor_Index FROM $temp_table_vocabulary) AS T2 " +
                    s"ON " +
                    s"T1.Neighbor_Index = T2.Neighbor_Index")

            // ========================================================================================
            val context_corrs = temp_corrs.withColumn("Context_Layer", lit(context_layer)).withColumn("Context_ID", lit(context_id)).withColumn("Country", lit(country_name))
            context_corrs.registerTempTable(temp_table_correlations)

            // ========================================================================================
            hiveContext.sql(s"INSERT INTO TABLE $table_name_correlations SELECT * FROM $temp_table_correlations")

            // ========================================================================================
            // clean up environment
            //println("Cleaning up temp tables")
            hiveContext.dropTempTable(temp_table_correlations)
            hiveContext.dropTempTable(temp_table_similarities)
            hiveContext.dropTempTable(temp_table_vocabulary)

            return true
        }

        val partitioned = tokenized.repartition(tokenized("context_id"))
        val context_counts = partitioned.mapPartitions()
        //val context_counts = model_code_ids.zipWithIndex.map{case (model_code_id, context_index) => compute_similarity(tokenized.filter(tokenized("context_id") === model_code_id), country_name, model_code_id.asInstanceOf[String], table_name_correlations, context_layer, context_index.toString)}

    }

I have already tried the following:

val category_ids = df.select("Category").distinct.collect()
val result = category_ids.map(category_id => myComplexFunction(df.filter(df("Category") <=> category_id)))

I don't know why, but this approach runs sequentially and not in parallel.

1 answer
Luminary
#2 · 2020-02-24 05:05

Cosine similarity is not a complex function and can be expressed using standard SQL aggregations. Let's consider the following example:

val df = Seq(
  ("feat1", 1.0, "item1"),
  ("feat2", 1.0, "item1"),
  ("feat6", 1.0, "item1"),
  ("feat1", 1.0, "item2"),
  ("feat3", 1.0, "item2"),
  ("feat4", 1.0, "item3"),
  ("feat5", 1.0, "item3"),
  ("feat1", 1.0, "item4"),
  ("feat6", 1.0, "item4")
).toDF("feature", "value", "item")

where feature is a feature identifier, value is the corresponding value, and item is an object identifier. Each (feature, item) pair has only one corresponding value.

Cosine similarity is defined as:

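\[
\cos(x, y) = \frac{x \cdot y}{\lVert x \rVert \, \lVert y \rVert} = \frac{\sum_i x_i y_i}{\sqrt{\sum_i x_i^2} \, \sqrt{\sum_i y_i^2}}
\]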

where the numerator can be computed as:

import org.apache.spark.sql.functions.sum

val numer = df.as("this").withColumnRenamed("item", "this")
  .join(df.as("other").withColumnRenamed("item", "other"), Seq("feature"))
  .where($"this" < $"other")
  .groupBy($"this", $"other")
  .agg(sum($"this.value" * $"other.value").alias("dot"))

and the norms used in the denominator as:

import org.apache.spark.sql.functions.sqrt

val norms = df.groupBy($"item").agg(sqrt(sum($"value" * $"value")).alias("norm"))

Combined together:

val cosine = ($"dot" / ($"this_norm.norm" * $"other_norm.norm")).as("cosine") 

val similarities = numer
 .join(norms.alias("this_norm").withColumnRenamed("item", "this"), Seq("this"))
 .join(norms.alias("other_norm").withColumnRenamed("item", "other"), Seq("other"))
 .select($"this", $"other", cosine)

with the result representing the non-zero entries of the upper triangular matrix, ignoring the diagonal (which is trivial):

+-----+-----+-------------------+
| this|other|             cosine|
+-----+-----+-------------------+
|item1|item4| 0.8164965809277259|
|item1|item2|0.40824829046386296|
|item2|item4| 0.4999999999999999|
+-----+-----+-------------------+
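
As a sanity check, the values in this table can be reproduced locally without Spark. The following is only a minimal sketch in plain Scala, assuming the same nine rows as in the example above. The names rows, vectors, cosine and pairs are made up for this illustration and are not part of the answer's code.

```scala
// Local check of the cosine values in the table; no Spark required.
// `rows`, `vectors`, `cosine` and `pairs` are illustrative names only.
val rows = Seq(
  ("feat1", 1.0, "item1"), ("feat2", 1.0, "item1"), ("feat6", 1.0, "item1"),
  ("feat1", 1.0, "item2"), ("feat3", 1.0, "item2"),
  ("feat4", 1.0, "item3"), ("feat5", 1.0, "item3"),
  ("feat1", 1.0, "item4"), ("feat6", 1.0, "item4")
)

// item -> (feature -> value), i.e. one sparse vector per item
val vectors: Map[String, Map[String, Double]] =
  rows.groupBy(_._3).map { case (item, rs) =>
    item -> rs.map { case (feature, value, _) => feature -> value }.toMap
  }

def cosine(x: Map[String, Double], y: Map[String, Double]): Double = {
  // Only features present in both vectors contribute to the dot product.
  val dot = x.keySet.intersect(y.keySet).toSeq.map(f => x(f) * y(f)).sum
  val normX = math.sqrt(x.values.map(v => v * v).sum)
  val normY = math.sqrt(y.values.map(v => v * v).sum)
  dot / (normX * normY)
}

// Upper-triangular pairs (a < b) with a non-zero similarity, as in the table.
val items = vectors.keys.toSeq.sorted
val pairs = for {
  a <- items
  b <- items
  if a < b
  c = cosine(vectors(a), vectors(b))
  if c > 0.0
} yield (a, b, c)

pairs.foreach { case (a, b, c) => println(s"$a $b $c") }
```

The printed pairs should agree with the table up to floating-point rounding. item3 shares no feature with any other item, so it does not appear.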

This should be equivalent to:

import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.Vectors

val pivoted = df.groupBy("item").pivot("feature").sum()
  .na.fill(0.0)
  .orderBy("item")

val mat = new IndexedRowMatrix(pivoted
  .select(array(pivoted.columns.tail.map(col): _*))
  .rdd
  .zipWithIndex
  .map {
    case (row, idx) => 
      new IndexedRow(idx, Vectors.dense(row.getSeq[Double](0).toArray))
  })

mat.toCoordinateMatrix.transpose
  .toIndexedRowMatrix.columnSimilarities
  .toBlockMatrix.toLocalMatrix

which returns:

0.0  0.408248290463863  0.0  0.816496580927726
0.0  0.0                0.0  0.4999999999999999
0.0  0.0                0.0  0.0
0.0  0.0                0.0  0.0

Regarding your code:

  • Execution is sequential because your code operates on a local (collected) collection.
  • myComplexFunction cannot be further distributed because it uses distributed data structures and contexts.
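
One way to avoid the per-category loop altogether is to carry the category through the same aggregations as an extra key, so that all categories are handled in a single distributed job. The following is only a sketch under assumptions: the category column, the sample rows and the local SparkSession are hypothetical and are not taken from the question's data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sqrt, sum}

// Assumption: a local session for illustration; on a cluster, reuse the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("per-category-cosine")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: the answer's (feature, value, item) layout plus a category column.
val df = Seq(
  ("a", "feat1", 1.0, "item1"),
  ("a", "feat2", 1.0, "item1"),
  ("a", "feat1", 1.0, "item2"),
  ("b", "feat1", 1.0, "item3"),
  ("b", "feat1", 1.0, "item4"),
  ("b", "feat3", 1.0, "item4")
).toDF("category", "feature", "value", "item")

// Two renamed copies of the data, so the self-join has unambiguous column names.
val left = df.select($"category", $"feature", $"item".as("this"), $"value".as("this_value"))
val right = df.select($"category", $"feature", $"item".as("other"), $"value".as("other_value"))

// Dot products, restricted to pairs of items within the same category.
val numer = left.join(right, Seq("category", "feature"))
  .where($"this" < $"other")
  .groupBy("category", "this", "other")
  .agg(sum($"this_value" * $"other_value").as("dot"))

// One norm per (category, item).
val norms = df.groupBy("category", "item")
  .agg(sqrt(sum($"value" * $"value")).as("norm"))

val similarities = numer
  .join(norms.select($"category", $"item".as("this"), $"norm".as("this_norm")),
    Seq("category", "this"))
  .join(norms.select($"category", $"item".as("other"), $"norm".as("other_norm")),
    Seq("category", "other"))
  .select($"category", $"this", $"other",
    ($"dot" / ($"this_norm" * $"other_norm")).as("cosine"))

similarities.show()
```

Because the category is part of every join and grouping key, items from different categories are never paired, even when they share a feature (item2 and item3 here). The result stays a single DataFrame, so it could be written out once through similarities.write (for example with saveAsTable or parquet) instead of one INSERT per category.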