Import all the libraries you need
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
import re
Load your data into an RDD (sc is the SparkContext, which the PySpark shell provides automatically)
msgs = [("I love Star Wars but I can't watch it today", 1.0),
("I don't love Star Wars and people want to watch it today", 0.0),
("I dislike not being able to watch Star Wars", 1.0),
("People who love Star Wars are my friends", 1.0),
("I preffer to watch Star Wars on Netflix", 0.0),
("George Lucas shouldn't have sold the franchise", 1.0),
("Disney makes better movies than everyone else", 0.0)]
rdd = sc.parallelize(msgs)
Tokenize your data (if you use the ML package it might be easier, since it comes with a Tokenizer)
# lowercase each message and split it on whitespace
rdd = rdd.map(lambda p: ([w.lower() for w in re.split(" +", p[0])], p[1]))
Remove all the unnecessary words (widely known as stop words) and symbols such as ,.& (this toy data contains no symbols, so filtering a small stop-word list is enough here)
commons = ["and", "but", "to"]
rdd = rdd.map(lambda p: ([t for t in p[0] if t not in commons], p[1]))
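A quick sanity check of the first record after both steps:

rdd.first()
# (['i', 'love', 'star', 'wars', 'i', "can't", 'watch', 'it', 'today'], 1.0)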
Create a dictionary with all the distinct words in your dataset. It sounds huge, but there are not as many as you would expect, and I bet they will fit in your master node (there are other ways to approach this, as sketched below, but for simplicity I will keep it this way).
# collect the distinct words; this becomes the vocabulary
words = rdd.flatMap(lambda p: p[0]).distinct().collect()
diffwords = len(words)
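One of those other ways is feature hashing; a minimal sketch using MLlib's HashingTF (the bucket count of 1000 is an arbitrary choice here), applied to the (tokens, label) pairs as they are at this point, would look like this:

from pyspark.mllib.feature import HashingTF

# hash each token into a fixed-size feature space, so no vocabulary
# has to be collected on the driver
htf = HashingTF(numFeatures=1000)
hashed = rdd.map(lambda p: (htf.transform(p[0]), p[1]))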
Convert your features into a DenseVector or a SparseVector. I would obviously recommend the second, because a SparseVector normally requires less space to be represented; however, it depends on the data. Note that there are better alternatives, like the hashing sketched above, but I am trying to stay loyal to my verbose approach. After that, transform each tuple into a LabeledPoint.
def sparsify(length, tokens):
    # one (index, count) entry per distinct token in the message
    counts = {words.index(t): float(tokens.count(t)) for t in set(tokens)}
    return SparseVector(length, counts)

rdd = rdd.map(lambda p: LabeledPoint(p[1], sparsify(diffwords, p[0])))
Fit your favorite model; in this case I used LogisticRegressionWithSGD due to ulterior motives.
lrm = LogisticRegressionWithSGD.train(rdd)
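Optionally, sanity-check the fit on the training set before saving; with seven messages this is only a smoke test, not a real evaluation:

# fraction of training examples the model misclassifies
preds = rdd.map(lambda p: (p.label, float(lrm.predict(p.features))))
train_err = preds.filter(lambda lp: lp[0] != lp[1]).count() / float(rdd.count())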
Save your model (note that save writes a directory holding the model's metadata and data, not a single file).
lrm.save(sc, "mylovelymodel.model")
Load your LogisticRegressionModel in another application.
lrm = LogisticRegressionModel.load(sc, "mylovelymodel.model")
Predict the categories.
lrm.predict(SparseVector(37,[2,4,5,13,15,19,23,26,27,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
# outputs 0
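Rather than crafting the SparseVector by hand, you can reuse the same preprocessing on new messages. Below is a minimal sketch; featurize is a hypothetical helper, it assumes words, commons, sparsify and diffwords from the training script are available in this application, and it simply drops tokens that never appeared in the training vocabulary:

def featurize(text):
    # apply the exact same pipeline that produced the training features
    tokens = [w.lower() for w in re.split(" +", text)]
    tokens = [t for t in tokens if t not in commons and t in words]
    return sparsify(diffwords, tokens)

lrm.predict(featurize("People who love Star Wars are my friends"))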