I know that accumulator variables are 'write only' from the point of view of tasks executing on worker nodes. While testing this, I realized that I am able to print the accumulator value inside a task.
Here I am initializing the accumulator in the driver:-
scala> val accum = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123
Then I go on to define a function 'foo':-
scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)
In this function I am simply printing the accumulator and then I return the same pair that was received.
Now I have an RDD called myrdd with the following type:-
scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21
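I'm not showing how myrdd was built; a minimal equivalent with made-up contents (only the type and the 4-element size matter here) would be:
scala> val myrdd = sc.parallelize(Seq(("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"))).map(identity)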
And I am now calling the map transformation on this RDD:-
myrdd.map(foo).collect
The 'collect' action is applied to force evaluation. What actually happens during this execution is that a zero (0) is printed for every element of the RDD. Since this RDD has 4 elements, it prints 0 four times. Because the 'collect' action is there, it also prints all the elements at the end, but that's not really the focus here. So I have two questions:-
- Logically, printing is equivalent to reading, because only when you can read can you print. So why is this allowed? Why was the exception not thrown (something that would definitely happen if we tried to 'return' the accumulator from the function)?
- Why is it printing 0 as the value of the accumulator, when we had initialized it to 123 in the driver?
After some experimentation I found that if I change the function definition to access the actual value property of the accumulator object (accum.value), and then trigger the RDD action as described above, it does indeed throw the exception:-
scala> def foo(pair:(String,String)) = { println(accum.value); pair }
The exception thrown during the RDD evaluation:-
Can't read accumulator value in the task
So what I was doing earlier was printing the accumulator object itself. But the question still remains: why did it print 0? At the driver level, if I issue the same command that I used in the function definition, I do indeed get the value 123:-
scala> println(accum)
123
I didn't have to say println(accum.value) for it to work. So why does it print 0 only when I issue this command in the function that the task uses?
Because worker nodes never see the initial value. The only thing that is passed to the workers is zero, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you first update the accumulator you'll see the updated local value, and it is even clearer when you use a single partition (see the sketch below).
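A minimal sketch of both cases (the RDD contents and the explicit partition count are my own assumptions, not taken from the question):

scala> val accum = sc.accumulator(123)
scala> def foo(pair: (String, String)) = { accum += 1; println(accum); pair }
scala> sc.parallelize(Seq(("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"))).map(foo).collect
scala> sc.parallelize(Seq(("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")), 1).map(foo).collect

Each task starts from the zero value, so the first variant prints the running count local to each task rather than 123 or 0. In the single-partition variant all four updates happen in one task, so the printed values climb 1, 2, 3, 4.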
Because the exception is thrown when you access the value method, and toString does not use it at all. Instead it uses the private value_ variable, the same one that is returned by value if the !deserialized check passes.
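To make that concrete, here is a rough sketch of the behaviour (a paraphrase, not the actual Spark source; only value, value_ and deserialized come from the explanation above, the rest is made up for illustration):

class AccumulatorSketch[R](private var value_ : R, deserialized: Boolean) {
  // value enforces the 'write only inside tasks' rule
  def value: R = {
    if (!deserialized) value_
    else throw new UnsupportedOperationException("Can't read accumulator value in task")
  }
  // toString bypasses value and reads value_ directly, so println(accum)
  // never hits the check above; on a worker value_ holds the zero value
  override def toString: String = if (value_ == null) "null" else value_.toString
}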