How to print accumulator variable from within task

2019-01-19 16:33发布

问题:

I know the accumulator variables are 'write only' from the point of view of tasks, when they are in execution in worker nodes. I was doing some testing on this and I realized that I am able to print the accumulator value in the task.

Here I am initializing the accumulator in the driver:-

scala> val accum  = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123

Then I go on to define a function 'foo':-

scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)

In this function I am simply printing the accumulator and then I return the same pair that was received.

Now I have an RDD called myrdd with the following type:-

scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21

And I am now calling the map transformation on this RDD:-

myrdd.map(foo).collect

The 'collect' action is being applied to force evaluation. So what actually happens here is that during this execution a zero (0) is printed for every line of the RDD. Since this RDD has 4 elements, it prints 0 4 times. Since the action 'collect' is there , it also prints all the elements in the end, but that's not really the focus here. So I have two questions:-

  1. Logically, printing equivalent to reading, because only when you can read, can you print. So why is this allowed? Why was the exception not thrown something that would definitely happen if we try to 'return' the accumulator in the function)?
  2. Why is it printing 0 as the value of the accumulator, when we had initiated it as 123 in the driver?

After some experimentation I found that if I change function definition to access the actual value property of the accumulator object (accum.value), and then trigger the RDD action as described already, it does indeed throw the exception:-

scala> def foo(pair:(String,String)) = { println(accum.value); pair }

The exception caused during the RDD evaluation:-

Can't read accumulator value in the task

So what I was doing earlier is trying to print the accumulator object itself. But the question still remains as to why it printed 0? Because at driver level if I issue the same command that I used in the function definition, I do indeed get the value 123:-

scala> println(accum)
123

I didn't have to say println(accum.value) for it to work. So why only, when I issue this command in the function which the task uses, does it print 0?

回答1:

Why is it printing 0 as the value of the accumulator, when we had initiated it as 123 in the driver?

Because worker nodes will never see initial value. Only thing that is passed to workers is zero, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you first update an accumulator you'll see updated local value:

val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.foreach(i => {acc += i; println(acc)})

It is even clearer when you use a single partition:

rdd.repartition(1).foreach(i => {acc += i; println(acc)}

Why was the exception not thrown (...)?

Because exception is thrown when you access value method, and toString is not using it at all. Instead it is using private value_ variable, the same one which is returned by value if !deserialized check passed.