Using ThreadPoolTaskSupport as tasksupport for parallel collections in Scala

Posted 2019-10-29 05:52

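I have a List of 50-500 IO-bound tasks, each of which takes roughly 3 to 20 minutes to run. The list is computed in advance, and no new tasks are added recursively. I want to run them on a pool of fixed size (say 4). The tasks do not depend on one another.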

So I think this fits the ThreadPoolTaskSupport use case better than the ForkJoinTaskSupport one. But ThreadPoolTaskSupport is deprecated, and ForkJoinTaskSupport is the only recommended replacement.

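I tried ForkJoinTaskSupport, but for roughly the last 8 tasks the threads that were started seem to drop out, so the last 3-4 tasks run on a single thread, adding about an hour to the total run time.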
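To put the tail effect in numbers, here is a minimal sketch (not from the original post) that computes the makespan of idealized greedy list scheduling: tasks are taken in order and each starts on whichever worker frees up first, so no thread idles while work is still pending. The names `Makespan` and `greedyMakespan` are hypothetical, and durations are plain `Long` time units. For the 65 one-second tasks in the test program on 6 threads, this ideal bound is 11 s; wall time above it comes from threads sitting idle near the end.

```scala
import scala.collection.mutable

object Makespan {
  // Idealized greedy list scheduling: tasks are taken in list order and each one
  // starts on the worker that becomes free first, so no worker idles while work is queued.
  def greedyMakespan(durations: Seq[Long], workers: Int): Long = {
    require(workers > 0, "need at least one worker")
    // Min-heap (reversed ordering) of the times at which each worker becomes free.
    val freeAt = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)
    for (_ <- 1 to workers) freeAt.enqueue(0L)
    for (d <- durations) {
      val start = freeAt.dequeue() // the earliest-free worker takes the next task
      freeAt.enqueue(start + d)
    }
    freeAt.max // the last worker to finish determines the total wall time
  }

  def main(args: Array[String]): Unit = {
    // 65 one-second tasks on 6 threads, as in the test program in the question
    println(greedyMakespan(Seq.fill(65)(1L), 6)) // prints 11
  }
}
```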

Is there a way to work around this behavior with ForkJoinTaskSupport, or should I use a fixed-size ThreadPoolTaskSupport despite its deprecation?
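One non-deprecated option that sidesteps parallel collections entirely is to wrap each task in a `scala.concurrent.Future` running on an `ExecutionContext` backed by a fixed-size pool. This is a sketch under the assumption that the tasks are independent thunks; the names `FixedPoolFutures` and `runAll` are hypothetical. Because every task is queued individually on the pool's shared queue, an idle thread always picks up the next pending task rather than waiting on a pre-split chunk.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FixedPoolFutures {
  // Runs every task on a fixed pool of `poolSize` threads; results come back in input order.
  def runAll[A](tasks: Seq[() => A], poolSize: Int): Seq[A] = {
    val executor = Executors.newFixedThreadPool(poolSize)
    // ExecutionContext backed by the fixed pool; no parallel-collection splitting is involved.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    try {
      // One Future per task, each queued separately on the pool's work queue.
      val futures = tasks.map(t => Future(t()))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally executor.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical workload: 8 independent blocking tasks on 4 threads
    val tasks: Seq[() => String] = (1 to 8).map(i => () => {
      Thread.sleep(500)
      s"task $i on ${Thread.currentThread().getName}"
    })
    runAll(tasks, 4).foreach(println)
  }
}
```

A dedicated pool like this also keeps long blocking IO calls off the global execution context.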

Code to test this:

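// Imports assume Scala 2.12. On 2.13+, .par also needs the scala-parallel-collections
// module and: import scala.collection.parallel.CollectionConverters._
import java.util.concurrent.ForkJoinPool
import scala.collection.mutable.ListBuffer
import scala.collection.parallel.ForkJoinTaskSupport

object ThreadLibTest {
  def main(arr: Array[String]): Unit = {
    // 65 independent tasks, each simulating one second of blocking work
    val tasks = ListBuffer.empty[() => Unit]
    for (i <- 1 to 65) {
      tasks += { () =>
        Thread.sleep(1000)
//        println("finishing " + i + " id " + Thread.currentThread().getName)
      }
    }

    // Run them as a parallel collection on a fork-join pool with parallelism 6
    val ptasks = tasks.par
    val fjp = new ForkJoinPool(6)
    ptasks.tasksupport = new ForkJoinTaskSupport(fjp)
    ptasks.map(x => { logfjp(fjp); x.apply() })
  }

  // Prints a snapshot of the pool's state just before each task runs
  def logfjp(pool: ForkJoinPool): Unit = {
    println(
      " activeThreads=" + pool.getActiveThreadCount() +
      " runningThreads=" + pool.getRunningThreadCount() +
      " poolSize=" + pool.getPoolSize() +
      " queuedTasks=" + pool.getQueuedTaskCount() +
      " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
      " parallelism=" + pool.getParallelism() +
      " stealCount=" + pool.getStealCount())
  }
}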
Last few lines of output:
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=1 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=1 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=4 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=0
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
 activeThreads=1 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3

EDIT: With a fixed-size ThreadPoolExecutor as the taskSupport, the same problem occurs: only 2 of the 6 threads execute the last few tasks.

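Submitting the tasks explicitly to a ThreadPoolExecutor gives the correct result: all threads stay active until every task has finished. I still need to figure out why setting the taskSupport does not work. Submitting the tasks explicitly to a ForkJoinPool or a ThreadPoolExecutor finishes in 11 seconds, while tasks.par with map takes 17 seconds. Perhaps this is due to how shouldSplitFurther is defined in IterableSplitter.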
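A minimal sketch of the explicit submission described in the edit, assuming a pool from `Executors.newFixedThreadPool` (which is backed by a `ThreadPoolExecutor`) and `ExecutorService.invokeAll`. The post does not show the exact code used, so the names `ExplicitSubmit` and `runAll` are hypothetical. Each task is its own `Callable` on the executor's shared queue, so no up-front splitting decides which thread runs which tasks.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.mutable.ListBuffer

object ExplicitSubmit {
  // Submits every task as its own Callable to a fixed-size pool; results come back in input order.
  def runAll[A](tasks: Seq[() => A], poolSize: Int): List[A] = {
    // newFixedThreadPool returns a ThreadPoolExecutor with an unbounded shared work queue
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
      val callables = new java.util.ArrayList[Callable[A]]()
      tasks.foreach(t => callables.add(new Callable[A] { def call(): A = t() }))
      // invokeAll blocks until every task has completed; futures are in submission order
      val futures = pool.invokeAll(callables)
      val results = ListBuffer.empty[A]
      val it = futures.iterator()
      // get() returns the task's result, or throws ExecutionException if that task failed
      while (it.hasNext) results += it.next().get()
      results.toList
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the question's test: 65 one-second tasks on 6 threads
    val tasks = Seq.fill(65)(() => Thread.sleep(1000))
    val start = System.nanoTime()
    runAll(tasks, 6)
    println(s"elapsed: ${(System.nanoTime() - start) / 1000000} ms")
  }
}
```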

Source: Using ThreadPoolTaskSupport as tasksupport for parallel collections in scala