sc = SparkContext("local")
rdd = sc.binaryFiles("path/to/the/binary/file", minPartitions=5).partitionBy(8)
or
sc = SparkContext("local")
rdd = sc.binaryFiles("path/to/the/binary/file", minPartitions=5).repartition(8)
Using either of the above snippets, I am trying to create 8 partitions in my RDD, with the data distributed evenly across all of them. When I print rdd.getNumPartitions(), the number of partitions shown is indeed 8, but on the Spark UI I have observed that although 8 partitions are made, all of the binary file's data is put on only one partition.
Note: the minPartitions argument is not working. Even after setting minPartitions=5, the number of partitions made in the RDD is only 1. Hence I used the partitionBy/repartition functions.
Is this the desired behaviour, or am I missing something?
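For reference, this is how I am checking the per-partition record counts (a sketch; the path is a placeholder):

from pyspark import SparkContext

sc = SparkContext("local")
rdd = sc.binaryFiles("path/to/the/binary/file", minPartitions=5).repartition(8)
print(rdd.getNumPartitions())  # prints 8
# count the records held by each partition without collecting file contents
print(rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect())
# e.g. [0, 0, 0, 1, 0, 0, 0, 0] -- everything sits on one partition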
Spark 2.4+: the problem should be fixed, see @Rahul's comment below this answer.
Spark 2.1-2.3: the minPartitions argument of binaryFiles() is ignored. See Spark-16575 and the commit changes to function setMinPartitions(). Notice in the commit changes how minPartitions isn't used anymore in the function!
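A minimal reproduction of the ignored argument on 2.1-2.3 (a sketch; the path and file layout are hypothetical):

sc = SparkContext("local[4]")
rdd = sc.binaryFiles("hdfs:///data/bin/", minPartitions=40)
print(rdd.getNumPartitions())  # far fewer than 40, since minPartitions is ignored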
If you are reading multiple binary files with binaryFiles(), the input files will be coalesced into partitions based on the following:

- spark.files.maxPartitionBytes, default 128 MB
- spark.files.openCostInBytes, default 4 MB
- spark.default.parallelism
- the total size of your input

The first three config items are described here. See the commit change above to see the actual calculation; a rough sketch of it follows below.
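In Python terms, the calculation is roughly this (a sketch mirroring the Scala in that commit; the function name and defaults are mine):

def estimated_max_split_size(file_sizes, max_partition_bytes=128 * 1024 * 1024,
                             open_cost=4 * 1024 * 1024, default_parallelism=8):
    # charge an open-cost overhead per file on top of its length
    total_bytes = sum(size + open_cost for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # each input partition receives at most this many bytes
    return min(max_partition_bytes, max(open_cost, bytes_per_core))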
I had a scenario where I wanted a max of 40 MB per input partition, hence 40 MB per task... to increase parallelism while parsing. (Spark was putting 128 MB into each partition, slowing down my app.) I set spark.files.maxPartitionBytes to 40 MB before calling binaryFiles():
spark = SparkSession \
    .builder \
    .config("spark.files.maxPartitionBytes", 40*1024*1024) \
    .getOrCreate()
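Then read through the session's SparkContext as usual (the path here is a placeholder):

rdd = spark.sparkContext.binaryFiles("hdfs:///data/bin/")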
For only one input file, @user9864979's answer is correct: a single file cannot be split into multiple partitions using just binaryFiles().
When reading multiple files with Spark 1.6, the minPartitions argument does work, and you have to use it. If you don't, you'll experience the Spark-16575 problem: all of your input files will be read into only two partitions!
You will find that Spark will normally give you fewer input partitions than you request. I had a scenario where I wanted one input partition for every two input binary files. I found that setting minPartitions to "the # of input files * 7 / 10" gave me roughly what I wanted.
I had another scenario where I wanted one input partition for each input file. I found that setting minPartitions to "the # of input files * 2" gave me what I wanted.
Spark 1.5 behavior of binaryFiles(): you get one partition for each input file.
TL;DR That's the expected behavior.
Since you read the file with binaryFiles, the whole content of the file is loaded as a single record, and single records cannot be split across multiple partitions. There is simply nothing to distribute here.
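To see this in action (a sketch with a placeholder path): even after repartition(8), the single record holding the whole file lands on one partition and the other seven stay empty.

sc = SparkContext("local")
rdd = sc.binaryFiles("path/to/one/file.bin").repartition(8)
# one record total, so one non-empty partition no matter how many you ask for
print(rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect())
# e.g. [0, 0, 0, 1, 0, 0, 0, 0]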