What .map() function in Python do I use to create a set of LabeledPoints from a Spark dataframe? What is the notation if the label/outcome is not the first column, but I can refer to it by its column name, 'status'?
I create the Python dataframe with this .map() function:
def parsePoint(line):
    listmp = list(line.split('\t'))
    dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
    dataframe.insert(0, 'status', dataframe['accepted'])
    if 'NULL' in dataframe.columns:
        dataframe = dataframe.drop('NULL', axis=1)
    if '' in dataframe.columns:
        dataframe = dataframe.drop('', axis=1)
    if 'rejected' in dataframe.columns:
        dataframe = dataframe.drop('rejected', axis=1)
    if 'accepted' in dataframe.columns:
        dataframe = dataframe.drop('accepted', axis=1)
    return dataframe
I convert it to a Spark dataframe after the reduce function has recombined all the Pandas dataframes.
parsedData=sqlContext.createDataFrame(parsedData)
But how do I now create LabeledPoints from this in Python? I assume it may be another .map() function?
If you already have numerical features that require no additional transformations, you can use VectorAssembler to combine the columns containing independent variables:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["your", "independent", "variables"],
    outputCol="features")
transformed = assembler.transform(parsedData)
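For the case in the question, where the label sits in a column named 'status' rather than in a fixed position, the list passed to inputCols can be derived by name. The following is a minimal sketch, assuming every non-label column is already numeric; the helper name `feature_columns` and the example column names are hypothetical and only stand in for `parsedData.columns`.

```python
def feature_columns(columns, label_col="status"):
    """Return every column name except the label column, preserving order."""
    if label_col not in columns:
        raise ValueError("Label column {0!r} not found".format(label_col))
    return [c for c in columns if c != label_col]


# Hypothetical stand-in for parsedData.columns
example_columns = ["red", "status", "green", "blue"]
input_cols = feature_columns(example_columns)
```

The result could then be used as `VectorAssembler(inputCols=feature_columns(parsedData.columns), outputCol="features")`, with `col("status").alias("label")` taking the place of the outcome column in the select step.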
Next you can simply map:
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col
(transformed.select(col("outcome_column").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
As of Spark 2.0, the ml and mllib APIs are no longer compatible, and the latter is heading towards deprecation and removal. If you still need this, you'll have to convert ml.Vectors to mllib.Vectors.
from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg
def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))
and map:
.map(lambda row: LabeledPoint(row.label, as_old(row.features)))