Calling map on a parallel collection via a referen

Posted 2019-02-20 18:01

Question:

I tried to make it optional to run a map operation sequentially or in parallel, for example using the following code:

val runParallel = true
val theList = List(1,2,3,4,5)
(if(runParallel) theList.par else theList) map println //Doesn't run in parallel

What I noticed is that the map operation does not run in parallel as I had expected, although without the conditional it does:

theList.par map println   //Runs in parallel as visible in the output

The type of the expression (if(runParallel) theList else theList.par), which I expect to be the closest common ancestor of the types of theList and theList.par, is a scary type that I won't paste here, but it's interesting to look at (via the Scala console):

:type (if(true) theList else theList.par)

Why doesn't the map on the parallel collection work in parallel?

UPDATE: This is discussed in SI-4843 but from the JIRA ticket it's not clear why this was happening on Scala 2.9.x.

Answer 1:

The explanation of why this happens is a long story: in Scala 2.9.x (I don't know about other versions), collection methods such as filter or map rely on the CanBuildFrom mechanism. The idea is that an implicit parameter is used to create a builder for the new collection:

def map[B, That](f: A => B)(implicit bf: CanBuildFrom[Repr, B, That]): That = {
    val b = bf(repr)
    b.sizeHint(this) 
    for (x <- this) b += f(x)
    b.result
  }
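To make the builder loop concrete, here is a minimal sketch of the same idea with the builder passed explicitly instead of being obtained from an implicit CanBuildFrom. The names BuilderSketch and mapInto are hypothetical and are not part of the standard library; only scala.collection.mutable.Builder is.

```scala
import scala.collection.mutable.Builder

object BuilderSketch {
  // Hypothetical helper: maps xs with f and lets the supplied builder
  // decide what kind of collection comes out.
  def mapInto[A, B, That](xs: Iterable[A], b: Builder[B, That])(f: A => B): That = {
    for (x <- xs) b += f(x)
    b.result()
  }

  def main(args: Array[String]): Unit = {
    // Same source list, but the result type follows the builder.
    val asVector: Vector[String] = mapInto(List(1, 2, 3), Vector.newBuilder[String])(_.toString)
    val asSet: Set[Int] = mapInto(List(1, 2, 2), Set.newBuilder[Int])(_ * 10)
    println(asVector)
    println(asSet)
  }
}
```

In the real map the caller does not pass the builder; the implicit CanBuildFrom produces it, so whichever instance the compiler resolves decides what gets built.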

Thanks to this mechanism, the map method is defined only in the TraversableLike trait and its subclasses do not need to override it. As you can see, the signature of map has several type parameters. Let's look at the trivial ones:

  • B is the type of the elements of the new collection
  • A is the type of the elements in the source collection

Let's look at the more complicated ones:

  • That is the type of the new collection, which can differ from the type of the current one. A classic example is mapping a BitSet with toString:

     scala> import scala.collection.immutable.BitSet
     import scala.collection.immutable.BitSet
     scala> val a = BitSet(1,3,5)
     a: scala.collection.immutable.BitSet = BitSet(1, 3, 5)
     scala> a.map {_.toString}
     res2: scala.collection.immutable.Set[java.lang.String] = Set(1, 3, 5)
    

Since a BitSet can only contain integers (there is no BitSet[String]), the result of the map is a Set[String].

  • Repr is the type of the current collection.

When you call map on a collection, the compiler resolves a suitable CanBuildFrom from these type parameters.
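The effect of That can be seen by annotating the expected result types, as in the following small sketch. The object name ResultTypeDemo is only illustrative.

```scala
import scala.collection.immutable.BitSet

object ResultTypeDemo {
  val bits = BitSet(1, 3, 5)

  // Int => Int: the elements still fit in a BitSet, so That can be BitSet.
  val shifted: BitSet = bits.map(_ + 1)

  // Int => String: strings cannot live in a BitSet, so That is a Set of String.
  val labels: Set[String] = bits.map(_.toString)

  def main(args: Array[String]): Unit = {
    println(shifted)
    println(labels)
  }
}
```

Both calls are written the same way; only the element type of the function changes which result type the compiler selects.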

As one would expect, the map method is overridden for parallel collections in ParIterableLike as follows:

 def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
    executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result })
  } otherwise seq.map(f)(bf2seq(bf))

As you can see, the method has the same signature but uses a different approach: it tests whether the provided CanBuildFrom is a parallel one, and otherwise falls back on the default sequential implementation. Therefore, Scala parallel collections use special (parallel) CanBuildFrom instances, which create parallel builders for the map method.

However, what happens when you do

(if(runParallel) theList.par else theList) map println //Doesn't run in parallel

is that the map method is executed on the result of

  (if(runParallel) theList.par else theList) 

whose static type is the closest common ancestor of both classes (in this case just a number of traits mixed together). Since it is a common ancestor, its type parameter Repr will be some common ancestor of the representations of both collections; let's call it Repr1.
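The widening to a common ancestor can be reproduced with any two collection types. The sketch below uses List and Vector as stand-ins for theList and theList.par so that it does not depend on parallel collections being available; the object name CommonSupertype is illustrative.

```scala
object CommonSupertype {
  val flag = true
  val asList: List[Int] = List(1, 2, 3)
  val asVector: Vector[Int] = Vector(1, 2, 3)

  // No annotation: the compiler infers a common supertype of both branches.
  val either = if (flag) asVector else asList

  // Compiles, because the inferred type conforms to Seq[Int].
  val widened: Seq[Int] = either

  // Would not compile: the static type of `either` is not Vector[Int].
  // val narrow: Vector[Int] = either

  // At runtime the value is still a Vector; only the static type was widened.
  val isVectorAtRuntime: Boolean = either.isInstanceOf[Vector[_]]

  def main(args: Array[String]): Unit = println(isVectorAtRuntime)
}
```

The runtime value keeps its concrete class, but everything the compiler decides, including which implicit to pass, is based on the widened static type.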


Conclusion

When you call the map method, the compiler has to find a suitable CanBuildFrom[Repr, B, That] for the operation. Since our Repr1 is not the Repr of a parallel collection, there is no CanBuildFrom[Repr1, B, That] capable of providing a parallel builder. This is actually the correct behaviour with respect to the implementation of Scala collections: if it were different, every map on a non-parallel collection would run in parallel as well.
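The following toy model sketches this resolution step without using the real collections library. Coll, SeqColl, ParColl, Factory and mapMode are all hypothetical names: Factory plays the role of CanBuildFrom, and the explicit annotation on chosen stands in for the common ancestor type that the compiler would infer.

```scala
object StaticDispatchSketch {
  // Hypothetical stand-ins for a sequential and a parallel collection.
  sealed trait Coll { def items: List[Int] }
  final class SeqColl(val items: List[Int]) extends Coll
  final class ParColl(val items: List[Int]) extends Coll

  // Toy counterpart of CanBuildFrom, looked up implicitly by the static type Repr.
  trait Factory[Repr] { def isParallel: Boolean }

  object Factory {
    implicit val forColl: Factory[Coll] = new Factory[Coll] { val isParallel = false }
    implicit val forSeq: Factory[SeqColl] = new Factory[SeqColl] { val isParallel = false }
    // The only parallel-capable instance, found only when Repr is exactly ParColl.
    implicit val forPar: Factory[ParColl] = new Factory[ParColl] { val isParallel = true }
  }

  // Reports which kind of factory the compiler picked for the receiver.
  def mapMode[Repr <: Coll](c: Repr)(implicit f: Factory[Repr]): String =
    if (f.isParallel) "parallel" else "sequential"

  val runParallel = true
  val par = new ParColl(List(1, 2, 3))
  val seq = new SeqColl(List(1, 2, 3))

  // The runtime value is a ParColl, but the static type is the common ancestor.
  val chosen: Coll = if (runParallel) par else seq

  def main(args: Array[String]): Unit = {
    println(mapMode(par))    // resolved with Factory[ParColl]
    println(mapMode(chosen)) // resolved with Factory[Coll]
  }
}
```

Even though chosen holds a ParColl at runtime, the factory is selected at compile time from the static type Coll, so the sequential instance is used.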

The point here is that, given how Scala collections are designed in 2.9.x, there is no alternative: if the compiler does not provide a CanBuildFrom for a parallel collection, the map won't be parallel.
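One way to act on this, sketched below, is to branch on the whole call rather than on the receiver, so that the receiver in each branch keeps its own static type. This is an assumption-laden sketch, not something taken from the ticket: it assumes a Scala version where .par is available on List without extra imports (newer versions ship parallel collections as a separate module that needs its own import), and the names BranchOnCall and mapMaybeParallel are hypothetical.

```scala
object BranchOnCall {
  // Hypothetical helper: the condition selects between two complete calls,
  // so the compiler resolves the implicit separately for each receiver type.
  def mapMaybeParallel[A, B](xs: List[A], runParallel: Boolean)(f: A => B): List[B] =
    if (runParallel) xs.par.map(f).toList // receiver is statically a parallel collection
    else xs.map(f)                        // receiver is statically a List

  def main(args: Array[String]): Unit = {
    val theList = List(1, 2, 3, 4, 5)
    println(mapMaybeParallel(theList, runParallel = true)(_ * 2))
    println(mapMaybeParallel(theList, runParallel = false)(_ * 2))
  }
}
```

The resulting list keeps the element order in both branches, but side effects inside f (such as println) may be interleaved in any order when the parallel branch is taken.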