Training a Machine Learning model on selected parts of a dataset

Posted 2019-05-31 07:53

Question:

I need to run a Random Forest on a dataset. The dataset is in a DataFrame organised as follows:

training_set_all = sc.parallelize([
    ['u1', 1, 0.9, 0.5, 0.0],
    ['u1', 0, 0.5, 0.1, 0.0],
    ['u2', 1, 0.3, 0.3, 0.8],
    ['u3', 1, 0.2, 0.2, 0.6],
    ['u2', 0, 0.0, 0.1, 0.4],
    ...
]).toDF(('user', 'status', 'f1', 'f2', 'f3'))

Basically there is a user, the class (the target variable, 1 or 0) and then three numerical float features. In practice every user has their own training set, and they are all stacked in a single DataFrame.
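For example, u1's own training set is simply the slice of the DataFrame where user == 'u1' (using the column names above):

u1_training_set = training_set_all.filter(training_set_all['user'] == 'u1')
u1_training_set.show()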

I need to train the model on a user-by-user basis: from training_set_all I select the part related to user u1, train a model on it, then move on to u2 and do the same, and so on for all users.

I could potentially solve this by running a for loop over the users, filtering the training set down to the rows belonging to each user and then training on them (sketched below). But I would gladly avoid for loops, as they do not fit well with the Spark programming model.

The classifier requires an RDD of LabeledPoint, i.e. (label, features) pairs, and the call takes several other arguments, such as the number of trees.
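For reference, here is a minimal sketch of the loop-based version I would like to avoid (assuming the pyspark.mllib RandomForest API and the column names above; the parameter values are just examples):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

models = {}
for u in [r['user'] for r in training_set_all.select('user').distinct().collect()]:
    # Keep only this user's rows and turn them into LabeledPoint(label, features)
    user_data = training_set_all \
        .filter(training_set_all['user'] == u) \
        .rdd \
        .map(lambda row: LabeledPoint(row['status'], [row['f1'], row['f2'], row['f3']]))
    # One model per user, trained sequentially from the driver
    models[u] = RandomForest.trainClassifier(user_data, numClasses=2,
                                             categoricalFeaturesInfo={}, numTrees=3)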

So I tried a map/reduceByKey approach, like this (keeping in mind that I need to convert each row to a LabeledPoint):

training_set_all \
    .map(lambda row: (row[0], LabeledPoint(row[1], [row[2], row[3], row[4]]))) \
    .reduceByKey(RandomForest.trainClassifier)

This clearly fails because I am not feeding the classifier with all the required parameters.

So in order to pass them I tried this:

def train():
    return RandomForest.trainClassifier(data, 2, {}, 3)

stuff = training_set_all \
    .map(lambda row: (row[0], LabeledPoint(row[1], [row[2], row[3], row[4]]))) \
    .reduceByKey(train)

This fails with the following error:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

which is very vocal about what's wrong.

How would I train the model selectively, on the part of the training set pertaining to each user, without using a for loop?