How to use mllib.recommendation if the user ids ar

2020-02-09 09:24发布

问题:

I want to use Spark's mllib.recommendation library to build a prototype recommender system. However, the format of the user data I have is something of the following format:

AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....

If I want to use the mllib.recommendation library, according to the API of the Rating class, the user ids have to be integers (also have to be contiguous?)

It looks like some kind of conversion between the real user ids and the numeric ones used by Spark must be done. But how should I do this?

回答1:

Spark don't really require numeric id, it just needs to bee some unique value, but for implementation they picked Int.

You can do simple back and forth transformation for userId:

  case class MyRating(userId: String, product: Int, rating: Double)

  val data: RDD[MyRating] = ???

  // Assign unique Long id for each userId
  val userIdToInt: RDD[(String, Long)] = 
    data.map(_.userId).distinct().zipWithUniqueId()

  // Reverse mapping from generated id to original
  val reverseMapping: RDD[(Long, String)]
    userIdToInt map { case (l, r) => (r, l) }

  // Depends on data size, maybe too big to keep
  // on single machine
  val map: Map[String, Int] = 
    userIdToInt.collect().toMap.mapValues(_.toInt)

  // Transform to MLLib rating
  val rating: RDD[Rating] = data.map { r =>
    Rating(userIdToInt.lookup(r.userId).head.toInt, r.product, r.rating)
    // -- or
    Rating(map(r.userId), r.product, r.rating)
  }

  // ... train model

  // ... get back to MyRating userId from Int

  val someUserId: String = reverseMapping.lookup(123).head

You can also try 'data.zipWithUniqueId()' but I'm not sure that in this case .toInt will be safe transformation even if dataset size is small.



回答2:

You need to run StringIndexer across your userids to convert the string to unique integer index. They don't have to be continuous.

We use this for our item recommendation engine in https://www.aihello.com

df is (user:String, product,rating)

  val stringindexer = new StringIndexer()
      .setInputCol("user")
      .setOutputCol("userNumber")
  val modelc = stringindexer.fit(df)
  val  df = modelc.transform(df)


回答3:

@Ganesh Krishnan is right, StringIndexer solve this problem.

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
>>> spark = SQLContext(sc)                                                                             
>>> df = spark.createDataFrame(
...     [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
...     ["id", "category"])

| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = stringIndexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> converted = converter.transform(indexed)
>>> converted.show()
+---+--------+-------------+----------------+
| id|category|categoryIndex|originalCategory|
+---+--------+-------------+----------------+
|  0|       a|          0.0|               a|
|  1|       b|          2.0|               b|
|  2|       c|          1.0|               c|
|  3|       a|          0.0|               a|
|  4|       a|          0.0|               a|
|  5|       c|          1.0|               c|
+---+--------+-------------+----------------+

>>> converted.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+


回答4:

The above solution might not always work as I discovered. Spark is not able to perform RDD transformations from within other RDD's. Error output:

org.apache.spark.SparkException: RDD transformations and actions can only be enter code hereinvoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

As a solution you could join the userIdToInt RDD with the original data RDD to store the relation between userId and the uniqueId. Then later on you can join the results RDD with this RDD again.

// Create RDD with the unique id included
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] = 
    data.keyBy(_.userId).join(userIdToInt).map(r => 
        (r._2._1.userId, r._2._2.toInt, r._2._1.productId, 1))