ValueError: could not convert string to float

Published 2019-09-07 04:32

Question:

I have a text file which contains some data. The data is as follows:

join2_train = sc.textFile('join2_train.csv',4)
join2_train.take(3)   

[u'21.9059,TA-00002,S-0066,7/7/2013,0,0,Yes,1,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0',
 u'12.3412,TA-00002,S-0066,7/7/2013,0,0,Yes,2,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0',
 u'6.60183,TA-00002,S-0066,7/7/2013,0,0,Yes,5,SP-0019,6.35,0.71,137,8,19.05,N,N,N,N,EF-008,EF-008,0,0,0']

Now I am passing each line to a function that splits the text on commas and converts it into a LabeledPoint. I have also included a line that converts the string elements to floats.

The function is as follows:

from pyspark.mllib.regression import LabeledPoint
import numpy as np

def parsePoint(line):
    """Converts a comma separated unicode string into a `LabeledPoint`.

    Args:
        line (unicode): Comma separated unicode string where the first element is the label and the
            remaining elements are features.

    Returns:
        LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
            features.
    """
    values = line.split(',')
    value1 = [map(float,i) for i in values]
    return LabeledPoint(value1[0],value1[1:]) 

Now when I perform an action on this parsed RDD, I get a ValueError. The action I am running is below:

parse_train = join2_train.map(parsePoint)

parse_train.take(5)

The error message I get is as follows:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-63-f53b10964381> in <module>()
      1 parse_train = join2_train.map(parsePoint)
      2 
----> 3 parse_train.take(5)

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in take(self, num)
   1222 
   1223             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1224             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1225 
   1226             items += res

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    840         mappedRDD = rdd.mapPartitions(partitionFunc)
    841         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions,
--> 842                                           allowLocal)
    843         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    844 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 31, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1220, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-62-0243c4dd1876>", line 18, in parsePoint
ValueError: could not convert string to float: .

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Answer 1:

Note that map(float, i) in your list comprehension applies float to every single character of each field, which is why the traceback complains about '.'. Apply float to the whole field instead, and add this function to check whether a string can be converted to a float:

def isfloat(string):
    try:
        float(string)
        return True
    except ValueError:
        return False

and then in parsePoint:

value1 = [float(i) for i in values if isfloat(i)]
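
For reference, here is a minimal sketch of the whole parsePoint with that check folded in (not the original poster's exact code; it simply drops every field that does not parse as a float and treats the first surviving value as the label):

from pyspark.mllib.regression import LabeledPoint

def isfloat(string):
    try:
        float(string)
        return True
    except ValueError:
        return False

def parsePoint(line):
    # Keep only the fields that parse as floats; categorical fields such as
    # 'TA-00002', 'Yes' or 'EF-008' are silently dropped.
    values = [float(i) for i in line.split(',') if isfloat(i)]
    # The first numeric value is the label, the rest are the features.
    return LabeledPoint(values[0], values[1:])

This only yields feature vectors of equal length when the same columns are non-numeric on every row, which is the case for the sample rows shown above.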

Alternatively, modify the float line as follows

value1 = [float(i) for i in values]

and then parse a string containing only numeric values; this gives back correct LabeledPoints. The real problem, however, is trying to build LabeledPoint objects from strings that cannot be converted to float, such as TA-00002 in the join2_train data.
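
With a filtering parsePoint like the sketch above, the original action should then run on the sample rows; roughly (a hedged illustration derived from the first sample line, not actual output):

parse_train = join2_train.map(parsePoint)
parse_train.take(1)
# Approximately:
# [LabeledPoint(21.9059, [0.0, 0.0, 1.0, 6.35, 0.71, 137.0, 8.0, 19.05, 0.0, 0.0, 0.0])]

A cleaner long-term fix is to decide explicitly what to do with the categorical columns (drop them by index, or encode them numerically) rather than relying on whether float() happens to succeed.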