Error in LabeledPoint object (PySpark)

Published 2019-07-28 23:09

Question:

I am writing a function that:

  1. takes an RDD as input
  2. splits the comma-separated values
  3. then converts each row into a LabeledPoint object
  4. finally returns the output as a DataFrame

    code: 
    
    def parse_points(raw_rdd):
    
        cleaned_rdd = raw_rdd.map(lambda line: line.split(","))
        new_df = cleaned_rdd.map(lambda line:LabeledPoint(line[0],[line[1:]])).toDF()
        return new_df
    
    
    output = parse_points(input_rdd)
    

Up to this point, if I run the code there is no error; it works fine.

But on adding the line,

 output.take(5)

I am getting the error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 129.0 failed 1 times, most recent failure: Lost task 0.0 in stage 129.0 (TID 152, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Py4JJavaError       Traceback (most recent call last)
<ipython-input-100-a68c448b64b0> in <module>()
 20 
 21 output = parse_points(raw_rdd)
 ---> 22 print output.show()

Please tell me what the mistake is.

Answer 1:

The reason you had no errors until you executed the action:

 output.take(5)

is the lazy nature of Spark: nothing is actually executed until you invoke the action take(5).

You have a few issues in your code, and I think it is failing because of the extra "[" and "]" in [line[1:]].

So remove the extra "[" and "]" (keeping only line[1:]).
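To see why the extra brackets break things, compare the two expressions in plain Python (no Spark required), using a hypothetical parsed CSV row:

```python
# Hypothetical parsed CSV row: label first, then features.
line = "1.0,2.0,3.0,4.0".split(",")

flat = line[1:]      # a flat list of feature values, which LabeledPoint expects
nested = [line[1:]]  # a list containing one inner list, not a flat feature vector

print(flat)    # ['2.0', '3.0', '4.0']
print(nested)  # [['2.0', '3.0', '4.0']]
```

Passing the nested form to LabeledPoint means the "features" are a single element that is itself a list, which fails when Spark tries to build the feature vector.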

Another issue you might need to solve is the lack of a DataFrame schema:

i.e. replace "toDF()" with "toDF(["features","label"])". This will give the DataFrame a schema.



Answer 2:

Try:

>>> raw_rdd.map(lambda line: line.split(",")) \
...     .map(lambda line: LabeledPoint(line[0], [float(x) for x in line[1:]]))
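The key change in this answer is casting each feature to float. The per-record transformation can be checked in plain Python (no Spark required), using a hypothetical input row:

```python
# Hypothetical CSV record: label first, then features.
line = "1,2.5,3.0,4.5"

parts = line.split(",")
label = parts[0]                          # "1" -- LabeledPoint accepts a numeric string here
features = [float(x) for x in parts[1:]]  # flat list of floats, not nested

print(label, features)  # 1 [2.5, 3.0, 4.5]
```

This is exactly what each map step above does to every record of the RDD: split the line, keep the first field as the label, and cast the remaining fields to floats for the feature vector.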