I want to use Spark's mllib.recommendation library to build a prototype recommender system. However, the user IDs in my data look like this:
AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....
If I want to use the mllib.recommendation library, then according to the API of the Rating class the user IDs have to be integers (do they also have to be contiguous?).
It looks like some kind of conversion between the real user IDs and the numeric ones used by Spark must be done. How should I do this?
Spark doesn't really need a numeric ID; it just needs some unique value, but the implementation happens to use Int.
You can do a simple back-and-forth transformation for userId:
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

case class MyRating(userId: String, product: Int, rating: Double)
val data: RDD[MyRating] = ???
// Assign a unique Long id to each distinct userId
val userIdToInt: RDD[(String, Long)] =
  data.map(_.userId).distinct().zipWithUniqueId()
// Reverse mapping from the generated id back to the original userId
val reverseMapping: RDD[(Long, String)] =
  userIdToInt.map { case (l, r) => (r, l) }
// Depending on data size, this may be too big to keep
// on a single machine
val map: Map[String, Int] =
  userIdToInt.collect().map { case (k, v) => k -> v.toInt }.toMap
// Transform to MLlib ratings using the collected map.
// Note: calling userIdToInt.lookup(r.userId) inside this closure
// fails at runtime, because RDD operations cannot be nested (SPARK-5063).
val rating: RDD[Rating] = data.map { r =>
  Rating(map(r.userId), r.product, r.rating)
}
// ... train model
// ... get back to MyRating userId from Int (lookup runs on the driver here)
val someUserId: String = reverseMapping.lookup(123L).head
You can also try 'data.zipWithUniqueId()', but I'm not sure that .toInt will be a safe transformation in that case, even if the dataset is small.
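On the .toInt question: according to the RDD API documentation, zipWithUniqueId gives the items of the k-th partition the ids k, n+k, 2n+k, ..., where n is the number of partitions. The ids are therefore unique but may have gaps, and the largest one is roughly n times the size of the biggest partition. The plain-Python sketch below only emulates that numbering (it is not Spark code, and the partition layout and the helper name zip_with_unique_id are made up for illustration) to show how one could check that the ids stay within Int range:

```python
INT_MAX = 2**31 - 1  # largest value a Scala/Java Int can hold


def zip_with_unique_id(partitions):
    """Emulate the documented scheme: item i of partition k gets id i * n + k."""
    n = len(partitions)
    return [
        (item, i * n + k)
        for k, part in enumerate(partitions)
        for i, item in enumerate(part)
    ]


# Hypothetical layout: the question's sample ids spread over 2 partitions
partitions = [
    ["AB123XY45678", "CD234WZ12345", "EF345OOO1234"],
    ["GH456XY98765"],
]

pairs = zip_with_unique_id(partitions)
user_to_int = dict(pairs)                      # forward mapping
int_to_user = {uid: user for user, uid in pairs}  # reverse mapping

max_id = max(int_to_user)
fits_in_int = max_id <= INT_MAX

print(user_to_int)
print("largest id:", max_id, "fits in Int:", fits_in_int)
```

With this layout the ids are 0, 2, 4 and 1, so id 3 is never used. That is harmless here, since the ids only need to be unique, but it shows why the largest id can exceed the number of distinct users.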
You need to run StringIndexer over your user IDs to convert each string to a unique integer index. The indices don't have to be contiguous.
We use this for our item recommendation engine at https://www.aihello.com
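import org.apache.spark.ml.feature.StringIndexer

// df has columns (user: String, product, rating)
val stringindexer = new StringIndexer()
  .setInputCol("user")
  .setOutputCol("userNumber")
val modelc = stringindexer.fit(df)
// Use a new name: redefining df here would not compile
val indexed = modelc.transform(df)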
@Ganesh Krishnan is right, StringIndexer solves this problem.
>>> from pyspark.ml.feature import IndexToString, StringIndexer
>>> from pyspark.sql import SQLContext
>>> spark = SQLContext(sc)
>>> df = spark.createDataFrame(
... [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
... ["id", "category"])
>>> df.show()
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = stringIndexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+
>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> converted = converter.transform(indexed)
>>> converted.show()
+---+--------+-------------+----------------+
| id|category|categoryIndex|originalCategory|
+---+--------+-------------+----------------+
|  0|       a|          0.0|               a|
|  1|       b|          2.0|               b|
|  2|       c|          1.0|               c|
|  3|       a|          0.0|               a|
|  4|       a|          0.0|               a|
|  5|       c|          1.0|               c|
+---+--------+-------------+----------------+
>>> converted.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+
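The index values in the output above (a is 0.0, c is 1.0, b is 2.0) follow from StringIndexer's default ordering, which assigns indices by descending label frequency, so the most frequent label gets 0. The plain-Python sketch below only imitates that behaviour to make the numbering easier to follow; it is not how Spark implements it, the function name fit_string_indexer is made up, and the alphabetical tie-break is an assumption of this sketch:

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Tie-break alphabetically; this tie-break is an assumption of the sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


categories = ["a", "b", "c", "a", "a", "c"]

label_to_index = fit_string_indexer(categories)
# Reverse mapping, playing the role of IndexToString
index_to_label = {index: label for label, index in label_to_index.items()}

indexed = [label_to_index[c] for c in categories]
restored = [index_to_label[i] for i in indexed]

print(label_to_index)
print(indexed)
print(restored)
```

For user IDs that are all distinct, every label has frequency 1, so the exact index each user receives depends entirely on the tie-break. That is fine for a recommender, as long as the same fitted model is used to map the indices back.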
The first answer's approach of calling userIdToInt.lookup inside data.map does not work, as I discovered: Spark cannot run RDD transformations or actions from inside another RDD's transformation. Error output:
org.apache.spark.SparkException: RDD transformations and actions can
only be invoked by the driver, not inside of other
transformations; for example, rdd1.map(x => rdd2.values.count() * x)
is invalid because the values transformation and count action cannot
be performed inside of the rdd1.map transformation. For more
information, see SPARK-5063.
As a workaround, you can join the userIdToInt RDD with the original data RDD so that each record carries both the userId and its unique id. Later on, you can join the results RDD with this mapping again to recover the original userId.
// Create an RDD that carries the unique id alongside the original userId
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] =
  data.keyBy(_.userId).join(userIdToInt).map { case (userId, (r, uniqueId)) =>
    (userId, uniqueId.toInt, r.product, r.rating)
  }
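The second join, from model output back to the original userId, works the same way. Here is a minimal plain-Python sketch of that step, with a small join helper standing in for a pair-RDD join. The sample ids, the predictions and the helper name are all hypothetical, and this only illustrates the data flow, not Spark itself:

```python
from collections import defaultdict


def join(left, right):
    """Inner join of two lists of (key, value) pairs, similar to a pair-RDD join."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


# Hypothetical (userId, uniqueId) pairs from the id-assignment step
user_id_to_int = [("AB123XY45678", 0), ("CD234WZ12345", 1)]

# Hypothetical model output keyed by numeric user id: (uniqueId, product, score)
predictions = [(0, 10, 4.5), (1, 20, 3.0), (0, 30, 2.5)]

# Key both sides by the numeric id, then join
int_to_user = [(uid, user) for user, uid in user_id_to_int]
keyed_predictions = [(uid, (product, score)) for uid, product, score in predictions]

restored = [
    (user, product, score)
    for _, ((product, score), user) in join(keyed_predictions, int_to_user)
]

print(restored)
```

Because both sides stay distributed in the real Spark version, nothing has to be collected to the driver, which is the main advantage over the in-memory map when the number of users is large.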