Controlling the data partition in Apache Spark

Published 2019-06-05 09:03

Question:

Data Looks Like:

col 1 col 2 col 3 col 4
row 1 row 1 row 1 row 1
row 2 row 2 row 2 row 2
row 3 row 3 row 3 row 3
row 4 row 4 row 4 row 4
row 5 row 5 row 5 row 5
row 6 row 6 row 6 row 6

Problem: I want to partition this data so that, say, row 1 and row 2 are processed as one partition, row 3 and row 4 as another, and row 5 and row 6 as another, and then build JSON from each partition by merging the rows with the column headers (column headers as keys, row values as values).

Output should be like:
[
{col1:row1, col2:row1, col3:row1, col4:row1},
{col1:row2, col2:row2, col3:row2, col4:row2},
{col1:row3, col2:row3, col3:row3, col4:row3},
{col1:row4, col2:row4, col3:row4, col4:row4}, ...
]

I tried using repartition(num), available in Spark, but it does not partition the data exactly the way I want, so the generated JSON data is not valid. I had an issue with my program taking the same time to process the data even though I was using different numbers of cores, which can be found here, and the repartition suggestion was made by @Patrick McGloin. The code mentioned in that question is what I am trying to do.
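The repartition attempt looked roughly like this (a minimal sketch; the input path, header option and partition count are assumptions, not from the original post):

# Minimal sketch of the repartition attempt described above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-attempt").getOrCreate()

df = spark.read.csv("data.csv", header=True)   # hypothetical input
repartitioned = df.repartition(3)              # sets only the partition count;
                                               # which rows land together is not specified
repartitioned.write.json("output/")            # one part-file per partition, line-delimited JSON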

Answer 1:

I guess what you need is partitionBy. In Scala you can provide a custom-built HashPartitioner to it, while in Python you pass a partitionFunc. There are plenty of Scala examples out there, so let me briefly explain the Python flavour.

partitionBy expects the RDD elements to be key-value tuples, with the first element being the key; partitionFunc is then applied to that key. Let's assume you organise your data as (ROW_ID, (A, B, C, ...)) with ROW_ID = [0, 1, 2, ..., k-1] (zero-based, so the arithmetic below works out). You can always add the ROW_ID and remove it afterwards.
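For instance, zipWithIndex can supply that ROW_ID. A small sketch, assuming an RDD of value tuples called rows (a hypothetical name):

# Attach a zero-based ROW_ID, turning (A, B, C, ...) into (ROW_ID, (A, B, C, ...)).
keyed = rows.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
# keyed: (0, (A, B, C, ...)), (1, (A, B, C, ...)), ...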

To get a new partition every two rows:

rdd.partitionBy(numPartitions=int(rdd.count() / 2),
                partitionFunc=lambda key: int(key / 2))

partitionFunc will produce the sequence 0, 0, 1, 1, 2, 2, ...; this number is the index of the partition to which a given row will belong.
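Putting it all together, here is a minimal end-to-end sketch. The SparkSession setup, column names, sample values and the to_json helper are assumptions for illustration, not part of the original answer:

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("two-rows-per-partition").getOrCreate()
sc = spark.sparkContext

cols = ["col1", "col2", "col3", "col4"]                       # assumed column names
data = [tuple(f"row{i}" for _ in cols) for i in range(1, 7)]  # 6 sample rows

rows = sc.parallelize(data)

# 1. Attach a zero-based ROW_ID: (ROW_ID, (A, B, C, D))
keyed = rows.zipWithIndex().map(lambda pair: (pair[1], pair[0]))

# 2. Two rows per partition: keys 0,1 -> partition 0; 2,3 -> 1; 4,5 -> 2
num_parts = int(keyed.count() / 2)
partitioned = keyed.partitionBy(num_parts, lambda key: int(key / 2))

# 3. Within each partition, drop the ROW_ID and emit one JSON array
def to_json(rows_in_partition):
    records = [dict(zip(cols, values)) for _, values in rows_in_partition]
    yield json.dumps(records)

print(partitioned.mapPartitions(to_json).collect())
# Each element of the result is a JSON array covering exactly two rows.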