Spark ALS predictAll returns empty

Published 2019-01-12 04:28

Question:

I have the following Python test code (the arguments to ALS.train are defined elsewhere):

r1 = (2, 1)
r2 = (3, 1)
test = sc.parallelize([r1, r2])
model = ALS.train(ratings, rank, numIter, lmbda)
predictions = model.predictAll(test)

print test.take(1)
print predictions.count()
print predictions

This works: predictions has a count of 1, and the output is:

[(2, 1)]
1
ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423

However, when I try to use an RDD I created myself with the following code, it no longer appears to work:

model = ALS.train(ratings, rank, numIter, lmbda)
validation_data = validation.map(lambda xs: tuple(int(x) for x in xs))
predictions = model.predictAll(validation_data)

print validation_data.take(1)
print predictions.count()
print validation_data

Which outputs:

[(61, 3864)]
0
PythonRDD[4018] at RDD at PythonRDD.scala:43

As you can see, predictAll comes back empty when passed the mapped RDD. The values going in have the same format in both cases. The only noticeable difference I can see is that the first example uses parallelize and produces a ParallelCollectionRDD, whereas the second example just uses a map, which produces a PythonRDD. Does predictAll only work when passed a certain type of RDD? If so, is it possible to convert between RDD types? I'm not sure how to get this working.

Answer 1:

There are two basic conditions under which MatrixFactorizationModel.predictAll may return an RDD with fewer items than the input:

  • The user is missing from the training set.
  • The product is missing from the training set.

You can easily reproduce this behavior and check that it does not depend on how the RDD was created. First, let's use example data to build a model:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def parse(s):
    x, y, z  = s.split(",")
    return Rating(int(x), int(y), float(z))

ratings = (sc.textFile("data/mllib/als/test.data")
  .map(parse)
  .union(sc.parallelize([Rating(1, 5, 4.0)])))

model = ALS.train(ratings, 10, 10)

Next, let's see which products and users are present in the training data:

set(ratings.map(lambda r: r.product).collect())
## {1, 2, 3, 4, 5}

set(ratings.map(lambda r: r.user).collect())
## {1, 2, 3, 4}
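
With these sets in hand, you can also diagnose an arbitrary RDD of (user, product) pairs before predicting. A minimal sketch (unscorable is my own helper name, and I assume the ID sets are small enough to fit in driver memory):

users = set(ratings.map(lambda r: r.user).collect())
products = set(ratings.map(lambda r: r.product).collect())

def unscorable(pair):
    # True when the pair cannot be scored because its user or
    # product never appears in the training ratings
    user, product = pair
    return user not in users or product not in products

# applied to your validation_data, this would surface the offending pairs:
# validation_data.filter(unscorable).take(5)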

Now let's create test data and check the predictions:

valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)])
valid_test
## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423

model.predictAll(valid_test).count()
## 3

So far so good. Next, let's map it using the same logic as in your code:

valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs))
valid_test_
## PythonRDD[497] at RDD at PythonRDD.scala:43

model.predictAll(valid_test_).count()
## 3

Still fine. Next, let's create invalid data and repeat the experiment:

invalid_test = sc.parallelize([
  (2, 6), # No product in the training data
  (6, 1)  # No user in the training data
])
invalid_test 
## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423

model.predictAll(invalid_test).count()
## 0 

invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs))
model.predictAll(invalid_test_).count()
## 0

As expected, there are no predictions for the invalid input.
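
If you want an input that predictAll can fully score, one option is to filter such pairs out first. A sketch, assuming the known-ID sets are small enough to broadcast (bc_users, bc_products and scorable are names of my own choosing):

# drop (user, product) pairs with unseen IDs before calling predictAll,
# broadcasting the known-ID sets to the executors
bc_users = sc.broadcast(set(ratings.map(lambda r: r.user).collect()))
bc_products = sc.broadcast(set(ratings.map(lambda r: r.product).collect()))

scorable = invalid_test.filter(
    lambda up: up[0] in bc_users.value and up[1] in bc_products.value)

scorable.count()
## 0  -- both invalid pairs are dropped up front, so an empty
##      predictAll result is no longer a silent surprise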

Finally, you can confirm this is really the case by using the DataFrame-based ML model, whose training and prediction are completely independent of the Python code above:

from pyspark.ml.recommendation import ALS as MLALS

model_ml = MLALS(rank=10, maxIter=10).fit(
    ratings.toDF(["user", "item", "rating"])
)
model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show()

## +----+----+----------+
## |user|item|prediction|
## +----+----+----------+
## |   6|   1|       NaN|
## |   1|   4| 1.0184212|
## |   2|   5| 4.0041084|
## |   3|   5|0.40498763|
## |   2|   6|       NaN|
## +----+----+----------+

As you can see, no corresponding user / item in the training data means no prediction.
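
As a side note, if you move to the DataFrame-based API on Spark 2.2 or later, ALS accepts a coldStartStrategy parameter; setting it to "drop" removes the NaN rows shown above instead of returning them. A sketch against the same data (availability depends on your Spark version):

# Spark 2.2+ only: drop rows that cannot be predicted instead of emitting NaN
model_ml_drop = MLALS(rank=10, maxIter=10, coldStartStrategy="drop").fit(
    ratings.toDF(["user", "item", "rating"])
)
model_ml_drop.transform((valid_test + invalid_test).toDF(["user", "item"])).show()
## expected: only the three scorable rows, (1, 4), (2, 5) and (3, 5), remain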