Apache Spark broadcast variables are of type Broadcast

Published: 2019-09-12 03:19

Question:

Just trying to clarify something, some low-hanging fruit. This question came from watching a user in another question try to call RDD operations on a broadcast variable. That's wrong, right?

The question is: a Spark broadcast variable is not an RDD, correct? It's a collection in Scala, am I seeing that correctly?

Looking at the Scala docs: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast

So it has whatever sub-type it's assigned when it's created, the sub-type of whatever is passed to it? Like if this were a Java ArrayList, it would be an ArrayList of Integers? So

sc.broadcast([0,1,2]) would create a Broadcast[Array[Int]] in scala-notation?

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

(I really did search around quite a bit for a clear, straightforward answer, but it must be too basic a question, yet so important to understand. Thanks.)

It would be nice, but not necessary, to have some info on what Python does with broadcasts. I assume it calls the underlying Scala class and it's stored as a Scala Broadcast type under the hood?

Answer 1:

A broadcast variable is not an RDD, but it's not necessarily a Scala collection either. Essentially, you should just think of a broadcast variable as a local variable that is available on every machine. Every worker will have a copy of whatever you've broadcast, so you don't need to worry about assigning it to specific RDD values.

The best time to use a broadcast variable is when you have a fairly large object that you're going to need for most values in the RDD.

An example would be

import scala.collection.immutable.HashMap
import org.apache.spark.rdd.RDD

val zipCodeHash: HashMap[Int, List[Resident]] // potentially a very large hashmap
val BVZipHash = sc.broadcast(zipCodeHash)     // one copy shipped to each executor

val zipcodes: RDD[String] = sc.textFile("../zipcodes.txt")

// look up each zip code in the broadcast map and flatten the resident lists
val allUsers = zipcodes.flatMap(a => BVZipHash.value(a.toInt))

In this situation, since the hashmap could potentially be very large, it would be extremely wasteful to ship a fresh copy of it with every task that runs the map function.
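For contrast, here is a minimal sketch (reusing the hypothetical names from the example above) of the same lookup without a broadcast variable; the large map ends up captured in the task closure, so Spark serializes it and re-sends it to the executors with every task:

// Without a broadcast variable: zipCodeHash is captured directly in the closure,
// so the whole map is serialized and shipped to the executors for every task.
val allUsersNoBroadcast = zipcodes.flatMap(a => zipCodeHash(a.toInt))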

I hope this helps!

edit: some minor mistakes in my code

edit2:

To go slightly more into the nuts and bolts of what a Broadcast variable actually is:

A broadcast variable is actually a variable of type Broadcast that can wrap any class (anything from an Int to any object you create). It is by no means a Scala collection. All the Broadcast class actually does is offer one of two ways of efficiently transporting the data to all the workers so they can recreate the value locally (internally Spark has a BitTorrent-like P2P broadcast mechanism, and it also allows HTTP transfer, though I'm not sure when it uses which).
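As a rough sketch of that point (the Config case class below is made up purely for illustration, it's not from the question): any serializable object can be wrapped in a Broadcast, read back through .value inside transformations, and released when you're done with it.

case class Config(threshold: Double, label: String)  // any serializable class works, not just a collection

val bvConfig = sc.broadcast(Config(0.5, "prod"))      // type is Broadcast[Config]

val flagged = sc.parallelize(Seq(0.1, 0.7, 0.9))
  .filter(x => x > bvConfig.value.threshold)          // each executor reads its local copy

bvConfig.unpersist()                                  // drop the cached copies on the executors when finished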

For more information on what a broadcast variable is and how to use it, I'd recommend checking out this link:

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

I'd also highly recommend looking into this book as it's been very helpful to me:

http://shop.oreilly.com/product/0636920028512.do