Spark: StringIndexer on sentences

Posted 2019-03-02 04:47

I am trying to do something like StringIndexer on a column of sentences, i.e. transform a list of words into a list of integers.

For example:

input dataset:

  (1, ["I", "like", "Spark"])
  (2, ["I", "hate", "Spark"])

I expect the output after StringIndexer to look like this:

  (1, [0, 2, 1])
  (2, [0, 3, 1])

Ideally, I would like to make this transformation part of a Pipeline, so that I can chain a couple of transformers together and serialize the result for online serving.

Is this something Spark supports natively?

Thank you!

1 Answer

啃猪蹄的小仙女 · 2019-03-02 05:02

The standard Transformers used for converting text to features are CountVectorizer:

CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts.

or HashingTF:

Maps a sequence of terms to their term frequencies using the hashing trick. Currently we use Austin Appleby's MurmurHash 3 algorithm (MurmurHash3_x86_32) to calculate the hash code value for the term object. Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.
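For example, HashingTF can be applied directly, since it is a Transformer rather than an Estimator. A minimal sketch, assuming a DataFrame df with an array column words, like the one constructed further down in this answer:

import org.apache.spark.ml.feature.HashingTF

// HashingTF requires no fitting step
// df: the (id, words) DataFrame defined below
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(16)  // a power of two, as recommended above

hashingTF.transform(df).show(false)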

Both have a binary option which can be used to switch from counts to binary vectors.
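For instance, a sketch with CountVectorizer:

import org.apache.spark.ml.feature.CountVectorizer

// Emit 1.0 for presence/absence instead of raw term counts
val binaryVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("vector")
  .setBinary(true)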

There is no built-in Transformer that can give the exact result you want (it wouldn't be useful for ML algorithms), but you can explode, apply StringIndexer, and then collect_list / collect_set:

import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline


val df = Seq(
  (1, Array("I", "like", "Spark")), (2, Array("I", "hate", "Spark"))
).toDF("id", "words")

val pipeline = new Pipeline().setStages(Array(
  // Flatten the words array so StringIndexer can index individual words
  new SQLTransformer()
    .setStatement("SELECT id, explode(words) AS word FROM __THIS__"),
  new StringIndexer().setInputCol("word").setOutputCol("index"),
  // Reassemble the per-word indices into one array per id
  new SQLTransformer()
    .setStatement("""SELECT id, COLLECT_SET(index) AS values
                     FROM __THIS__ GROUP BY id""")
))

pipeline.fit(df).transform(df).show

// +---+---------------+                      
// | id|         values|
// +---+---------------+
// |  1|[0.0, 1.0, 3.0]|
// |  2|[2.0, 0.0, 1.0]|
// +---+---------------+
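Note that COLLECT_SET drops duplicates and does not guarantee element order, so the result will generally not preserve the original word order (compare the question's expected output with the values above). If you need the mapping from indices back to words, you can read it off the fitted StringIndexerModel. A sketch, where stages(1) refers to the position of the StringIndexer in the stage array defined above:

import org.apache.spark.ml.feature.StringIndexerModel

val model = pipeline.fit(df)
// The second stage of the fitted pipeline is the StringIndexerModel
val indexer = model.stages(1).asInstanceOf[StringIndexerModel]

indexer.labels.zipWithIndex.foreach { case (word, index) =>
  println(s"$word -> $index")
}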

With CountVectorizer and a udf:

import org.apache.spark.ml.linalg._

// Register a SQL-accessible UDF that extracts the active indices
// from the sparse representation of the count vector
spark.udf.register("indices", (v: Vector) => v.toSparse.indices)

val pipeline = new Pipeline().setStages(Array(
  new CountVectorizer().setInputCol("words").setOutputCol("vector"),
  new SQLTransformer()
    .setStatement("SELECT *, indices(vector) FROM __THIS__")
))

pipeline.fit(df).transform(df).show

// +---+----------------+--------------------+-------------------+
// | id|           words|              vector|UDF:indices(vector)|
// +---+----------------+--------------------+-------------------+
// |  1|[I, like, Spark]|(4,[0,1,3],[1.0,1...|          [0, 1, 3]|
// |  2|[I, hate, Spark]|(4,[0,1,2],[1.0,1...|          [0, 1, 2]|
// +---+----------------+--------------------+-------------------+
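The generated column name (UDF:indices(vector)) can be replaced with an explicit alias in the SQLTransformer statement; a small sketch of the adjusted stage:

// Alias the UDF output to get a readable column name
new SQLTransformer()
  .setStatement("SELECT *, indices(vector) AS indices FROM __THIS__")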