How to transform a categorical variable in Spark into a set of {0,1} indicator columns?

Published 2019-02-03 17:00

Question:

I'm trying to perform a logistic regression (LogisticRegressionWithLBFGS) with Spark MLlib (in Scala) on a dataset which contains categorical variables. I discovered that Spark is not able to work with that kind of variable.

In R there is a simple way to deal with that kind of problem: I transform the variable into a factor (categories), and R then creates a set of columns coded as {0,1} indicator variables.

How can I perform this with Spark?

Answer 1:

Using VectorIndexer, you can tell the indexer the maximum number of different values (cardinality) a field may have in order to be treated as categorical, via the setMaxCategories() method.

import org.apache.spark.ml.feature.VectorIndexer

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

From Scaladocs:

Class for indexing categorical feature columns in a dataset of Vector.

This has 2 usage modes:

Automatically identify categorical features (default behavior)

This helps process a dataset of unknown vectors into a dataset with some continuous features and some categorical features. The choice between continuous and categorical is based upon a maxCategories parameter.

Set maxCategories to the maximum number of categories any categorical feature should have.

E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}. If maxCategories = 2, then feature 0 will be declared categorical and use indices {0, 1}, and feature 1 will be declared continuous.

I find this a convenient (though coarse-grained) way to extract categorical features, but beware of cases where a field has low arity yet should be treated as continuous (e.g. the age of college students vs. their origin country or US state).
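
For reference, a minimal sketch of how the indexer above might be fitted and applied; the DataFrame assembledDf with a vector column named "features" is an assumption (e.g. the output of a VectorAssembler), not part of the original answer:

import org.apache.spark.ml.feature.VectorIndexer

// assembledDf is assumed to already contain a Vector column named "features"
val indexerModel = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)
  .fit(assembledDf)                 // learns which features are categorical

// categoryMaps lists the feature indices that were treated as categorical
println(indexerModel.categoryMaps.keys.toSeq.sorted.mkString(", "))

val indexedDf = indexerModel.transform(assembledDf)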



Answer 2:

If I understood correctly, you do not want to convert one categorical column into several dummy columns; you want Spark to understand that the column is categorical, not numerical.

I think it depends on the algorithm you want to use. For example, Random Forest and GBT both take categoricalFeaturesInfo as a parameter; check it here:

https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.tree.RandomForest$

so for example:

categoricalFeaturesInfo = Map[Int, Int]((1,2),(2,5))

says that the second column of your features (indices start at 0, so index 1 is the second column) is categorical with 2 levels, and the third is categorical with 5 levels. You can specify these parameters when you train your Random Forest or GBT.

You need to make sure your levels are mapped to 0, 1, 2, ..., so if you have something like ("good", "medium", "bad"), map it to (0, 1, 2).
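
For illustration only, here is a minimal sketch of how such a map might be passed to RandomForest.trainClassifier; trainingData (an RDD[LabeledPoint]) and all hyperparameter values are assumptions, not part of the original answer:

import org.apache.spark.mllib.tree.RandomForest

// trainingData: RDD[LabeledPoint] is assumed to already exist,
// with the categorical features encoded as 0.0, 1.0, 2.0, ...
val categoricalFeaturesInfo = Map[Int, Int](1 -> 2, 2 -> 5)

val model = RandomForest.trainClassifier(
  trainingData,
  2,                         // numClasses
  categoricalFeaturesInfo,   // feature 1 has 2 levels, feature 2 has 5 levels
  50,                        // numTrees
  "auto",                    // featureSubsetStrategy
  "gini",                    // impurity
  5,                         // maxDepth
  32)                        // maxBins (must be >= the largest number of levels)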

Now, in your case, you want to use LogisticRegressionWithLBFGS. In that case my suggestion is to actually transform the categorical columns into dummy columns: for example, a single column with 3 levels ("good", "medium", "bad") becomes 3 columns containing 1/0 depending on which level matches. I do not have your data to work with, so here is sample Scala code that should work:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Returns a UDF that emits 1.0 when a value equals the given level, 0.0 otherwise.
val index = (value: Double) => udf((a: Double) => if (a == value) 1.0 else 0.0)

// For each categorical column (already mapped to 0.0, 1.0, 2.0, ...),
// appends one 1/0 indicator column per distinct level.
val dummygen = (data: DataFrame, cols: Array[String]) => {
  var temp = data
  for (c <- cols) {
    val n = data.select(c).distinct.count.toInt
    for (j <- 0 until n)
      temp = temp.withColumn(c + "_" + j.toString, index(j.toDouble)(col(c)))
  }
  temp
}

You can then call it like this:

val results = dummygen(data, Array("CategoricalColumn1","CategoricalColumn2"))

Here I do it for a list of categorical columns (in case you have more than one in your feature list). The first for loop goes through each categorical column; the second for loop goes through each level in that column and creates as many new columns as there are levels.

Important: this assumes you have first mapped the levels to 0, 1, 2, ...

You can then run your LogisticRegressionWithLBFGS using this new feature set. This approach also helps with SVM.
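
To close the loop, a rough sketch of turning the generated dummy columns into LabeledPoints and training the model; the "label" column and the dummy column names here are hypothetical:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical names: a numeric "label" column plus the dummy columns produced above
val featureCols = Array("CategoricalColumn1_0", "CategoricalColumn1_1", "CategoricalColumn1_2")

val trainingData = results.select("label", featureCols: _*).rdd.map { row =>
  LabeledPoint(
    row.getDouble(0),        // label assumed to be a Double
    Vectors.dense(featureCols.indices.map(i => row.getDouble(i + 1)).toArray))
}

val lrModel = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)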



Answer 3:

A VectorIndexer is coming in Spark 1.4, which might help you with this kind of feature transformation: http://people.apache.org/~pwendell/spark-1.4.0-rc1-docs/api/scala/index.html#org.apache.spark.ml.feature.VectorIndexer

However, it looks like this will only be available in spark.ml rather than in mllib:

https://issues.apache.org/jira/browse/SPARK-4081



Answer 4:

If the categories can fit in the driver memory, here is my suggestion:

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import spark.implicits._ // for toDF and $"..."; already in scope in the spark-shell


val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"),(6,"c"),(7,"d"),(8,"b"))
            .toDF("id", "category")
val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

val categoriesIndices = indexed.select("category", "categoryIndex").distinct
// Collect the category -> index mapping to the driver (hence the "fits in driver memory" caveat)
val categoriesMap: scala.collection.Map[String, Double] =
  categoriesIndices.rdd.map(x => (x(0).toString, x(1).toString.toDouble)).collectAsMap()

def getCategoryIndex(catMap: scala.collection.Map[String,Double], expectedValue: Double) = udf((columnValue: String) =>
if (catMap(columnValue) == expectedValue) 1 else 0)


val newDf: DataFrame = categoriesMap.keySet.toSeq.foldLeft[DataFrame](indexed)(
  (acc, c) =>
    acc.withColumn(c, getCategoryIndex(categoriesMap, categoriesMap(c))($"category"))
)

newDf.show


+---+--------+-------------+---+---+---+---+
| id|category|categoryIndex|  b|  d|  a|  c|
+---+--------+-------------+---+---+---+---+
|  0|       a|          0.0|  0|  0|  1|  0|
|  1|       b|          2.0|  1|  0|  0|  0|
|  2|       c|          1.0|  0|  0|  0|  1|
|  3|       a|          0.0|  0|  0|  1|  0|
|  4|       a|          0.0|  0|  0|  1|  0|
|  5|       c|          1.0|  0|  0|  0|  1|
|  6|       c|          1.0|  0|  0|  0|  1|
|  7|       d|          3.0|  0|  1|  0|  0|
|  8|       b|          2.0|  1|  0|  0|  0|
+---+--------+-------------+---+---+---+---+