How to encode categorical features in Apache Spark

Posted 2019-01-23 09:44

Question:

I have a set of data based on which I want to create a classification model. Each row has the following form:

user1,class1,product1
user1,class1,product2
user1,class1,product5
user2,class1,product2
user2,class1,product5
user3,class2,product1

There are about 1M users, 2 classes, and 1M products. What I would like to do next is create the sparse vectors (something already supported by MLlib), BUT in order to apply that function I first have to create the dense vectors (with the 0s). In other words, I have to binarize my data. What's the easiest (or most elegant) way of doing that?

Given that I am a newbie with regard to MLlib, may I ask you to provide a concrete example? I am using MLlib 1.2.

EDIT

I have ended up with the following piece of code, but it turns out to be really slow... Any other ideas, provided that I can only use MLlib 1.2?

val data = test11.map(x => ((x(0), x(1)), x(2)))  // key by (user, class)
  .groupByKey()
  .map(x => (x._1, x._2.toArray))
  .map { x =>
    // one dense slot per known product (test12 holds the product list)
    val lt: Array[Double] = new Array[Double](test12.size)
    val id = x._1._1
    val cl = x._1._2
    val dt = x._2
    var i = -1
    test12.foreach { y => i += 1; lt(i) = if (dt contains y) 1.0 else 0.0 }
    val vs = Vectors.dense(lt)
    (id, cl, vs)
  }
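The main cost above is materializing a dense array of length test12.size (about 1M) for every (user, class) group. Keeping only the nonzero indices avoids that; below is a minimal plain-Scala sketch of the idea, Spark-free and using hypothetical toy data, not the actual RDDs from the question:

```scala
// Hypothetical toy data standing in for the RDD rows.
val rows = List(
  ("user1", "class1", "product1"),
  ("user1", "class1", "product2"),
  ("user2", "class1", "product2"),
  ("user3", "class2", "product1"))

// Map each product to a column index.
val prodIndex: Map[String, Int] =
  rows.map(_._3).distinct.sorted.zipWithIndex.toMap

// Encode each (user, class) group as sparse (index, 1.0) pairs instead of a
// dense 0/1 array: size stays proportional to the products actually seen.
val encoded: Map[(String, String), Seq[(Int, Double)]] =
  rows.groupBy(r => (r._1, r._2))
      .mapValues(rs => rs.map(r => (prodIndex(r._3), 1.0)).distinct.sorted.toSeq)
      .toMap
```

The resulting (index, value) pairs are exactly what `Vectors.sparse` expects, so no full-width dense array is ever built.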

Answer 1:

You can use spark.ml's OneHotEncoder.

You first use:

OneHotEncoder.categories(rdd, categoricalFields)

where categoricalFields is the sequence of indexes at which your RDD contains categorical data. Given a dataset and the indexes of the columns that hold categorical variables, categories returns a structure that, for each field, describes the values present in the dataset. That map is meant to be used as input to the encode method:

OneHotEncoder.encode(rdd, categories)

which returns your vectorized RDD[Array[T]].



Answer 2:

If using the built-in OneHotEncoder is not an option and you have only a single variable, implementing a poor man's one-hot encoding is more or less straightforward. First, let's create some example data:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val rdd = sc.parallelize(List(
    Array("user1", "class1", "product1"),
    Array("user1", "class1", "product2"),
    Array("user1", "class1", "product5"),
    Array("user2", "class1", "product2"),
    Array("user2", "class1", "product5"),
    Array("user3", "class2", "product1")))

Next we have to create a mapping from a value to the index:

val prodMap = sc.broadcast(rdd.map(_(2)).distinct.zipWithIndex.collectAsMap)

and a simple encoding function:

def encodeProducts(products: Iterable[String]): Vector =  {
    Vectors.sparse(
        prodMap.value.size,
        products.map(product => (prodMap.value(product).toInt, 1.0)).toSeq
    )
}

Finally we can apply it to the dataset:

rdd.map(x => ((x(0), x(1)), x(2))).groupByKey.mapValues(encodeProducts)

It is relatively easy to extend the above to handle multiple variables.
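One common way to do that extension (a sketch, not part of the answer above: give each variable its own index map and shift the later variable's indices by the earlier map's size, so all features share one vector without colliding):

```scala
// Hypothetical per-variable index maps, e.g. for a class and a product column.
val classIndex   = Map("class1" -> 0, "class2" -> 1)
val productIndex = Map("product1" -> 0, "product2" -> 1, "product5" -> 2)

// Offset the second variable past the first, so indices never collide.
val offset = classIndex.size
val totalSize = classIndex.size + productIndex.size

// Encode one (class, products) observation as sparse (index, 1.0) pairs.
def encodeRow(cls: String, products: Seq[String]): Seq[(Int, Double)] =
  (classIndex(cls), 1.0) +:
    products.map(p => (offset + productIndex(p), 1.0))
```

For example, `encodeRow("class2", Seq("product1", "product5"))` produces indices 1, 2, and 4 in a vector of size 5.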

Edit:

If the number of products is too large for broadcasting to be useful, it should be possible to use a join instead. First, we can create a similar mapping from product to index, but keep it as an RDD:

import org.apache.spark.HashPartitioner

val nPartitions = ???

val prodMapRDD = rdd
     .map(_(2))
     .distinct
     .zipWithIndex
     .partitionBy(new HashPartitioner(nPartitions))
     .cache

val nProducts = prodMapRDD.count // Should be < Int.MaxValue

Next, we reshape the input RDD to get a PairRDD indexed by product:

val pairs = rdd
    .map(rec => (rec(2), (rec(0), rec(1))))
    .partitionBy(new HashPartitioner(nPartitions))

Finally, we can join both:

def indicesToVec(n: Int)(indices: Iterable[Long]): Vector = {
     Vectors.sparse(n, indices.map(x => (x.toInt, 1.0)).toSeq)
}

pairs.join(prodMapRDD)
   .values
   .groupByKey
   .mapValues(indicesToVec(nProducts.toInt))
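The join-based flow can be mimicked with plain Scala collections to see what each step produces (toy data, no Spark; a sketch only):

```scala
// Toy stand-ins: product -> index (the prodMapRDD) and the
// (product, (user, class)) pairs RDD.
val prodToIndex = Map("product1" -> 0L, "product2" -> 1L, "product5" -> 2L)
val pairs = List(
  ("product1", ("user1", "class1")),
  ("product2", ("user1", "class1")),
  ("product1", ("user3", "class2")))

// "join": replace each product with its index, keyed by (user, class) ...
val joined = pairs.collect {
  case (prod, userClass) if prodToIndex.contains(prod) =>
    (userClass, prodToIndex(prod))
}

// ... then "groupByKey": gather all column indices per (user, class).
val grouped: Map[(String, String), List[Long]] =
  joined.groupBy(_._1).mapValues(_.map(_._2)).toMap
```

Each value in `grouped` is the list of indices that indicesToVec turns into a sparse vector.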


Answer 3:

The original question asks for the easiest way to mark features as categorical, distinguishing them from non-categorical ones.

In Spark ML, you can use VectorIndexer's setMaxCategories method, where you do not have to specify the fields yourself; instead, it will treat as categorical any field whose cardinality is lower than or equal to a given number (in this case, the class column, with its 2 distinct values).

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)
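The maxCategories rule boils down to a per-column distinct-value count. A plain-Scala sketch of that decision (a hypothetical helper, not VectorIndexer's actual implementation):

```scala
// Treat a column as categorical iff its number of distinct values is small.
def isCategorical(column: Seq[Double], maxCategories: Int): Boolean =
  column.distinct.size <= maxCategories

val classColumn   = Seq(0.0, 0.0, 1.0, 0.0, 1.0)      // 2 distinct values
val measureColumn = Seq(0.1, 2.3, 4.5, 6.7, 8.9, 1.2) // 6 distinct values
```

With maxCategories = 10 the class column qualifies, while a continuous measurement column with many distinct values does not.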

Please see this reply for details.