How do `map` and `reduce` methods work in Spark RD

2019-03-11 03:06发布

Following code is from the quick start guide of Apache Spark. Can somebody explain me what is the "line" variable and where it comes from?

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

Also, how does a value get passed into a,b?

Link to the QSG http://spark.apache.org/docs/latest/quick-start.html

3条回答
Emotional °昔
2楼-- · 2019-03-11 03:19

First, according to your link, the textfile is created as

val textFile = sc.textFile("README.md")

such that textfile is a RDD[String] meaning it is a resilient distributed dataset of type String. The API to access is very similar to that of regular Scala collections.

So now what does this map do?

Imagine you have a list of Strings and want to convert that into a list of Ints, representing the length of each String.

val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )

The map method expects a function. A function, that goes from String => Int. With that function, each element of the list is transformed. So the value of intlist is List( 2, 3, 1 )

Here, we have created an anonymous function from String => Int. That is x => x.length. One can even write the function more explicit as

stringlist.map( (x: String) => x.length )  

If you do use write the above explicit, you can

val stringLength : (String => Int) = {
  x => x.length
}
val intlist = stringlist.map( stringLength )

So, here it is absolutely evident, that stringLength is a function from String to Int.

Remark: In general, map is what makes up a so called Functor. While you provide a function from A => B, map of the functor (here List) allows you use that function also to go from List[A] => List[B]. This is called lifting.

Answers to your questions

What is the "line" variable?

As mentioned above, line is the input parameter of the function line => line.split(" ").size

More explicit (line: String) => line.split(" ").size

Example: If line is "hello world", the function returns 2.

"hello world" 
=> Array("hello", "world")  // split 
=> 2                        // size of Array

How does a value get passed into a,b?

reduce also expects a function from (A, A) => A, where A is the type of your RDD. Lets call this function op.

What does reduce. Example:

List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1 : op( 1, 2 ) will be the first evaluation. 
  Start with 1, 2, that is 
    x is 1  and  y is 2
Step 2:  op( op( 1, 2 ), 3 ) - take the next element 3
  Take the next element 3: 
    x is op(1,2) = 3   and y = 3
Step 3:  op( op( op( 1, 2 ), 3 ), 4) 
  Take the next element 4: 
    x is op(op(1,2), 3 ) = op( 3,3 ) = 6    and y is 4

Result here is the sum of the list elements, 10.

Remark: In general reduce calculates

op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)

Full example

First, textfile is a RDD[String], say

TextFile
 "hello Tyth"
 "cool example, eh?"
 "goodbye"

TextFile.map(line => line.split(" ").size)
 2
 3
 1
TextFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
 3
   Steps here, recall `(a, b) => if (a > b) a else b)`
   - op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3 
   - op( 3, 1 ) = 3
查看更多
等我变得足够好
3楼-- · 2019-03-11 03:22

Map and reduce are methods of RDD class, which has interface similar to scala collections.

What you pass to methods map and reduce are actually anonymous function (with one param in map, and with two parameters in reduce). textFile calls provided function for every element (line of text in this context) it has.

Maybe you should read some scala collection introduction first.

You can read more about RDD class API here: https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD

查看更多
▲ chillily
4楼-- · 2019-03-11 03:39

what map function does is, it takes the list of arguments and map it to some function. Similar to map function in python, if you are familiar.

Also, File is like a list of Strings. (not exactly but that's how it's being iterated)

Let's consider this is your file.

val list_a: List[String] = List("first line", "second line", "last line")

Now let's see how map function works.

We need two things, list of values which we already have and function to which we want to map this values. let's consider really simple function for understanding.

val myprint = (arg:String)=>println(arg)

this function simply takes single String argument and prints on the console.

myprint("hello world")
hello world

if we match this function to your list, it's gonna print all the lines

list_a.map(myprint)

We can write an anonymous function as mentioned below as well, which does the same thing.

list_a.map(arg=>println(arg))

in your case, line is the first line of the file. you could change the argument name as you like. for example, in above example, if I change arg to line it would work without any issue

list_a.map(line=>println(line))
查看更多
登录 后发表回答