I have the following RDD which has 4 partitions:-
val rdd=sc.parallelize(1 to 20,4)
Now I try to call mapPartitions on this:-
scala> rdd.mapPartitions(x=> { println(x.size); x }).collect
5
5
5
5
res98: Array[Int] = Array()
Why does it return an empty array? The anonymous function simply returns the same iterator it received, so how can the result be empty? The interesting part is that if I remove the println statement, it does return a non-empty array:-
scala> rdd.mapPartitions(x=> { x }).collect
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
This I don't understand. How can the presence of println (which simply prints the size of the iterator) affect the final outcome of the function?
That's because x is a TraversableOnce, which means that you traversed it by calling size and then returned it back... empty.
You could work around it a number of ways, but here is one:
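rdd.mapPartitions(x => {
  val list = x.toList   // materialize the partition once (held in memory)
  println(list.size)    // taking the size of a List consumes nothing
  list.toIterator       // return a fresh iterator over the same elements
}).collect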
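Another of the possible workarounds, sketched here under assumptions: Iterator.duplicate from the Scala standard library gives two independent iterators over the same elements, so one can be counted while the other is returned. The helper name withSize is made up for this illustration and is not part of Spark. Like toList, this still buffers the partition in memory while the first copy is being counted.

```scala
// Hypothetical helper (not a Spark API): prints the partition size and
// still returns an iterator that yields every element.
def withSize[T](x: Iterator[T]): Iterator[T] = {
  // duplicate returns two iterators backed by a shared buffer
  val (forCounting, forReturning) = x.duplicate
  println(forCounting.size) // consumes only the first copy
  forReturning              // the second copy is still untouched
}

// Tried on a plain Scala iterator, no Spark involved
val kept = withSize(Iterator(1, 2, 3, 4, 5)).toList
```

With the rdd from the question, this should be usable as rdd.mapPartitions(x => withSize(x)).collect.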
To understand what is going on, we have to take a look at the signature of the function you pass to mapPartitions:
(Iterator[T]) ⇒ Iterator[U]
So what is an Iterator? If you take a look at the Iterator documentation, you'll see it is a trait which extends TraversableOnce:
trait Iterator[+A] extends TraversableOnce[A]
The above should give you a hint about what happens in your case. Iterators provide two methods, hasNext and next. To get the size of the Iterator, you simply have to iterate over it. After that, hasNext returns false and you get an empty Iterator as the result.
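The same behaviour can be reproduced in plain Scala without Spark. The snippet below is only a small illustration; the values it, n, exhausted and leftover are names chosen for this example.

```scala
// An Iterator can be traversed only once.
val it = Iterator(1, 2, 3, 4, 5)

val n = it.size              // walks the iterator to its end; n is 5
val exhausted = !it.hasNext  // true: there is nothing left to read
val leftover = it.toList     // empty, just like the Array() from collect
```

This is what happens inside each partition in the question: println(x.size) drains x, and the drained iterator is what gets returned.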