I have a DenseVector
RDD
like this
>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
I want to convert this into a Dataframe
. I tried like this
>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
It gives an error like this
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
schema = _infer_schema(first)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
fields = [StructField(k, _infer_type(v), True) for k, v in items]
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>
old Solution
frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))
Edit 1 - Code Reproducible
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')
sentenceData = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()
vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])