I am new to Spark programming and Scala, and I don't understand the difference between `map` and `flatMap`. Why does `flatMap` work fine with a method that returns `Option`, as below?
```scala
import org.apache.spark.SparkContext

def parseNames(line: String): Option[(Int, String)] = {
  val fields = line.split('"')
  if (fields.length > 1) {
    return Some((fields(0).trim().toInt, fields(1)))
  }
  else {
    return None
  }
}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "DemoHero")
  val txt = sc.textFile("../marvel-names1.txt")
  val rdd = txt.flatMap(parseNames)
}
```
But without `Option`, I get a compile error:
```scala
def parseNames(line: String): (Int, String) = {
  val fields = line.split('"')
  (fields(0).trim().toInt, fields(1))
}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "DemoHero")
  val txt = sc.textFile("../marvel-names1.txt")
  val rdd = txt.flatMap(parseNames) // does not compile
}
```
As I understand it, `flatMap` turns an RDD into a collection of String/Int values, so I expected both versions to work without errors. Where am I making a mistake?
TL;DR: There is an implicit conversion from `Option` to `Iterable`; this is why your first `flatMap` does not fail.
From the inheritance hierarchy of `Option` alone, it is not at all clear why `RDD`'s `flatMap`, which expects an argument with `TraversableOnce` in its return type, would accept a function that returns an `Option`, because `Option` does not extend `TraversableOnce` (in Scala 2.12 and earlier).
However, if you print the desugared code generated for your `flatMap`, the following synthetic function definition appears:
```
@SerialVersionUID(value = 0) final <synthetic> class anonfun$1 extends scala.runtime.AbstractFunction1 with Serializable {
  final def apply(line: String): Iterable = scala.this.Option.option2Iterable(org.example.ClassName.parseNames$1(line));
  final <bridge> <artifact> def apply(v1: Object): Object = anonfun$1.this.apply(v1.$asInstanceOf[String]());
  def <init>(): <$anon: Function1> = {
    anonfun$1.super.<init>();
    ()
  }
}
```
The details are not that important: it's something that takes a `line: String` and returns an `Iterable`.
What's interesting is the `Option.option2Iterable` part. This is an implicit conversion defined on the `Option` companion object; it quietly converts options into `Iterable`s, and `Iterable` is a special case of `TraversableOnce`.
This is how the compiler can sneak `option2Iterable` into a synthetic `Function` definition that mediates between your method and the invocation of `flatMap`. Now you have an argument of type `String => Iterable[(Int, String)]`, so the `flatMap` compiles fine.
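To see the conversion outside of Spark, here is a minimal sketch that uses a plain Scala `List` as a stand-in for the RDD (an assumption, so it runs without a `SparkContext`). The object name and sample lines are hypothetical. It calls `Option.option2Iterable` explicitly, which is roughly what the compiler inserts for you in the synthetic function.

```scala
object OptionToIterableDemo {
  // Same logic as the question's parseNames, without the explicit returns
  def parseNames(line: String): Option[(Int, String)] = {
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1)))
    else None
  }

  // Hypothetical sample input; the middle line has no quote, so it yields None
  val lines: List[String] = List("1 \"HERO A\"", "no quotes here", "2 \"HERO B\"")

  // Explicit form: each Option becomes an Iterable with 0 or 1 elements
  val explicit: List[(Int, String)] =
    lines.flatMap(line => Option.option2Iterable(parseNames(line)))

  // Short form: relies on the same implicit conversion
  // (on Scala 2.13+, Option is itself IterableOnce)
  val implicitForm: List[(Int, String)] = lines.flatMap(parseNames)

  def main(args: Array[String]): Unit = {
    println(explicit)
    println(implicitForm)
  }
}
```

Both forms drop the line without a quote and keep the two parsed pairs.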
Note that it wouldn't work without a synthetic `Function` instance that wraps your method. If you declared `parseNames` as a function value, like this:

```scala
def parseNames: String => Option[(Int, String)] = { line => ... }
```

it would be a straightforward compiler error.
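If you do want `parseNames` as a function value, one possible workaround (a hypothetical variant, not something the answer proposes) is to compose it with `_.toList`, so that the function's result type is already a collection and no implicit conversion is needed at the `flatMap` call site. The sketch again uses a plain `List` in place of the RDD; the object name and input are made up.

```scala
object FunctionValueDemo {
  // parseNames as a function value instead of a method
  val parseNames: String => Option[(Int, String)] = { line =>
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
  }

  // Compose with _.toList: the result is String => List[(Int, String)]
  val parseNamesAsList: String => List[(Int, String)] = parseNames.andThen(_.toList)

  // Hypothetical input; the empty line produces None, i.e. an empty list
  val lines: List[String] = List("3 \"HERO C\"", "")

  val parsed: List[(Int, String)] = lines.flatMap(parseNamesAsList)

  def main(args: Array[String]): Unit = println(parsed)
}
```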
Your second code snippet shouldn't compile, and luckily, it indeed doesn't: pairs are not `Traversable`, so `flatMap` does not accept `parseNames(line: String): (Int, String)` as an argument. What you want to use here is `map`, because you want to map each string to exactly one `(Int, String)` pair.
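A minimal sketch of the `map` version, using a plain `List` instead of an RDD so it runs without Spark (with Spark it would be `txt.map(parseNames)`, giving an `RDD[(Int, String)]`). The object name and input lines are hypothetical, and the input is assumed to be well formed, since this version has no guard for lines without a quote.

```scala
object MapDemo {
  // The question's second parseNames: exactly one pair per line
  def parsePair(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  // Hypothetical well-formed input: every line contains a quoted name
  val lines: List[String] = List("1 \"HERO A\"", "2 \"HERO B\"")

  // map: one input element -> exactly one output element
  val pairs: List[(Int, String)] = lines.map(parsePair)

  def main(args: Array[String]): Unit = println(pairs)
}
```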
`flatMap` is for a different use case: it converts each element of your original collection into another collection, and then flattens all resulting collections into a single collection. For example,

```scala
sc.parallelize(List(1, 2, 3)).flatMap { x => List(x, x*x, x*x*x) }
```

would first produce a `TraversableOnce` for each `x`:

```
List(1, 1, 1)
List(2, 4, 8)
List(3, 9, 27)
```

and then glue them all together, so that you would obtain an RDD with the entries

```
1, 1, 1, 2, 4, 8, 3, 9, 27
```
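The same example can be checked on a plain Scala `List` (used here instead of `sc.parallelize` so it runs without Spark), side by side with `map`, which keeps one nested list per element. The object name is hypothetical.

```scala
object FlatMapVsMapDemo {
  val xs: List[Int] = List(1, 2, 3)

  // map keeps one inner list per element
  val nested: List[List[Int]] = xs.map(x => List(x, x * x, x * x * x))

  // flatMap concatenates the inner lists into one list
  val flat: List[Int] = xs.flatMap(x => List(x, x * x, x * x * x))

  def main(args: Array[String]): Unit = {
    println(nested) // List(List(1, 1, 1), List(2, 4, 8), List(3, 9, 27))
    println(flat)   // List(1, 1, 1, 2, 4, 8, 3, 9, 27)
  }
}
```

In other words, `flatMap(f)` gives the same result as `map(f)` followed by `flatten`.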
It works the same way with `Option`, because "morally" an `Option` is something like a list with 0 or 1 elements, even though its inheritance hierarchy doesn't say so explicitly.
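A small sketch of the "list with 0 or 1 elements" view, using plain Scala collections. The helper `evenHalf` and the object name are hypothetical, chosen only to produce a mix of `Some` and `None`.

```scala
object OptionAsListDemo {
  // An Option converts to a list with zero or one elements
  val fromSome: List[Int] = Some(42).toList // List(42)
  val fromNone: List[Int] = None.toList     // List()

  // Hypothetical helper: halves even numbers, rejects odd ones
  def evenHalf(x: Int): Option[Int] = if (x % 2 == 0) Some(x / 2) else None

  val xs: List[Int] = List(1, 2, 3, 4)

  // flatMap keeps the contents of each Some and drops every None
  val viaFlatMap: List[Int] = xs.flatMap(evenHalf)

  // Same result when each Option is explicitly turned into a 0-or-1-element list
  val viaLists: List[Int] = xs.map(x => evenHalf(x).toList).flatten

  def main(args: Array[String]): Unit = println(viaFlatMap) // List(1, 2)
}
```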
A note on the phrase "should not compile": whenever I write that some code (yours or anyone else's) "should not compile", I don't mean that I generally wish you had compile errors in your code. I mean that if there is a problem in the code, the compiler should report it with a clear error message as early as possible.
```scala
def parseNames(line: String): Option[(Int, String)] = {
  val fields = line.split('"')
  if (fields.length > 1) {
    Some((fields(0).trim().toInt, fields(1)))
  }
  else {
    None
  }
}
```
(I removed the noisy `return`s.)
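Well, when is `None` returned? When `fields.length` is not greater than 1. If there aren't at least two fields (`fields(0)` and `fields(1)`), `fields(0).trim().toInt` might succeed, but `fields(1)` will fail with an `ArrayIndexOutOfBoundsException`. The `Option` version skips such lines instead of crashing; your second version has no such check.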
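A minimal sketch of that difference, run on a single hypothetical malformed line (`"7"`, an id with no quoted name). It uses `scala.util.Try` only to capture the exception from the tuple version so it can be inspected; the object name is hypothetical.

```scala
import scala.util.Try

object MalformedLineDemo {
  // Option version (same logic as this answer's parseNames)
  def parseNames(line: String): Option[(Int, String)] = {
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
  }

  // Tuple version from the question: no length check
  def parsePair(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  // Hypothetical malformed line: an id but no quoted name
  val badLine: String = "7"

  // Option version: the guard returns None
  val safe: Option[(Int, String)] = parseNames(badLine)

  // Tuple version: fields(0).trim.toInt succeeds (7), but fields(1) is out of bounds
  val unsafe: Try[(Int, String)] = Try(parsePair(badLine))

  def main(args: Array[String]): Unit = {
    println(safe)   // None
    println(unsafe) // a Failure wrapping an ArrayIndexOutOfBoundsException
  }
}
```

Note that neither version guards against a non-numeric id, which would still throw a `NumberFormatException` from `toInt`.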
`flatMap` requires the function it calls to return an iterable collection, because `flatMap` iterates over each returned collection and flattens its elements into the result.

In your first `parseNames` function, an `Option[(Int, String)]` is returned. `Option` is a container, and thanks to an implicit conversion it can behave like an iterable, so `flatMap` worked.

But your second `parseNames` returns a `Tuple2[Int, String]`, which is not iterable: a `Tuple2` cannot be iterated over, although its elements can be accessed with `_1` and `_2`. So `flatMap` gave you a compilation error.

I hope the explanation is clear.
The second `parseNames` would have worked if you returned an `Array`:

```scala
def parseNames(line: String): Array[(Int, String)] = {
  val fields = line.split('"')
  Array((fields(0).trim().toInt, fields(1)))
}
```

or a `List`:

```scala
def parseNames(line: String): List[(Int, String)] = {
  val fields = line.split('"')
  List((fields(0).trim().toInt, fields(1)))
}
```

or a `Seq`:

```scala
def parseNames(line: String): Seq[(Int, String)] = {
  val fields = line.split('"')
  Seq((fields(0).trim().toInt, fields(1)))
}
```

This works because all of them can be used as collections, just like `Option`: `List` and `Seq` are iterables themselves, while an `Array`, like an `Option`, is implicitly converted to one.
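A compact sketch that checks all four return types against the same input, using a plain `List` in place of the RDD so it runs without Spark. The helper `toPair`, the `as*` method names, the object name and the input lines are hypothetical; explicit lambdas are used at the call sites to keep the conversion behavior uniform across Scala versions.

```scala
object CollectionReturnTypesDemo {
  // Hypothetical input: every line contains a quoted name
  val lines: List[String] = List("1 \"HERO A\"", "2 \"HERO B\"")

  // Shared parsing step (hypothetical helper)
  def toPair(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  // The return types discussed in this answer, plus Option for comparison
  def asArray(line: String): Array[(Int, String)] = Array(toPair(line))
  def asList(line: String): List[(Int, String)] = List(toPair(line))
  def asSeq(line: String): Seq[(Int, String)] = Seq(toPair(line))
  def asOption(line: String): Option[(Int, String)] = Some(toPair(line))

  // List and Seq are collections already; Array (and Option before
  // Scala 2.13) reach flatMap through an implicit conversion
  val fromArray: List[(Int, String)] = lines.flatMap(line => asArray(line))
  val fromList: List[(Int, String)] = lines.flatMap(line => asList(line))
  val fromSeq: List[(Int, String)] = lines.flatMap(line => asSeq(line))
  val fromOption: List[(Int, String)] = lines.flatMap(line => asOption(line))

  def main(args: Array[String]): Unit =
    println(Seq(fromArray, fromList, fromSeq, fromOption).distinct.size) // 1
}
```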