Spark error: type mismatch; found: (Int, String)

Posted 2019-08-01 15:26

Question:

I am new to Spark programming and Scala, and I don't understand the difference between map and flatMap. When I use flatMap, why does my method work fine when it returns an Option?

import org.apache.spark.SparkContext

def parseNames(line: String): Option[(Int, String)] = {
  val fields = line.split('\"')
  if (fields.length > 1) {
    return Some((fields(0).trim().toInt, fields(1)))
  }
  else {
    return None
  }
}
def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "DemoHero")
  val txt = sc.textFile("../marvel-names1.txt")
  val rdd = txt.flatMap(parseNames)  // compiles: rdd is an RDD[(Int, String)]
}

But without Option, I get a compile error:

def parseNames(line: String): (Int, String) = {
  val fields = line.split('\"')
  (fields(0).trim().toInt, fields(1))
}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "DemoHero")
  val txt = sc.textFile("../marvel-names1.txt")
  val rdd = txt.flatMap(parseNames)  // compile error: type mismatch; found : (Int, String)
}

As I understand it, flatMap turns the RDD into a collection of String/Int values, so I expected both versions to work without any error. Please let me know where I am making a mistake.

Answer 1:

TL;DR: There is an implicit conversion from Option to Iterable, this is why your first flatMap does not fail.


Judging by the inheritance hierarchy of Option alone, it is not at all clear why RDD's flatMap, which expects a function whose return type is a TraversableOnce, would accept a function that returns an Option, because Option does not extend TraversableOnce (at least up to Scala 2.12).

However, if you print the desugared code generated by your flatMap, the following synthetic function definition appears:

@SerialVersionUID(value = 0) final <synthetic> class anonfun$1 extends scala.runtime.AbstractFunction1 with Serializable {
  final def apply(line: String): Iterable = scala.this.Option.option2Iterable(org.example.ClassName.parseNames$1(line));
  final <bridge> <artifact> def apply(v1: Object): Object = anonfun$1.this.apply(v1.$asInstanceOf[String]());
  def <init>(): <$anon: Function1> = {
    anonfun$1.super.<init>();
    ()
  }
}

The details are not that important; it's something that takes a line: String and returns an Iterable. What's interesting is the Option.option2Iterable part.

This is an implicit conversion defined in the Option companion object. It quietly converts options into Iterables, and Iterable is a special case of TraversableOnce.

This is how the compiler can sneak option2Iterable into a synthetic function definition that mediates between your method and the invocation of flatMap. Now you have an argument of type String => Iterable[(Int, String)], so the flatMap compiles fine.
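A minimal plain-Scala sketch of the same mechanism, using a List in place of the RDD so it runs without Spark. The object name and the sample lines are hypothetical. It calls Option.option2Iterable explicitly and builds by hand a String => Iterable wrapper that plays the role of the synthetic function shown above.

```scala
object Option2IterableDemo {
  // Option-returning parser from the question (without the `return`s).
  def parseNames(line: String): Option[(Int, String)] = {
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
  }

  // Calling the conversion explicitly: Some becomes a one-element Iterable,
  // None becomes an empty one.
  val fromSome: Iterable[Int] = Option.option2Iterable(Some(42))
  val fromNone: Iterable[Int] = Option.option2Iterable(None)

  // Roughly what the compiler synthesizes for flatMap(parseNames):
  // call the method, then convert its Option result to an Iterable.
  val wrapper: String => Iterable[(Int, String)] =
    line => Option.option2Iterable(parseNames(line))

  // Hypothetical input: one well-formed line and one without a quoted name.
  val lines = List("1 \"24-HOUR MAN/EMMANUEL\"", "no quotes here")

  // Passing the hand-written wrapper gives the same result as passing the method.
  val viaWrapper: List[(Int, String)] = lines.flatMap(wrapper)
  val viaMethod: List[(Int, String)] = lines.flatMap(parseNames)
}
```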

Note that it wouldn't work without a synthetic Function instance that wraps your method. If you declared parseNames as a function value like this:

def parseNames: String => Option[(Int,String)] = { line =>
  // ... same body as before ...
}

then passing it to flatMap would be a straightforward compiler error.
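If parseNames has to be a function value, one workaround (sketched here under the assumption that the plain function value is rejected, as described above) is to wrap the call in a lambda, which gives the compiler a place to apply option2Iterable, or to convert explicitly with .toList. In Scala 2.13 and later Option extends IterableOnce, so the function value may be accepted directly; both forms below compile either way. A List stands in for the RDD, and the names and sample lines are hypothetical.

```scala
object FunctionValueDemo {
  // parseNames as a function value rather than a method.
  val parseNames: String => Option[(Int, String)] = { line =>
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
  }

  // Hypothetical input: one valid line, one without a quoted name.
  val lines = List("1 \"24-HOUR MAN/EMMANUEL\"", "garbage")

  // A lambda whose body returns Option: the conversion is inserted on the result.
  val viaLambda: List[(Int, String)] = lines.flatMap(line => parseNames(line))

  // Explicit conversion: no implicit is needed at all.
  val viaToList: List[(Int, String)] = lines.flatMap(line => parseNames(line).toList)
}
```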


Your second code snippet shouldn't compile, and luckily, it indeed doesn't: pairs are not Traversable, so flatMap does not accept parseNames(line: String): (Int, String) as an argument. What you want here is map, because you want to map each string to exactly one (Int, String) pair.
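A minimal sketch of the map version, with a List standing in for the RDD (on Spark, txt.map(parseNames) would give an RDD[(Int, String)]). The object name and sample lines are hypothetical.

```scala
object MapDemo {
  // The second parseNames from the question: exactly one pair per line.
  def parseNames(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  // Hypothetical, well-formed input lines.
  val lines = List("1 \"24-HOUR MAN/EMMANUEL\"", "2 \"3-D MAN/CHARLES CHAN\"")

  // map: one input line -> one output pair, so nothing needs flattening.
  val pairs: List[(Int, String)] = lines.map(parseNames)
}
```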

flatMap is for a different use case: it converts each element of the original collection into another collection and then flattens all the resulting collections into a single one. For example,

sc.parallelize(List(1, 2, 3)).flatMap{ x => List(x, x*x, x*x*x) }

would first produce a TraversableOnce for each x:

List(1,1,1)
List(2,4,8)
List(3,9,27)

and then glue them all together, so that you would obtain an RDD with entries

1,1,1,2,4,8,3,9,27
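The same computation on a plain Scala List (no Spark needed) contrasts the two steps: map keeps one inner list per element, while flatMap produces the same inner lists and then concatenates them. The object name is hypothetical.

```scala
object FlatMapDemo {
  val xs = List(1, 2, 3)

  // map keeps the inner lists: List(List(1,1,1), List(2,4,8), List(3,9,27))
  val nested: List[List[Int]] = xs.map(x => List(x, x * x, x * x * x))

  // flatMap builds the same inner lists, then glues them together.
  val flat: List[Int] = xs.flatMap(x => List(x, x * x, x * x * x))
}
```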

It works with Option in the same way, because "morally" an Option is something like a list with 0 or 1 elements, even though its inheritance hierarchy doesn't say so explicitly.
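A small sketch of this "list with 0 or 1 elements" view; halfIfEven is a hypothetical helper. flatMap over an Option-returning function gives the same result as over a function that returns a list of length 0 or 1: each Some contributes one element and each None contributes nothing.

```scala
object OptionAsListDemo {
  // Hypothetical helper: halve even numbers, drop odd ones.
  def halfIfEven(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None

  val xs = List(1, 2, 3, 4)

  // Option result: None for 1 and 3, Some(1) and Some(2) for 2 and 4.
  val viaOption: List[Int] = xs.flatMap(halfIfEven)

  // Equivalent with an explicit 0-or-1 element List.
  val viaList: List[Int] = xs.flatMap(n => if (n % 2 == 0) List(n / 2) else Nil)
}
```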


A note on the phrase "should not compile": whenever I write that some code (yours or anyone else's) "should not compile", I don't mean that I wish you had compile errors in your code. I mean that if there is a problem in the code, the compiler should report it with a clear error message as early as possible.



Answer 2:

def parseNames(line: String): Option[(Int, String)] = {
  val fields = line.split('\"')
  if (fields.length > 1) {
    Some((fields(0).trim().toInt, fields(1)))
  }
  else {
    None
  }
}

(I removed the noisy "return" statements.)

So when is None returned? When fields.length is not greater than 1. Your second version has no such check: if a line has fewer than 2 fields (fields(0) and fields(1)), fields(0).trim().toInt might succeed, but fields(1) will fail with an ArrayIndexOutOfBoundsException.
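A minimal sketch contrasting the two versions on a hypothetical malformed line "42" (a number but no quoted name), with a List in place of the RDD. The object name and inputs are hypothetical.

```scala
import scala.util.Try

object SafeParseDemo {
  // Option-returning version: lines with fewer than 2 fields become None.
  def parseNames(line: String): Option[(Int, String)] = {
    val fields = line.split('"')
    if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
  }

  // Pair-returning version: no length check.
  def parseNamesUnsafe(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  val lines = List("1 \"24-HOUR MAN/EMMANUEL\"", "42")

  // flatMap silently drops the line without a quoted name.
  val safe: List[(Int, String)] = lines.flatMap(parseNames)

  // "42" parses as an Int, but fields(1) then throws ArrayIndexOutOfBoundsException.
  val unsafe: Try[List[(Int, String)]] = Try(lines.map(parseNamesUnsafe))
}
```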



Answer 3:

flatMap requires the function it calls to return an iterable collection, because flatMap iterates through each returned collection and flattens its elements into the result.

Your first parseNames function returns an Option[(Int, String)], which is a container that can act like an iterable thanks to an implicit conversion. So flatMap worked.

But your second parseNames returns a Tuple2[Int, String], which is not iterable: a Tuple2 cannot be iterated, and its elements are accessed with _1 and _2. So flatMap gave you a compilation error.

I hope the explanation is clear.

The second parseNames would have worked if it returned an Array:

def parseNames(line: String): Array[(Int, String)] = {
  val fields = line.split('\"')
  Array((fields(0).trim().toInt, fields(1)))
}

or a List:

def parseNames(line: String): List[(Int, String)] = {
  val fields = line.split('\"')
  List((fields(0).trim().toInt, fields(1)))
}

or a Seq:

def parseNames(line: String): Seq[(Int, String)] = {
  val fields = line.split('\"')
  Seq((fields(0).trim().toInt, fields(1)))
}

This works because List and Seq are Iterable, and Array, like Option, is implicitly converted to one.
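A minimal sketch checking that all four return types flatten to the same result, with a List standing in for the RDD. The helper names (pair, asArray, asList, asSeq, asOption) and the sample lines are hypothetical; asOption simply wraps the pair in Some, so it never drops a line here.

```scala
object ContainerReturnDemo {
  // Hypothetical shared helper: split off the id and the quoted name.
  private def pair(line: String): (Int, String) = {
    val fields = line.split('"')
    (fields(0).trim.toInt, fields(1))
  }

  def asArray(line: String): Array[(Int, String)] = Array(pair(line))
  def asList(line: String): List[(Int, String)] = List(pair(line))
  def asSeq(line: String): Seq[(Int, String)] = Seq(pair(line))
  def asOption(line: String): Option[(Int, String)] = Some(pair(line))

  // Hypothetical, well-formed input lines.
  val lines = List("1 \"24-HOUR MAN/EMMANUEL\"", "2 \"3-D MAN/CHARLES CHAN\"")

  // Array and Option reach flatMap via implicit conversions; List and Seq directly.
  val results: List[List[(Int, String)]] = List(
    lines.flatMap(asArray),
    lines.flatMap(asList),
    lines.flatMap(asSeq),
    lines.flatMap(asOption)
  )
}
```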