I have an RDD[Row] that needs to be persisted to a third-party repository.
But this third-party repository accepts a maximum of 5 MB in a single call.
So I want to create partitions based on the size of the data in the RDD, not on the number of rows in it.
How can I find the size of an RDD and create partitions based on it?
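As Justin and Wang mentioned, it is not straightforward to get the size of an RDD. We can only estimate it.
We can sample the RDD and then use SizeEstimator to get the size of the sample. Note that SizeEstimator reports the in-memory (JVM heap) footprint of the objects, which can differ from their serialized size.
As Wang and Justin mentioned, the estimate can be extrapolated from a sample: if X rows used Y GB when measured offline, Z rows at runtime should take roughly Z*Y/X GB.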
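The Z*Y/X extrapolation above can be written as a small helper. This is only a sketch: the object and method names are made up for illustration, and it assumes the sampled rows are representative of the whole data set.

```scala
object SizeExtrapolation {
  /** Scales a measured sample size up to the full row count (Z * Y / X). */
  def extrapolate(sampleRows: Long, sampleBytes: Long, totalRows: Long): Long = {
    require(sampleRows > 0, "the sample must contain at least one row")
    // BigInt keeps the intermediate product from overflowing a Long.
    (BigInt(totalRows) * BigInt(sampleBytes) / BigInt(sampleRows)).toLong
  }
}
```

For example, if 1,000 sampled rows took 2,000,000 bytes, then 50,000,000 rows would be estimated at 100,000,000,000 bytes (about 93 GiB).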
Here is sample Scala code to estimate the size of an RDD.
I am new to Scala and Spark, so the sample below could probably be written in a better way.
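import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10
  val totalRows = rdd.count()
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    // rdd.sample expects a fraction, not a row count, so use takeSample
    // to get exactly NO_OF_SAMPLE_ROWS rows
    val sampleRows = rdd.takeSample(withReplacement = true, NO_OF_SAMPLE_ROWS)
    val sampleRDD = rdd.sparkContext.parallelize(sampleRows.toSeq)
    val sampleRDDSize = getRDDSize(sampleRDD)
    // Scale the sample size up to the full row count
    sampleRDDSize * totalRows / NO_OF_SAMPLE_ROWS
  } else {
    // As the RDD is smaller than the sample row count, we can just calculate the total RDD size
    getRDDSize(rdd)
  }
}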
import org.apache.spark.util.SizeEstimator

def getRDDSize(rdd: RDD[Row]): Long = {
  // collect() pulls every row to the driver, so only call this on small RDDs
  rdd.collect()
    .map(row => SizeEstimator.estimate(row.toSeq.map(_.asInstanceOf[AnyRef])))
    .sum
}
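One straightforward way is to persist the RDD with one of the following calls, depending on whether you want to store your data in serialized form, and then open the "Storage" page of the Spark UI, which shows the total size of the RDD (memory + disk). Since persist is lazy, run an action such as rdd.count() first; the RDD only appears on the Storage page once it has been materialized:
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)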
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
It is not easy to calculate an accurate memory size at runtime. You can try to estimate it at runtime, though, based on the size of data sampled offline: if X rows used Y GB offline, Z rows at runtime may take Z*Y/X GB. This is similar to what Justin suggested earlier.
Hope this helps.
I think RDD.count() will give you the number of elements in the RDD.
This is going to depend on factors such as serialization, so it is not cut and dried. However, you could take a sample of the data, run some experiments on it, and extrapolate from there.
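To illustrate how much the answer depends on serialization, here is a rough sketch that measures the same values two ways: as a comma-separated UTF-8 string and through plain Java serialization. The object and method names are hypothetical, and a plain Vector stands in for the values of a Row; Spark's own serializers (Java or Kryo) will give yet other numbers.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

object SerializedSizeComparison {
  /** Bytes used when the values are joined into a comma-separated UTF-8 string. */
  def csvBytes(values: Seq[Any]): Int =
    values.mkString(",").getBytes(StandardCharsets.UTF_8).length

  /** Bytes used when the same values go through plain Java serialization. */
  def javaSerializedBytes(values: Seq[Any]): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    // The collection and its elements must be java.io.Serializable
    try out.writeObject(values) finally out.close()
    buffer.size()
  }
}
```

For Vector("alice", 42, 3.14), the string form is 13 bytes, while the Java-serialized form is considerably larger because it also carries class metadata. Whichever format the target repository actually receives is the one worth measuring.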
This is the version to use if you are actually working with big data on a cluster, because it eliminates the collect. It measures each row as the byte length of its comma-separated UTF-8 string.
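import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
    .fold(0L)(_ + _) // add the sizes together; unlike reduce, fold also works on an empty RDD
}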
def estimateRDDSize(rdd: RDD[Row], fraction: Double): Long = {
  // Cache the sample so the size pass and the count below do not recompute it
  val sampleRDD = rdd.sample(withReplacement = true, fraction).cache()
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize / (1024 * 1024)} MB")
  val sampleRows = sampleRDD.count()
  sampleRDD.unpersist()
  // Avoid dividing by zero if the sample is empty; keep the average as a Double to avoid truncation
  val sampleAvgRowSize = if (sampleRows == 0) 0.0 else sampleRDDsize.toDouble / sampleRows
  println(f"sampleAvgRowSize is $sampleAvgRowSize%.1f")
  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")
  val estimatedTotalSize = (totalRows * sampleAvgRowSize).toLong
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize / (1024 * 1024))
  println(s"estimatedTotalSize is $estimateInMB MB")
  estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)
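Once there is a size estimate, the number of partitions needed to stay under the 5 MB per-call limit from the question can be derived from it. The helper below is a sketch with hypothetical names; the headroom factor is an assumption added here to absorb estimation error and uneven row sizes, not something taken from the answers above.

```scala
object PartitionPlanner {
  /** Number of partitions so that each one targets at most maxBytesPerCall * headroom bytes. */
  def partitionsFor(estimatedBytes: Long, maxBytesPerCall: Long, headroom: Double = 0.5): Int = {
    require(estimatedBytes >= 0 && maxBytesPerCall > 0, "sizes must be non-negative and the limit positive")
    require(headroom > 0.0 && headroom <= 1.0, "headroom must be in (0, 1]")
    val targetBytes = math.max(1L, (maxBytesPerCall * headroom).toLong)
    val n = (estimatedBytes + targetBytes - 1) / targetBytes // ceiling division
    math.max(1L, math.min(n, Int.MaxValue.toLong)).toInt     // always at least one partition
  }
}
```

A possible use would be rdd.repartition(PartitionPlanner.partitionsFor(size, 5L * 1024 * 1024)). Keep in mind that repartition spreads rows roughly evenly by count rather than by bytes, so a partition can still exceed the limit when row sizes vary widely; checking the actual payload size before each call remains advisable.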