How to partition an RDD


Question:

I have a text file consisting of a large number of random floating-point values separated by spaces. I am loading this file into an RDD in Scala. How does this RDD get partitioned?

Also, is there any method to generate custom partitions such that all partitions have an equal number of elements, along with an index for each partition?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
val keyval = dRDD.map(x => process(x.trim().split(' ').map(_.toDouble), query_norm, m, r))

Here I am loading multiple text files from HDFS, and process is a function I am calling. Can I have a solution using mapPartitionsWithIndex, and how can I access that index inside the process function? The map shuffles the partitions.
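
For reference, a minimal sketch of the kind of call I mean (the extra idx argument to process is hypothetical; it only illustrates how the partition index could be passed through):

// Each partition is handed to the closure together with its index.
val keyval = dRDD.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => process(x.trim().split(' ').map(_.toDouble), query_norm, m, r, idx))
}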

Answer 1:

You can generate custom partitions using the coalesce function:

coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
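
For example, a minimal sketch reusing dRDD from the question (the target of 8 partitions is arbitrary):

// With shuffle = true, coalesce performs a full shuffle and spreads the
// rows roughly evenly; without it, coalesce can only merge partitions.
val balanced = dRDD.coalesce(8, shuffle = true)
println(balanced.getNumPartitions) // 8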


Answer 2:

The loaded RDD is partitioned by the default partitioner: hash code. To specify a custom partitioner, you can use rdd.partitionBy(), providing your own partitioner.
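
For instance, a minimal sketch of a custom partitioner (ModPartitioner is a hypothetical name, and since partitionBy() only exists on key-value RDDs, the lines are first keyed by a running index):

import org.apache.spark.Partitioner

// Route each record to a partition by taking its key modulo the
// number of partitions. Keys here are the non-negative indices
// produced by zipWithIndex, so the result is always in range.
class ModPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    (key.asInstanceOf[Long] % parts).toInt
}

val keyed = dRDD.zipWithIndex().map(_.swap) // RDD[(Long, String)]
val custom = keyed.partitionBy(new ModPartitioner(8))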

I don't think it's OK to use coalesce() here: per the API docs, coalesce() can only be used to reduce the number of partitions, and you can't specify a custom partitioner with coalesce().
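
A quick check against a local SparkContext illustrates this (a sketch; the numbers are arbitrary):

// Without a shuffle, asking coalesce for more partitions is a no-op.
val few = sc.parallelize(1 to 100, 4)
println(few.coalesce(16).getNumPartitions) // still 4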



Answer 3:

How does an RDD get partitioned?

By default, a partition is created for each HDFS block, which by default is 64 MB.

How to balance my data across partitions?

First, take a look at the three ways you can repartition your data:

1) Pass a second parameter, the desired minimum number of partitions for your RDD, into textFile(), but be careful:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926

As you can see, [16] doesn't do what one would expect, since the number of partitions the RDD already has is greater than the minimum number we requested.

2) Use repartition(), like this:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10

Warning: This will invoke a shuffle and should be used when you want to increase the number of partitions your RDD has.

From the docs:

The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

3) Use coalesce(), like this:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2

Here, Spark knows that you will shrink the RDD and takes advantage of that. Read more about repartition() vs coalesce().


But will all this guarantee that your data will be perfectly balanced across your partitions? Not really, as I found in How to balance my data across the partitions?
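
If you want to check the balance yourself, here is a sketch in Scala (matching the question's language) that counts the elements per partition, using keyval from the question:

// Emit (partitionIndex, elementCount) for every partition, then
// collect the counts to the driver. This also shows how to access
// the partition index that the question asks about.
val sizes = keyval
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()
sizes.foreach { case (idx, n) => println(s"partition $idx: $n elements") }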