I am building a text classifier and am using Spark's CountVectorizer to create the feature vector.
Now, to use this vector with the BigDL library, I need to convert every 0 in the feature vector to 1.
Here is my feature vector, which is a sparse vector:
# Display the first two rows of the 'features' column.
vectorizer_df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|(1000,[4,6,11,13,...|
|(1000,[0,1,2,3,4,...|
+--------------------+
only showing top 2 rows
I am trying to update the values as shown below. First, I convert the sparse vector to a dense vector:
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
# UDF that passes each vector through Vectors.dense; declared return type is VectorUDT.
update_vector = udf(lambda vector: Vectors.dense(vector), VectorUDT())
# Apply the UDF to the 'features' column and store the result under the same column name.
df = vectorizer_df.withColumn('features',update_vector(vectorizer_df.features))
# Display the first two rows of the converted column.
df.select('features').show(2)
+--------------------+
| features|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
|[5571.0,4688.0,24...|
+--------------------+
only showing top 2 rows
Once I have the dense vector, I try to add 1 to all of its elements:
def add1(x):
    """Return ``x`` incremented by one."""
    return x + 1


def array_for(x):
    """Return a NumPy array containing each element of ``x`` plus one.

    ``x`` may be any iterable of numbers. The result is a
    ``numpy.ndarray`` (not a Spark vector type).
    """
    return np.array([add1(xi) for xi in x])
# UDF declared to return VectorUDT; note that array_for itself returns a numpy.ndarray.
add_udf_one = udf(lambda z: array_for(z), VectorUDT())
# Keep 'features' and add the UDF result as a new column named 'feature_1'.
df = df.select('features', add_udf_one('features').alias('feature_1'))
# Displaying the new column is the call that fails in the traceback shown below.
df.select('feature_1').show(2)
But now I get a TypeError, as shown below:
TypeError: cannot serialize array([ ....]) of type <class 'numpy.ndarray'>
Full error as below
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-98-5aa5196824cf> in <module>
----> 1 df.select('feature_1').show(2)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
348 """
349 if isinstance(truncate, bool) and truncate:
--> 350 print(self._jdf.showString(n, 20, vertical))
351 else:
352 print(self._jdf.showString(n, int(truncate), vertical))
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1192.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 4886, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 324, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in dump_stream
for obj in iterator:
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 313, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 73, in <lambda>
return lambda *a: toInternal(f(*a))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 682, in toInternal
return self._cachedSqlType().toInternal(self.serialize(obj))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 169, in serialize
raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
TypeError: cannot serialize array([ 1., 1., 1., 1., 2., 1., 326., 1., 1., 1., 1.,
2., 1., 3., 1., 1., 1., 1., 383., 1., 312., 1.,
1., 1., 1., 1., 1., 39., 1., 1., 1., 1., 1.,
180., 1., 1., 1., 167., 4., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 133., 1.,
1., 1., 123., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 96., 1., 7.,
7., 7., 7., 7., 7., 7., 1., 1., 13., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 4., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) of type <class 'numpy.ndarray'>
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any suggestions on how I can update the PySpark feature vector?
Thanks
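You are almost there, but Spark doesn't support NumPy types, and
VectorUDT
wouldn't match one anyway. Instead, have the UDF return a vector, for example by wrapping the NumPy result in Vectors.dense(...) before returning it.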
Usage: