Pyspark update the value in feature vector

2019-08-20 12:08发布

I am building a text classifier and am using Spark's CountVectorizer to create the feature vector.

Now, to use this vector with the BigDL library, I need to convert every 0 in the feature vector to 1.

Here is my feature vector, which is a sparse vector:

# Print the first two rows of the 'features' column.
vectorizer_df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows

I am trying to update the values as shown below. First, I convert the sparse vector to a dense vector:

from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf

# UDF that passes each row's vector to Vectors.dense and declares a
# VectorUDT result.
update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())


# Overwrite the 'features' column with the UDF output.
df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))

# Print the first two rows of the converted column.
df.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows

Once I have the dense vector, I try to add 1 to every element:

def add1(x):
    """Return ``x`` incremented by one."""
    return x + 1
def array_for(x):
    """Apply ``add1`` to each element of ``x`` and return a NumPy array.

    The result is a ``numpy.ndarray``, not a Spark vector.
    """
    incremented = list(map(add1, x))
    return np.array(incremented)

# Wrap array_for in a UDF whose declared return type is VectorUDT.
# NOTE: array_for returns a numpy.ndarray rather than a Vector.
add_udf_one = udf(lambda z: array_for(z), VectorUDT())

# Keep the original column and add the UDF result as 'feature_1'.
df = df.select('features', add_udf_one('features').alias('feature_1'))

# Displaying the new column is the call at which the error is raised.
df.select('feature_1').show(2)

But now I get a TypeError, as shown below:

TypeError: cannot serialize array([  ....]) of type <class 'numpy.ndarray'>

The full error is shown below:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)

/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    348         """
    349         if isinstance(truncate, bool) and truncate:
--> 350             print(self._jdf.showString(n, 20, vertical))
    351         else:
    352             print(self._jdf.showString(n, int(truncate), vertical))

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
    return self._cachedSqlType().toInternal(self.serialize(obj))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
    raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([  1.,   1.,   1.,   1.,   2.,   1., 326.,   1.,   1.,   1.,   1.,
         2.,   1.,   3.,   1.,   1.,   1.,   1., 383.,   1., 312.,   1.,
         1.,   1.,   1.,   1.,   1.,  39.,   1.,   1.,   1.,   1.,   1.,
       180.,   1.,   1.,   1., 167.,   4.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1., 133.,   1.,
         1.,   1., 123.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,  96.,   1.,   7.,
         7.,   7.,   7.,   7.,   7.,   7.,   1.,   1.,  13.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   4.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,
         1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.,   1.]) of type <class 'numpy.ndarray'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Any suggestions on how I can update the PySpark feature vector?

Thanks

1条回答
劳资没心,怎么记你
2楼-- · 2019-08-20 12:32

You are almost there, but Spark doesn't support NumPy types, and VectorUDT wouldn't match one anyway.

Instead

import numpy as np
from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT

@udf(VectorUDT())
def zeros_to_ones(v):
    """Return a dense vector equal to ``v`` with every 0 entry replaced by 1.

    Accepts a ``SparseVector`` or a ``DenseVector``; a sparse input is
    expanded, so the result is always a ``DenseVector``. ``None`` is passed
    through unchanged, and any other input type yields ``None``.
    """
    if v is None:
        return None
    if not isinstance(v, (SparseVector, DenseVector)):
        # Only the two pyspark.ml vector classes are handled.
        return None
    # toArray() expands a sparse vector and exposes a dense vector's values.
    values = v.toArray()
    return DenseVector(np.where(values == 0, 1, values))

Usage:

# Two sample rows: one dense vector and one sparse vector, both with zeros.
df = spark.createDataFrame(
    [
        (1, Vectors.dense([0, 1, 0, 3])),
        (2, Vectors.sparse(4, [0, 3], [0, 1])),
    ],
    ("id", "features"),
)

# Add the converted column next to the original and print it untruncated.
(
    df
    .withColumn("features_no_zeros", zeros_to_ones("features"))
    .show(truncate=False)
)
+---+-------------------+-----------------+                                     
|id |features           |features_no_zeros|
+---+-------------------+-----------------+
|1  |[0.0,1.0,0.0,3.0]  |[1.0,1.0,1.0,3.0]|
|2  |(4,[0,3],[0.0,1.0])|[1.0,1.0,1.0,1.0]|
+---+-------------------+-----------------+
查看更多
登录 后发表回答