I'm converting a NumPy matrix into an RDD, splitting it into 10 partitions.
import numpy as np
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")

mu, sigma = 0.0, 1.0  # example mean and standard deviation
x = np.matrix(np.random.normal(mu, sigma, 10000), dtype=np.float64)
x.shape = (100, 100)
rdd = sc.parallelize(x, 10)
Here each row of the RDD is a matrix object, and I can access it with rdd.collect()[row_num][0]. How can I group ten rows into each partition? Earlier I tried using a DenseMatrix, but I couldn't get proper results.
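For reference, I believe the way the rows currently land in the partitions can be inspected with glom(), which returns one list per partition; a minimal check with the rdd above:

# glom() turns every partition into a list, so this prints how many
# rows each of the 10 partitions holds
for i, part in enumerate(rdd.glom().collect()):
    print(i, len(part))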
I finally wrote it myself. :) I know it's not efficient, but it solves the problem and might help someone until a better answer is posted.
def group_rows(rdd):
    rdd_collect = rdd.collect()
    count = 0
    key_count = 0
    # one bucket per group of ten rows, keyed "0" .. "9"
    result = {str(k): [] for k in range(10)}
    for i in range(100):
        if count == 10:  # current bucket is full, move on to the next key
            count = 0
            key_count += 1
        result[str(key_count)].append(rdd_collect[i][0])
        count += 1
    return result
# group the collected rows, then turn the dict back into a keyed RDD
result = group_rows(rdd)
temp = sc.parallelize(list(result.items()))
sorted(temp.groupByKey().collect())
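For anyone who wants to avoid collecting everything to the driver, I think roughly the same grouping can be done directly on the RDD; a sketch, assuming the 100-row rdd with 10 partitions from the question:

# attach a row index, derive a group key 0-9 from it, and group by that key
grouped = (rdd.zipWithIndex()                        # (row, index) pairs
              .map(lambda ri: (ri[1] // 10, ri[0]))  # (index // 10, row)
              .groupByKey()
              .mapValues(list))                      # ten rows per key
print(sorted((k, len(rows)) for k, rows in grouped.collect()))

Each key should end up with exactly ten rows, which plays the same role as the string keys in group_rows above.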