I have a set of data based on which I want to create a classification model. Each row has the following form:
user1,class1,product1
user1,class1,product2
user1,class1,product5
user2,class1,product2
user2,class1,product5
user3,class2,product1
There are about 1M users, 2 classes, and 1M products. What I would like to do next is create the sparse vectors (something already supported by MLlib) BUT in order to apply that function I have to create the dense vectors (with the 0s), first. In other words, I have to binarize my data. What's the easiest (or most elegant) way of doing that?
Given that I am a newbie in regards to MLlib, may I ask you to provide a concrete example? I am using MLlib 1.2.
EDIT
I have ended up with the following piece of code but is turns out to be really slow... Any other ideas provided that I can only use MLlib 1.2?
val data = test11.map(x=> ((x(0) , x(1)) , x(2))).groupByKey().map(x=> (x._1 , x._2.toArray)).map{x=>
var lt : Array[Double] = new Array[Double](test12.size)
val id = x._1._1
val cl = x._1._2
val dt = x._2
var i = -1
test12.foreach{y => i += 1; lt(i) = if(dt contains y) 1.0 else 0.0}
val vs = Vectors.dense(lt)
(id , cl , vs)
}
You can use spark.ml's OneHotEncoder.
You first use:
OneHotEncoder.categories(rdd, categoricalFields)
Where categoricalField
is the sequence of indexes at which your RDD
contains categorical data. categories
, given a dataset and the index of columns which are categorical variables, returns a structure that, for each field, describes the values that are present for in the dataset. That map is meant to be used as input to the encode method:
OneHotEncoder.encode(rdd, categories)
Which returns your vectorized RDD[Array[T]]
.
If using built-in OneHotEncoder
is not an option and you have only a single variable implementing poor man's one-hot is more or less straightforward. First lets create an example data:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
val rdd = sc.parallelize(List(
Array("user1", "class1", "product1"),
Array("user1", "class1", "product2"),
Array("user1", "class1", "product5"),
Array("user2", "class1", "product2"),
Array("user2", "class1", "product5"),
Array("user3", "class2", "product1")))
Next we have to create a mapping from a value to the index:
val prodMap = sc.broadcast(rdd.map(_(2)).distinct.zipWithIndex.collectAsMap)
and a simple encoding function:
def encodeProducts(products: Iterable[String]): Vector = {
Vectors.sparse(
prodMap.value.size,
products.map(product => (prodMap.value(product).toInt, 1.0)).toSeq
)
}
Finally we can apply it to the dataset:
rdd.map(x => ((x(0), x(1)), x(2))).groupByKey.mapValues(encodeProducts)
It is relatively easy to extend above to handle multiple variables.
Edit:
If number of products is to large to make broadcasting useful it should be possible to use join
instead. First we can create similar mapping from product to index but keep it as a RDD:
import org.apache.spark.HashPartitioner
val nPartitions = ???
val prodMapRDD = rdd
.map(_(2))
.distinct
.zipWithIndex
.partitionBy(new HashPartitioner(nPartitions))
.cache
val nProducts = prodMapRDD.count // Should be < Int.MaxValue
Next we reshape input RDD
to get PairRDD
indexed by product:
val pairs = rdd
.map(rec => (rec(2), (rec(0), rec(1))))
.partitionBy(new HashPartitioner(nPartitions))
Finally we can join
both
def indicesToVec(n: Int)(indices: Iterable[Long]): Vector = {
Vectors.sparse(n, indices.map(x => (x.toInt, 1.0)).toSeq)
}
pairs.join(prodMapRDD)
.values
.groupByKey
.mapValues(indicesToVec(nProducts.toInt))
Original question asks for the easiest way to specify categorical features from non-categorical.
In Spark ML, you can use VectorIndexer's setMaxCategories method, where you do not have to specify the fields - instead, it will understand as categorical those fields with lower or equal cardinality than a given number (in this case, 2).
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
Please see this reply for details.