Scala ParArray Sorting

Posted 2019-04-13 12:50

How can I sort, in ascending order, a ParArray collection such as

ParArray(1,3,2)

or, alternatively, which parallel collection would be more suitable for this purpose?

Update

How can I implement a parallel sorting algorithm on ParArray that may prove more efficient than converting to a non-parallel collection and sorting sequentially?

5 answers
成全新的幸福
#2 · 2019-04-13 13:29

If your data fits in memory, a single-threaded in-memory sort is usually fast enough. If you need to load a lot of data from disk or HDFS, you can do the sort on a distributed system like Hadoop or Spark.

够拽才男人
#3 · 2019-04-13 13:34
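import scala.collection.immutable.TreeSet
import scala.collection.parallel.ParIterable

// Note: a TreeSet keeps one copy of each element, so duplicates are dropped.
def parallelSort[A : Ordering](seq: ParIterable[A]): TreeSet[A] = {
  seq.aggregate[TreeSet[A]](TreeSet.empty[A])(
    (set, a) => set + a,           // add an element to this partition's set
    (set1, set2) => set1 ++ set2)  // combine the sets of two partitions
}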
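One caveat with the version above: a TreeSet stores each element only once, so ParArray(3, 1, 3) would come back as just 1 and 3. A minimal sketch of a duplicate-preserving variant, keeping the same aggregate pattern but counting occurrences in a TreeMap instead. The object and method names are hypothetical, and it assumes Scala 2.12, or 2.13+ with the scala-parallel-collections module on the classpath.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.parallel.ParIterable

// Assumes Scala 2.12, or 2.13+ with scala-parallel-collections.
object CountingParallelSort { // hypothetical helper name
  // Each partition counts occurrences in a TreeMap (a sorted multiset), and
  // partial maps are combined by adding up the counts.
  def parallelSortKeepingDuplicates[A: Ordering](seq: ParIterable[A]): Vector[A] = {
    val counts = seq.aggregate(TreeMap.empty[A, Int])(
      (m, a) => m.updated(a, m.getOrElse(a, 0) + 1),
      (m1, m2) => m2.foldLeft(m1) { case (acc, (a, n)) =>
        acc.updated(a, acc.getOrElse(a, 0) + n)
      }
    )
    // TreeMap iterates in key order, so expanding the counts gives a sorted result.
    counts.iterator.flatMap { case (a, n) => Iterator.fill(n)(a) }.toVector
  }
}
```

Whether this beats a plain sequential sort would need measuring; every TreeMap update allocates, so it may well be slower.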
一夜七次
#4 · 2019-04-13 13:39

How can I implement a parallel sorting algorithm on ParArray that may prove more efficient than converting to a non-parallel collection and sorting sequentially?

My first observation would be that there doesn't seem to be much of a performance penalty for "converting" parallel arrays to sequential ones and back:

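import scala.util.Random

object Runner {
  // Prints the wall-clock time taken to evaluate `block` and returns its result
  def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    val diff: Long = t1 - t0
    println(s"Elapsed time: ${diff * 1.0 / 1E9}s")
    result
  }

  def main(args: Array[String]): Unit = {
    val size: Int = args.headOption.map(_.toInt).getOrElse(1000000)
    val input = Array.fill(size)(Random.nextInt())
    val arrayCopy: Array[Int] = Array.ofDim(size)
    input.copyToArray(arrayCopy)
    time { input.sorted }  // plain sequential sort
    // .par is built in on Scala 2.12; on 2.13+ it needs scala-parallel-collections
    // and import scala.collection.parallel.CollectionConverters._
    val parArray = arrayCopy.par
    val result = time { parArray.seq.sorted.toArray.par }  // sequential round trip
  }
}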

gives

> run 1000000
[info] Running Runner 1000000
Elapsed time: 0.344659236s
Elapsed time: 0.321363896s

For all array sizes I tested, the results were very similar and usually slightly in favor of the second expression. So if you were worried that converting to a sequential collection and back would kill the performance gains you achieved with other operations, I don't think you should be.

When it comes to using Scala's parallel collections to achieve parallel sorting that would, in some cases, perform better than the default, I don't think there's an obvious good way of doing it, but it wouldn't hurt to try:

What I thought should work would be splitting the input array into as many subarrays as there are cores on your machine (preferably without any unnecessary copying) and sorting the parts concurrently. Afterwards, one could merge the parts together (as in merge sort). Here's what the code might look like:

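val maxThreads = 8 // for simplicity we're not configuring the thread pool explicitly
val groupSize: Int = size / maxThreads + 1
// clamp both bounds so no range runs past the end of the array
val ranges: IndexedSeq[(Int, Int)] = (0 until maxThreads).map { i =>
  (math.min(i * groupSize, size), math.min((i + 1) * groupSize, size))
}
time {
  // parallelizing sorting for each range
  ranges.par.foreach { case (from, to) =>
    // sortWith does not sort `input` in place: it produces a separate (possibly
    // lazy) sorted result, which is discarded here
    input.view(from, to).sortWith(_ < _)
  }
  // TODO merge the parts together
}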
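For the TODO step, a minimal sequential sketch of merging the sorted parts. It assumes the ranges are contiguous, cover the whole array in order, and are each already sorted, and it repeatedly merges neighbouring runs pairwise (as in bottom-up merge sort), alternating between two buffers. MergeRuns and mergeAll are hypothetical names, not part of any library.

```scala
object MergeRuns { // hypothetical helper name
  // Merges the adjacent sorted runs src[lo, mid) and src[mid, hi) into dst[lo, hi).
  private def merge(src: Array[Int], dst: Array[Int], lo: Int, mid: Int, hi: Int): Unit = {
    var i = lo
    var j = mid
    var k = lo
    while (k < hi) {
      // take from the left run if the right run is exhausted or the left head is <= the right head
      if (j >= hi || (i < mid && src(i) <= src(j))) { dst(k) = src(i); i += 1 }
      else { dst(k) = src(j); j += 1 }
      k += 1
    }
  }

  // Assumes `runs` are contiguous, cover `a` in order and are each sorted.
  // Returns a sorted copy; `a` itself is not modified.
  def mergeAll(a: Array[Int], runs: Seq[(Int, Int)]): Array[Int] = {
    var src = a.clone()
    var dst = new Array[Int](a.length)
    var bounds = runs.toVector
    while (bounds.size > 1) {
      // one bottom-up pass: merge runs 0+1, 2+3, ...; an odd last run is copied as is
      bounds = bounds.grouped(2).map {
        case Vector((lo, mid), (_, hi)) => merge(src, dst, lo, mid, hi); (lo, hi)
        case Vector((lo, hi))           => Array.copy(src, lo, dst, lo, hi - lo); (lo, hi)
      }.toVector
      val tmp = src; src = dst; dst = tmp // the merged pass becomes the new source
    }
    src
  }
}
```

With maxThreads runs, this takes about log2(maxThreads) passes over the array; each pass could itself be parallelized, but that is left out here.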

Unfortunately, there's an old bug that prevents us from doing anything fun with views. There doesn't seem to be any built-in Scala mechanism (other than views) for sorting just part of a collection. This is why I tried coding my own merge sort with the signature def mergeSort(a: Array[Int], r: Range): Unit, to use it as described above. Unfortunately, it turned out to be more than 4 times slower than Scala's Array.sorted, so I don't think it could be used to gain efficiency over the standard sequential approach.
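Outside the Scala library, the JDK does offer a way to sort just part of an array in place: java.util.Arrays.sort(int[] a, int fromIndex, int toIndex), with toIndex exclusive. A minimal sketch of the chunk-sorting step on top of it, using scala.concurrent Futures on the global execution context instead of .par so that it needs no extra module. ChunkSort and sortChunks are hypothetical names. Each chunk ends up sorted in place; the chunks still have to be merged afterwards.

```scala
import java.util.Arrays
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ChunkSort { // hypothetical helper name
  // Splits `a` into at most `parts` contiguous ranges and sorts each range in
  // place, concurrently. Returns the (from, until) bounds of the sorted runs;
  // the runs still need to be merged to make the whole array sorted.
  def sortChunks(a: Array[Int], parts: Int): IndexedSeq[(Int, Int)] = {
    require(parts > 0, "parts must be positive")
    val groupSize = math.max(1, (a.length + parts - 1) / parts) // ceiling division
    val ranges = (0 until a.length by groupSize).map { from =>
      (from, math.min(from + groupSize, a.length))
    }
    // The ranges are disjoint, so the tasks never write to the same index.
    val jobs = ranges.map { case (from, until) =>
      Future(Arrays.sort(a, from, until)) // JDK: sorts a(from) .. a(until - 1) in place
    }
    Await.result(Future.sequence(jobs), Duration.Inf)
    ranges
  }
}
```

Because every task works on the original array, no subarray copies are made, which is what the view-based attempt was aiming for.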

If I understand your situation correctly, your dataset fits in memory, so using something like Hadoop and MapReduce would be premature. What you might try, though, is Apache Spark: other than adding a dependency, you wouldn't need to set up a cluster or install anything for Spark to use all the cores of your machine in a basic configuration. Its RDDs are conceptually similar to Scala's parallel collections, but with additional functionality, and they (in a way) support parallel sorting, for example through sortBy.

你好瞎i
#5 · 2019-04-13 13:39

There are no parallel sorting algorithms available in the Scala standard library. For this reason, the parallel collections don't provide sorted, sortBy, or sortWith methods. You will have to convert to an appropriate sequential class (e.g. with toArray) before sorting.
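A minimal sketch of that conversion for the ParArray(1,3,2) example from the question, generic over any element type that has an Ordering. It assumes Scala 2.12, or 2.13+ with the scala-parallel-collections module, and uses ParArray.handoff to turn the sorted array back into a ParArray. The helper name sortedCopy is hypothetical.

```scala
import scala.collection.parallel.mutable.ParArray
import scala.reflect.ClassTag

// Assumes Scala 2.12, or 2.13+ with scala-parallel-collections.
object ConvertAndSort { // hypothetical helper name
  // ParArray has no sorted/sortBy/sortWith, so sort a sequential Array copy
  // and then wrap the result as a ParArray again.
  def sortedCopy[T: Ordering: ClassTag](p: ParArray[T]): ParArray[T] = {
    val sorted: Array[T] = p.toArray.sorted
    ParArray.handoff(sorted)
  }
}
```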

SAY GOODBYE
#6 · 2019-04-13 13:47

If you build your Scala project against Java 8 or later, you can use the new java.util.Arrays.parallelSort:

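import java.util.Arrays
import scala.collection.parallel.mutable.ParArray
import scala.reflect.ClassTag
def sort[T <: Comparable[T]](parArray: ParArray[T])(implicit c: ClassTag[T]): ParArray[T] = {
   // Copy into a fresh array: sorting ParArray's backing storage in place would skip the copy,
   // but it would mutate the input, and that storage is not necessarily an Array[T].
   val array = new Array[T](parArray.size)
   parArray.copyToArray(array)
   Arrays.parallelSort(array)
   ParArray.createFromCopy(array)
}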
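Scala's Int is not a java.lang.Comparable, so a Comparable-bounded signature like the one above cannot accept the ParArray(1,3,2) from the question. A minimal sketch for Int elements, assuming the same Java 8+ setup plus Scala 2.12 (or 2.13+ with scala-parallel-collections), which uses the primitive parallelSort(int[]) overload on an unboxed copy. The name sortInts is hypothetical.

```scala
import java.util.Arrays
import scala.collection.parallel.mutable.ParArray

// Assumes Java 8+ and Scala 2.12, or 2.13+ with scala-parallel-collections.
object ParallelSortInts { // hypothetical helper name
  // Int is not a java.lang.Comparable, so use the primitive parallelSort(int[])
  // overload on an unboxed copy; the input ParArray is left unchanged.
  def sortInts(parArray: ParArray[Int]): ParArray[Int] = {
    val array: Array[Int] = parArray.toArray // fresh Array[Int] copy
    Arrays.parallelSort(array)               // sorts the copy in place
    ParArray.handoff(array)                  // wrap the sorted array as a ParArray
  }
}
```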