PySpark serialization EOFError

Posted 2019-02-11 12:44

Question:

I am reading in a CSV as a Spark DataFrame and performing machine learning operations upon it. I keep getting a Python serialization EOFError - any idea why? I thought it might be a memory issue - i.e. file exceeding available RAM - but drastically reducing the size of the DataFrame didn't prevent the EOF error.

Toy code and error below.

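#imports needed by the code below
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import RandomForestClassifier

#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)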

#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)

#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()

Running the above code with spark-submit on a single node repeatedly throws the following error, even if the size of the DataFrame is reduced prior to fitting the model (e.g. tinydf = df.sample(False, 0.00001)):

Traceback (most recent call last):
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError

Answer 1:

Have you checked to see where in your code the EOFError is arising?

My guess would be that it's coming from the line where you define df, since that's the only place in your code where the file is actually being read.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

At every point after this line, your code is working with the variable df, not the file itself, so it would seem likely that this line is generating the error.

A simple way to test if this is the case would be to comment out the rest of your code, and/or place a line like this right after the line above.

#a Spark DataFrame has no len(); count() forces the file to be read
print(df.count())

Another way would be to use a try/except block, like:

try:
    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
         inferschema='true').load('myfile.csv')
except Exception:
    print("Didn't load file into df!")

If it turns out that that line is the one generating the EOFError, then you're never getting the dataframes in the first place, so attempting to reduce them won't make a difference.

If that is the line generating the error, two possibilities come to mind:

1) Your code opens the .csv file earlier on and doesn't close it before this line. If so, simply close it before the code shown here.

2) There's something wrong with the .csv file itself. Try loading it outside of this code, using something like csv.reader, and see whether you can get it into memory properly and manipulate it in the ways you'd expect.
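
As a rough sketch of the check in point 2, the file can be read with Python's built-in csv module, outside Spark, to confirm that it parses and that every record has as many fields as the header. The helper name check_csv is hypothetical, and the sketch assumes Python 3 and a file with a header row.

```python
import csv

def check_csv(path):
    """Return (row_count, bad_records) for a CSV file with a header row.

    bad_records holds the 1-based record numbers (the header is record 1)
    whose field count differs from the header's.
    """
    bad_records = []
    row_count = 0
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # Empty file: nothing to check.
            return 0, []
        for record_number, row in enumerate(reader, start=2):
            row_count += 1
            if len(row) != len(header):
                bad_records.append(record_number)
    return row_count, bad_records
```

Calling check_csv('myfile.csv') on the file from the question returns the number of data records and the list of malformed ones. An empty list suggests the file is at least structurally sound, which would point away from the file itself as the cause.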



Answer 2:

The error appears to be raised in PySpark's read_int function. Its code, taken from the Spark site, is as follows:

import struct

def read_int(stream):
    length = stream.read(4)
    if not length:
        raise EOFError
    return struct.unpack("!i", length)[0]

This means that when the function tries to read 4 bytes from the stream and gets 0 bytes back, an EOFError is raised. The Python docs are here.
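
To illustrate that behaviour, here is a minimal sketch that feeds the same read_int logic an in-memory stream. Using io.BytesIO in place of the real stream between Spark and the Python worker is an assumption made purely for illustration; the function body is the one quoted above.

```python
import io
import struct

def read_int(stream):
    # Same logic as the quoted function: read a 4-byte big-endian signed int.
    length = stream.read(4)
    if not length:
        raise EOFError
    return struct.unpack("!i", length)[0]

# A stream holding exactly one packed integer and nothing after it.
stream = io.BytesIO(struct.pack("!i", 42))

first = read_int(stream)  # consumes the 4 available bytes

try:
    read_int(stream)      # stream is exhausted, so read(4) returns b""
    hit_eof = False
except EOFError:
    hit_eof = True

print(first, hit_eof)  # prints: 42 True
```

In other words, the EOFError only says that the worker's input stream ended where an integer was expected. By itself it does not say why the other side stopped writing.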