I have the following Python test code (the arguments to ALS.train
are defined elsewhere):
r1 = (2, 1)
r2 = (3, 1)
test = sc.parallelize([r1, r2])
model = ALS.train(ratings, rank, numIter, lmbda)
predictions = model.predictAll(test)
print test.take(1)
print predictions.count()
print predictions
Which works, because it has a count of 1 against the predictions variable and outputs:
[(2, 1)]
1
ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423
However, when I try and use an RDD
I created myself using the following code, it doesn't appear to work anymore:
model = ALS.train(ratings, rank, numIter, lmbda)
validation_data = validation.map(lambda xs: tuple(int(x) for x in xs))
predictions = model.predictAll(validation_data)
print validation_data.take(1)
print predictions.count()
print validation_data
Which outputs:
[(61, 3864)]
0
PythonRDD[4018] at RDD at PythonRDD.scala:43
As you can see, predictAll
comes back empty when passed the mapped RDD
. The values going in are both of the same format. The only noticeable difference that I can see is that the first example uses parallelize and produces a ParallelCollectionRDD
whereas the second example just uses a map which produces a PythonRDD
. Does predictAll
only work if passed a certain type of RDD
? If so, is it possible to convert between RDD
types? I'm not sure how to get this working.
There are two basic conditions under which MatrixFactorizationMode.predictAll
may return a RDD with lower number of items than the input:
- user is missing in the training set.
- product is missing in the training set.
You can easily reproduce this behavior and check that it is is not dependent on the way how RDD has been created. First lets use example data to build a model:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
def parse(s):
x, y, z = s.split(",")
return Rating(int(x), int(y), float(z))
ratings = (sc.textFile("data/mllib/als/test.data")
.map(parse)
.union(sc.parallelize([Rating(1, 5, 4.0)])))
model = ALS.train(ratings, 10, 10)
Next lets see which products and users are present in the training data:
set(ratings.map(lambda r: r.product).collect())
## {1, 2, 3, 4, 5}
set(ratings.map(lambda r: r.user).collect())
## {1, 2, 3, 4}
Now lets create test data and check predictions:
valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)])
valid_test
## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423
model.predictAll(valid_test).count()
## 3
So far so good. Next lets map it using the same logic as in your code:
valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs))
valid_test_
## PythonRDD[497] at RDD at PythonRDD.scala:43
model.predictAll(valid_test_).count()
## 3
Still fine. Next lets create invalid data and repeat experiment:
invalid_test = sc.parallelize([
(2, 6), # No product in the training data
(6, 1) # No user in the training data
])
invalid_test
## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423
model.predictAll(invalid_test).count()
## 0
invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs))
model.predictAll(invalid_test_).count()
## 0
As expected there are no predictions for invalid input.
Finally you can confirm this is really the case by using ML model which is completely independent in training / prediction from Python code:
from pyspark.ml.recommendation import ALS as MLALS
model_ml = MLALS(rank=10, maxIter=10).fit(
ratings.toDF(["user", "item", "rating"])
)
model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show()
## +----+----+----------+
## |user|item|prediction|
## +----+----+----------+
## | 6| 1| NaN|
## | 1| 4| 1.0184212|
## | 2| 5| 4.0041084|
## | 3| 5|0.40498763|
## | 2| 6| NaN|
## +----+----+----------+
As you can see no corresponding user / item in the training data means no prediction.