From DataFrame to RDD[LabeledPoint]

Posted 2019-04-04 01:44

I am trying to implement a document classifier using Apache Spark MLlib and I am having some problems representing the data. My code is the following:

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.regression.LabeledPoint

val sql = new SQLContext(sc)

// Load raw data from a TSV file
val raw = sc.textFile("data.tsv").map(_.split("\t").toSeq)

// Convert the RDD to a dataframe
val schema = StructType(List(StructField("class", StringType), StructField("content", StringType)))
val dataframe = sql.createDataFrame(raw.map(row => Row(row(0), row(1))), schema)

// Tokenize
val tokenizer = new Tokenizer().setInputCol("content").setOutputCol("tokens")
val tokenized = tokenizer.transform(dataframe)

// TF-IDF
val htf = new HashingTF().setInputCol("tokens").setOutputCol("rawFeatures").setNumFeatures(500)
val tf = htf.transform(tokenized)
tf.cache()
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(tf)
val tfidf = idfModel.transform(tf)

// Create labeled points (this is the line that fails: row.get(4) is typed as Any)
val labeled = tfidf.map(row => LabeledPoint(row.getDouble(0), row.get(4)))

I need to use DataFrames to generate the tokens and compute the TF-IDF features. The problem appears when I try to convert the resulting DataFrame to an RDD[LabeledPoint]. I map over the DataFrame rows, but the get method of Row returns Any, not the type declared in the DataFrame schema (Vector). Therefore, I cannot construct the RDD I need to train an ML model.

What is the best way to get an RDD[LabeledPoint] after computing TF-IDF?

2 Answers
Root(大扎)
#2 · 2019-04-04 02:28

Casting the object worked for me.

Try:

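// Create labeled points; "class" is a StringType column, so parse it instead of calling getDouble
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
val labeled = tfidf.map(row => LabeledPoint(row.getString(0).toDouble, row(4).asInstanceOf[Vector]))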
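
As an alternative to an unchecked cast, a pattern match on Row checks both column types in one step. This is only a minimal sketch, assuming a Spark 1.x spark-shell where `sc` already exists, class labels that are numeric strings, and a tiny hand-built DataFrame (`sample`, with made-up values) standing in for the question's `tfidf`:

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

// Assumption: running in a Spark 1.x spark-shell, where `sc` already exists.
val sqlCtx = new SQLContext(sc)

// Hypothetical stand-in for `tfidf`: a string label plus a feature vector.
val sample = sqlCtx.createDataFrame(Seq(
  ("0", Vectors.dense(0.1, 0.0, 0.3)),
  ("1", Vectors.dense(0.0, 0.7, 0.2))
)).toDF("class", "features")

// Keep only the two needed columns, then match on their runtime types.
// A row with an unexpected type raises a MatchError rather than a ClassCastException.
val labeledByMatch = sample.select("class", "features").rdd.map {
  case Row(label: String, features: Vector) => LabeledPoint(label.toDouble, features)
}
```

Selecting the columns by name first also avoids depending on the positional index 4.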
欢心
#3 · 2019-04-04 02:35

You need to use Row's getAs[T](i: Int): T, which returns the field at index i as the type you ask for:

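// Create labeled points
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
// Column 0 ("class") is a StringType in the question's schema, so parse it (assumes numeric labels)
val labeled = tfidf.map(row => LabeledPoint(row.getString(0).toDouble, row.getAs[Vector](4)))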
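
getAs also has an overload that takes a field name, which may be easier to maintain than a positional index. A rough sketch, assuming Spark 1.4 or later (where `getAs[T](fieldName: String)` is available), a spark-shell with `sc` defined, numeric string labels, and a small made-up DataFrame (`rows`) in place of `tfidf`:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

// Assumption: Spark 1.4+ spark-shell, where `sc` already exists.
val ctx = new SQLContext(sc)

// Hypothetical stand-in for `tfidf`, using the same column names as the question.
val rows = ctx.createDataFrame(Seq(
  ("0", Vectors.dense(0.5, 0.0)),
  ("1", Vectors.dense(0.0, 1.5))
)).toDF("class", "features")

// Look the fields up by name; .rdd makes the conversion to an RDD explicit.
val labeledByName = rows.rdd.map { row =>
  LabeledPoint(row.getAs[String]("class").toDouble, row.getAs[Vector]("features"))
}
```

Note that on Spark 2.x and later the ml feature transformers emit org.apache.spark.ml.linalg.Vector, so building an mllib LabeledPoint there would need a conversion such as org.apache.spark.mllib.linalg.Vectors.fromML.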