mapPartitions returns empty array

Posted 2019-04-09 01:25

Question:

I have the following RDD with 4 partitions:

val rdd=sc.parallelize(1 to 20,4)

Now I call mapPartitions on it:

scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()

Why does it return an empty array? The anonymous function simply returns the same iterator it received, so how can the result be empty? Interestingly, if I remove the println statement, it does return a non-empty array:

scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

I don't understand this. How does the presence of println (which simply prints the size of the iterator) affect the final outcome of the function?

Answer 1:

That's because x is a TraversableOnce (specifically an Iterator), which can be traversed only once. Calling size traversed it to the end, so what you returned was an already-exhausted, empty iterator.

You could work around it in a number of ways; here is one:

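rdd.mapPartitions(x => {
  val list = x.toList   // traverse the iterator once, keeping its elements
  println(list.size)    // a List can be traversed any number of times
  list.iterator         // hand back a fresh iterator over the buffered elements
}).collect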
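To see the difference without a cluster, here is a plain-Scala sketch that calls the two function bodies directly on a local iterator instead of going through Spark. The names partitionIterator, printSizeBroken and printSizeBuffered are hypothetical; partitionIterator is an assumed stand-in for the per-partition iterator Spark hands to your function, and it implements only hasNext and next, so its size cannot be known without walking it.

```scala
// Plain-Scala sketch, no Spark required: each function below has the shape
// (Iterator[Int]) => Iterator[Int] that mapPartitions expects, and is called
// directly on a local iterator that stands in for one partition.

// Hypothetical stand-in for Spark's per-partition iterator: it implements only
// hasNext and next, so its size cannot be known without walking it.
def partitionIterator(elems: Int*): Iterator[Int] = new Iterator[Int] {
  private var i = 0
  def hasNext: Boolean = i < elems.length
  def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("partition exhausted")
    val e = elems(i)
    i += 1
    e
  }
}

// The function from the question: x.size walks x to the end,
// so the iterator handed back has nothing left in it.
def printSizeBroken(x: Iterator[Int]): Iterator[Int] = {
  println(x.size)
  x
}

// The workaround from this answer: traverse once into a List,
// then count the List and return a fresh iterator over it.
def printSizeBuffered(x: Iterator[Int]): Iterator[Int] = {
  val list = x.toList
  println(list.size)
  list.iterator
}

println(printSizeBroken(partitionIterator(1, 2, 3, 4, 5)).toList)   // prints 5, then List()
println(printSizeBuffered(partitionIterator(1, 2, 3, 4, 5)).toList) // prints 5, then List(1, 2, 3, 4, 5)
```

Note that toList buffers the whole partition in memory; that is the price of being able to both count the elements and still return them, and it may matter for very large partitions.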


Answer 2:

To understand what is going on, we have to look at the signature of the function you pass to mapPartitions:

(Iterator[T]) ⇒ Iterator[U]

So what is an Iterator? If you look at the Iterator documentation, you'll see it is a trait that extends TraversableOnce (as of Scala 2.12; in Scala 2.13 the parent is IterableOnce):

trait Iterator[+A] extends TraversableOnce[A]

This should hint at what happens in your case. An Iterator provides two methods, hasNext and next. To get the size of the Iterator, you have to iterate over it to the end. After that, hasNext returns false, so the Iterator you return is empty.
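The walk described above can be written out by hand. countByWalking is a hypothetical helper, not part of the Scala library; it counts elements using only hasNext and next. Unless an iterator happens to know its size in advance, Iterator.size has to perform essentially this same walk, which is why the iterator is empty afterwards.

```scala
// countByWalking is a hypothetical helper, not a Scala library method.
// It counts elements the only way a plain Iterator allows: by calling
// next() until hasNext returns false, consuming the iterator as it goes.
def countByWalking[A](it: Iterator[A]): Int = {
  var n = 0
  while (it.hasNext) {
    it.next() // each call advances past one element; it cannot be undone
    n += 1
  }
  n
}

val numbers = Iterator(1, 2, 3, 4, 5)
println(countByWalking(numbers)) // prints 5
println(numbers.hasNext)         // prints false: nothing is left to return
```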