I want to convert text documents into feature vectors using TF-IDF, and then train a naive Bayes classifier on them.
I can easily load my text files without the labels, use HashingTF() to convert them into vectors, and then use IDF() to weight the words according to how important they are. But if I do that I lose the labels, and it seems to be impossible to recombine the labels with the vectors even though the order is the same.
On the other hand, I can call HashingTF() on each individual document and keep the labels, but then I can't call IDF() on the result, since it requires the whole corpus of documents (and the labels would get in the way).
The Spark documentation for naive Bayes only has one example, in which the points are already labeled and vectorized, so that isn't much help.
I also had a look at this guide: http://help.mortardata.com/technologies/spark/train_a_machine_learning_model
but there the author only applies the hashing function to each document, without IDF.
So my question is whether there is a way to not only vectorize the documents but also weight the words using IDF for the naive Bayes classifier. The main problem seems to be Spark's insistence on only accepting RDDs of LabeledPoint as input to NaiveBayes.
def parseLine(row):
    label = row[1]  # the label is the 2nd element of each row
    features = row[3]  # the text is the 4th element of each row
    features = tokenize(features)
    features = hashingTF.transform(features)
    return LabeledPoint(label, features)
labeledData = data1.map(parseLine)
The standard PySpark approach (split -> transform -> zip) seems to work just fine:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
training_raw = sc.parallelize([
    {"text": "foo foo foo bar bar protein", "label": 1.0},
    {"text": "foo bar dna for bar", "label": 0.0},
    {"text": "foo bar foo dna foo", "label": 0.0},
    {"text": "bar foo protein foo ", "label": 1.0}])
# Split the data into labels and features, then transform the features.
# preservesPartitioning=True is not strictly required here, since a map
# on an RDD without a partitioner shouldn't trigger repartitioning.
labels = training_raw.map(
    lambda doc: doc["label"],  # Standard Python dict access
    preservesPartitioning=True  # Optional, see the note above
)
tf = HashingTF(numFeatures=100).transform(  # Use a much larger number in practice
    training_raw.map(lambda doc: doc["text"].split(),
                     preservesPartitioning=True))
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# Combine using zip
training = labels.zip(tfidf).map(lambda x: LabeledPoint(x[0], x[1]))
# Train and check
model = NaiveBayes.train(training)
labels_and_preds = labels.zip(model.predict(tfidf)).map(
    lambda x: {"actual": x[0], "predicted": float(x[1])})
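For intuition, here is a minimal plain-Python sketch of the same split -> transform -> zip idea, with no Spark involved. The helper names (`hashed_tf`, `fit_idf`, `apply_idf`) are made up for illustration, and `zlib.crc32` is only a stand-in for the hash function; it is not what HashingTF uses internally. The IDF weight follows the formula given in the MLlib docs, log((m + 1) / (df + 1)). The point is that labels and vectors stay aligned purely by position, which is what makes the zip safe:

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 100  # assumption: mirrors numFeatures=100 in the Spark code


def hashed_tf(tokens, num_features=NUM_FEATURES):
    """Map tokens to {bucket index: raw count} using the hashing trick."""
    # crc32 is a stand-in chosen because it is stable across runs;
    # it is NOT the hash Spark's HashingTF uses.
    return Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)


def fit_idf(tf_vectors, num_features=NUM_FEATURES):
    """Return one IDF weight per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    # Iterating a Counter yields each bucket once, so this counts documents.
    df = Counter(idx for vec in tf_vectors for idx in vec)
    return [math.log((m + 1) / (df[i] + 1)) for i in range(num_features)]


def apply_idf(tf_vector, idf):
    """Scale each term count by the IDF weight of its bucket."""
    return {idx: count * idf[idx] for idx, count in tf_vector.items()}


docs = [
    {"text": "foo foo foo bar bar protein", "label": 1.0},
    {"text": "foo bar dna for bar", "label": 0.0},
    {"text": "foo bar foo dna foo", "label": 0.0},
    {"text": "bar foo protein foo ", "label": 1.0},
]

# Split: labels and features are derived from the same ordered collection.
labels = [d["label"] for d in docs]
tf = [hashed_tf(d["text"].split()) for d in docs]

# Transform: IDF needs the whole corpus, but it never reorders documents.
idf = fit_idf(tf)
tfidf = [apply_idf(vec, idf) for vec in tf]

# Zip: position i in labels still corresponds to position i in tfidf.
training = list(zip(labels, tfidf))
```

Note that a term occurring in every document (here `foo` and `bar`) gets an IDF of log(5/5) = 0, so its bucket contributes nothing to the weighted vectors.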
To get some statistics you can use MulticlassMetrics:
from pyspark.mllib.evaluation import MulticlassMetrics
from operator import itemgetter
# MulticlassMetrics expects (prediction, label) pairs, in that order
metrics = MulticlassMetrics(
    labels_and_preds.map(itemgetter("predicted", "actual")))
metrics.confusionMatrix().toArray()
## array([[ 2., 0.],
## [ 0., 2.]])
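If it helps to see how such a matrix is read, below is a small plain-Python sketch that builds one from (prediction, label) pairs, with actual classes in rows and predicted classes in columns, ordered by class label ascending. The function name `confusion_matrix` is hypothetical, and collecting the class list from both predictions and labels is my own simplification rather than a claim about what Spark does internally:

```python
def confusion_matrix(prediction_and_labels):
    """Count (prediction, label) pairs; rows = actual, columns = predicted."""
    pairs = list(prediction_and_labels)
    # Assumption: use every class seen in either position, sorted ascending.
    classes = sorted({c for pair in pairs for c in pair})
    index = {c: i for i, c in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for predicted, actual in pairs:
        matrix[index[actual]][index[predicted]] += 1
    return classes, matrix


# The four training documents above, all classified correctly.
pairs = [(1.0, 1.0), (0.0, 0.0), (0.0, 0.0), (1.0, 1.0)]
classes, matrix = confusion_matrix(pairs)

# One misclassification: a document with label 0.0 predicted as 1.0.
_, matrix_with_error = confusion_matrix(pairs + [(1.0, 0.0)])
```

In the second matrix the extra count lands in row 0, column 1 (actual 0.0, predicted 1.0), which is why the order of the pair matters once predictions are not perfect.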
Related
- Handling continuous data in Spark NaiveBayes