The following code is from the Apache Spark quick start guide.
Can somebody explain what the "line" variable is and where it comes from?
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
Also, how do values get passed into a and b?
Link to the quick start guide: http://spark.apache.org/docs/latest/quick-start.html
First, according to your link, textFile is created as
val textFile = sc.textFile("README.md")
so textFile is an RDD[String], that is, a resilient distributed dataset whose elements are of type String. Its API is very similar to that of regular Scala collections.
So what does map do here?
Imagine you have a list of Strings and want to convert it into a list of Ints, where each Int is the length of the corresponding String.
val stringlist: List[String] = List("ab", "cde", "f")
val intlist: List[Int] = stringlist.map( x => x.length )
The map method expects a function, here one of type String => Int. That function is applied to each element of the list, so the value of intlist is List(2, 3, 1).
Here we have created an anonymous function of type String => Int, namely x => x.length. One can write the function more explicitly as
stringlist.map( (x: String) => x.length )
You can also define the function as a named value first and pass that to map:
val stringLength : (String => Int) = {
x => x.length
}
val intlist = stringlist.map( stringLength )
Here it is evident that stringLength is a function from String to Int.
Remark: In general, map is what makes up a so-called functor. You provide a function from A => B, and the map of the functor (here List) lets you use that function to go from List[A] => List[B]. This is called lifting.
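The lifting idea from the remark can be sketched in plain Scala. The helper name lift and the object name LiftingSketch are made up for this illustration and are not part of the Scala or Spark API; the sketch only wraps the existing List.map.

```scala
object LiftingSketch {
  // Hypothetical helper (not a library function): turns a function A => B
  // into a function List[A] => List[B] by delegating to List.map.
  def lift[A, B](f: A => B): List[A] => List[B] =
    (xs: List[A]) => xs.map(f)

  // The same String => Int function used in the answer above.
  val stringLength: String => Int = _.length

  // The lifted version now works on whole lists.
  val lengths: List[String] => List[Int] = lift(stringLength)

  def main(args: Array[String]): Unit =
    println(lengths(List("ab", "cde", "f"))) // prints List(2, 3, 1)
}
```

In everyday code you would simply call stringlist.map(stringLength); the explicit lift only makes the type change from String => Int to List[String] => List[Int] visible.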
Answers to your questions
What is the "line" variable?
As mentioned above, line is the input parameter of the function line => line.split(" ").size. Written more explicitly:
(line: String) => line.split(" ").size
Example: If line is "hello world", the function returns 2.
"hello world"
=> Array("hello", "world") // split
=> 2 // size of Array
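To try this without Spark, the same function can be given a name and called directly. This is a minimal sketch; the names WordCountSketch and wordCount are chosen here for illustration only.

```scala
object WordCountSketch {
  // The function from the question, stored in a named value.
  // `line` is just the name of its single String parameter.
  val wordCount: String => Int = line => line.split(" ").size

  def main(args: Array[String]): Unit = {
    println("hello world".split(" ").toList) // prints List(hello, world)
    println(wordCount("hello world"))        // prints 2
  }
}
```

When this function is passed to map, nothing in your own code calls it; map calls it once per element and supplies that element as line.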
How does a value get passed into a,b?
reduce also expects a function, this time of type (A, A) => A, where A is the element type of your RDD. Let's call this function op.
What does reduce do? Example:
List( 1, 2, 3, 4 ).reduce( (x,y) => x + y )
Step 1: op(1, 2) is the first evaluation.
Start with the first two elements: x is 1 and y is 2.
Step 2: op(op(1, 2), 3)
Take the next element 3: x is op(1, 2) = 3 and y is 3.
Step 3: op(op(op(1, 2), 3), 4)
Take the next element 4: x is op(op(1, 2), 3) = op(3, 3) = 6 and y is 4.
The result is the sum of the list elements, 10.
Remark: In general, reduce calculates
op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)
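The evaluation order can be made visible by recording every call to op. The sketch below uses reduceLeft on a plain List, which is guaranteed to go left to right; tracedSum and ReduceTraceSketch are hypothetical names for this illustration. On an RDD the elements are combined across partitions in no guaranteed order, which is why Spark asks for an associative and commutative op.

```scala
import scala.collection.mutable.ListBuffer

object ReduceTraceSketch {
  // Sums the list with reduceLeft and records each call to op,
  // so the nesting op(op(op(1, 2), 3), 4) can be read off the trace.
  // Like reduce, this throws on an empty list.
  def tracedSum(xs: List[Int]): (Int, List[String]) = {
    val trace = ListBuffer.empty[String]
    val result = xs.reduceLeft { (x, y) =>
      trace += s"op($x, $y) = ${x + y}"
      x + y
    }
    (result, trace.toList)
  }

  def main(args: Array[String]): Unit = {
    val (sum, steps) = tracedSum(List(1, 2, 3, 4))
    steps.foreach(println) // op(1, 2) = 3, op(3, 3) = 6, op(6, 4) = 10
    println(sum)           // prints 10
  }
}
```

In each step x is the result accumulated so far and y is the next element, which is how values end up in the two parameters.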
Full example
First, suppose textFile is an RDD[String] containing these lines:
"hello Tyth"
"cool example, eh?"
"goodbye"
textFile.map(line => line.split(" ").size)
2
3
1
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
3
Steps here, recalling that op is `(a, b) => if (a > b) a else b`:
- op( op(2, 3), 1) evaluates to op(3, 1), since op(2, 3) = 3
- op( 3, 1 ) = 3
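The full example can be reproduced without a Spark cluster by letting a plain List[String] stand in for the RDD. That substitution is an assumption of this sketch (a List is neither distributed nor lazy), but map and reduce take the same functions. The names MaxWordsSketch, lines, wordCounts and maxWords are illustrative.

```scala
object MaxWordsSketch {
  // A local List standing in for the RDD[String] of the example.
  val lines: List[String] = List("hello Tyth", "cool example, eh?", "goodbye")

  // map: one word count per line.
  val wordCounts: List[Int] = lines.map(line => line.split(" ").size)

  // reduce: keep the larger of the two values at every step.
  val maxWords: Int = wordCounts.reduce((a, b) => if (a > b) a else b)

  def main(args: Array[String]): Unit = {
    println(wordCounts) // prints List(2, 3, 1)
    println(maxWords)   // prints 3
  }
}
```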
map and reduce are methods of the RDD class, which has an interface similar to Scala collections.
What you pass to map and reduce are anonymous functions (with one parameter for map and two parameters for reduce). textFile calls the provided function for every element it has (each line of text in this context).
It may help to read an introduction to Scala collections first.
You can read more about RDD class API here:
https://spark.apache.org/docs/1.2.1/api/scala/#org.apache.spark.rdd.RDD
What the map function does is take a collection and apply a function to each of its elements. It is similar to the map function in Python, if you are familiar with it.
Also, a file is like a list of Strings (not exactly, but that's how it is iterated).
Let's say this is your file.
val list_a: List[String] = List("first line", "second line", "last line")
Now let's see how the map function works.
We need two things: a list of values, which we already have, and a function to apply to each of those values. Let's consider a really simple function for understanding.
val myprint = (arg:String)=>println(arg)
This function simply takes a single String argument and prints it to the console.
myprint("hello world")
hello world
If we map this function over your list, it prints all the lines:
list_a.map(myprint)
We can also write it as an anonymous function, which does the same thing.
list_a.map(arg=>println(arg))
In your case, line stands for each line of the file in turn. You can change the parameter name as you like. For example, if I rename arg to line in the example above, it works without any issue:
list_a.map(line=>println(line))
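A quick way to convince yourself that the parameter name is arbitrary is to run the same map twice with different names and compare the results. This is a small sketch; ParamNameSketch, viaArg and viaLine are names invented here. It uses length instead of println so that there is a result to compare.

```scala
object ParamNameSketch {
  val list_a: List[String] = List("first line", "second line", "last line")

  // Same function body, two different parameter names.
  val viaArg: List[Int] = list_a.map(arg => arg.length)
  val viaLine: List[Int] = list_a.map(line => line.length)

  def main(args: Array[String]): Unit = {
    println(viaArg == viaLine) // prints true
    // For pure side effects such as printing, foreach avoids building
    // the List[Unit] that map(println) would return.
    list_a.foreach(line => println(line))
  }
}
```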