Create labeledPoints from Spark DataFrame in Pytho

2019-01-17 13:13发布

问题:

What .map() function in python do I use to create a set of labeledPoints from a spark dataframe? What is the notation if The label/outcome is not the first column but I can refer to its column name, 'status'?

I create the Python dataframe with this .map() function:

def parsePoint(line):
    listmp = list(line.split('\t'))
    dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
    dataframe.insert(0, 'status', dataframe['accepted'])
    if 'NULL' in dataframe.columns:
        dataframe = dataframe.drop('NULL', axis=1)  
    if '' in dataframe.columns:
        dataframe = dataframe.drop('', axis=1)  
    if 'rejected' in dataframe.columns:
        dataframe = dataframe.drop('rejected', axis=1)  
    if 'accepted' in dataframe.columns:
        dataframe = dataframe.drop('accepted', axis=1)  
    return dataframe 

I convert it to a Spark dataframe after the reduce function has recombined all the Pandas dataframes.

parsedData=sqlContext.createDataFrame(parsedData)

But now how do I create labledPoints from this in Python? I assume it may be another .map() function?

回答1:

If you already have numerical features and which require no additional transformations you can use VectorAssembler to combine columns containing independent variables:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["your", "independent", "variables"],
    outputCol="features")

transformed = assembler.transform(parsedData)

Next you can simply map:

from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

(transformed.select(col("outcome_column").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))

As of Spark 2.0 ml and mllib API are no longer compatible and the latter one is going towards deprecation and removal. If you still need this you'll have to convert ml.Vectors to mllib.Vectors.

from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg

def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))

and map:

lambda row: LabeledPoint(row.label, as_old(row.features)))