My spark application is using RDD's of numpy arrays.
At the moment, I'm reading my data from AWS S3, and its represented as
a simple text file where each line is a vector and each element is seperated by space, for example:
1 2 3
5.1 3.6 2.1
3 0.24 1.333
I'm using numpy's function loadtxt()
in order to create a numpy array from it.
However, this method seems to be very slow and my app is spending too much time(I think) for converting my dataset to a numpy array.
Can you suggest me a better way for doing it? For example, should I keep my dataset as a binary file?, should I create the RDD in another way?
Some code for how I create my RDD:
data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)
readData function:
def readPointBatch(iterator):
return [(np.loadtxt(iterator,dtype=np.float64)]
You shouldn't use
numpy
while working with Spark. Spark has its own methodology of processing data assuring that your sometimes really big files aren't loaded into memory at once, exceeding the memory limit. You should load your file like this with Spark:Now this will output an
RDD
like this, based on your example:@edit Some suggestions on file formats and
numpy
usage:Text files are just as good as CSV, TSV, Parquet or anything you feel comfortable with. Binary files are not preferred, according to the Spark docs on binary files loading:
As for
numpy
usage, if I were you I'd deffinitely tried to replace any external package with native Spark, for examplepyspark.mlib.random
for randomization: http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.randomThe best thing to do under these circumstances is to use pandas library for io.
Please refer to this question : pandas read_csv() and python iterator as input .
There you will see how to replace the
np.loadtxt()
function so it would be much faster tocreate a RDD of numpy array.
It would be a little bit more idiomatic and slightly faster to simply map with
numpy.fromstring
as follows:but ignoring that there is nothing particularly wrong with your approach. As far as I can tell, with basic configuration, it is roughly twice a slow a simply reading the data and slightly slower than creating dummy numpy arrays.
So it looks like the problem is somewhere else. It could be cluster misconfiguration, cost of fetching data from S3 or even unrealistic expectations.