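I have a List of 50-500 IO-intensive tasks, each of which can run for anywhere from about 3 minutes to 20 minutes. The list is computed beforehand, and no new tasks are added recursively. I want to run it with a fixed pool size (say 4). The tasks do not depend on each other.
So I thought this fits the ThreadPoolTaskSupport use case better than ForkJoinTaskSupport. But ThreadPoolTaskSupport has been deprecated, and ForkJoinTaskSupport is the only recommended alternative.
I tried ForkJoinTaskSupport, but it seems that with the last 8 or so tasks left it starts killing threads, so the last 3-4 tasks run on a single thread, adding 1 hour to the total run time.
Is there a way to fix this behavior in ForkJoinTaskSupport, or should I use ThreadPoolTaskSupport with a fixed size even though it is deprecated?
Code to test this: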
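import java.util.concurrent.ForkJoinPool // on Scala 2.11: scala.concurrent.forkjoin.ForkJoinPool
import scala.collection.mutable.ListBuffer
import scala.collection.parallel.ForkJoinTaskSupport
// Scala 2.13+ also needs the scala-parallel-collections module and:
// import scala.collection.parallel.CollectionConverters._

object ThreadLibTest {
  def main(arr: Array[String]): Unit = {
    // 65 independent tasks, each simulating 1 second of blocking IO
    val tasks = ListBuffer.empty[() => Unit]
    for (i <- 1 to 65) {
      tasks += {() => {
        Thread.sleep(1000)
        // println("finishing " + i + " id " + Thread.currentThread().getName)
      }
      }
    }
    val ptasks = tasks.par
    val fjp = new ForkJoinPool(6)
    ptasks.tasksupport = new ForkJoinTaskSupport(fjp)
    // Log the pool state right before each task runs
    ptasks.map(x => {logfjp(fjp); x.apply()})
  }

  def logfjp(pool: ForkJoinPool): Unit = {
    println(
      " activeThreads=" + pool.getActiveThreadCount() +
      " runningThreads=" + pool.getRunningThreadCount() +
      " poolSize=" + pool.getPoolSize() +
      " queuedTasks=" + pool.getQueuedTaskCount() +
      " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
      " parallelism=" + pool.getParallelism() +
      " stealCount=" + pool.getStealCount())
  }
}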
Last few outputs.
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=1 poolSize=6 queuedTasks=2 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=6 runningThreads=2 poolSize=6 queuedTasks=1 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=4 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=0
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=2 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
activeThreads=1 runningThreads=1 poolSize=6 queuedTasks=0 queuedSubmissions=0 parallelism=6 stealCount=3
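Edit: Using a fixed-size ThreadPoolExecutor as the taskSupport gives the same problem again: only 2 of the 6 threads execute the last few tasks.
Explicitly submitting the tasks to a ThreadPoolExecutor produces the correct result. All threads stay active until every task has finished. However, I still want to figure out why setting it through taskSupport does not work. Explicitly submitting the tasks to a fork-join pool or a thread pool executor finished in 11 seconds, while tasks.par.map takes 17 seconds. Maybe this is because of shouldSplitFurther as defined in IterableSplitter?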
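For reference, here is a minimal sketch of the explicit submission described in the edit, assuming a plain fixed pool from java.util.concurrent.Executors. The names ExplicitSubmitSketch and runAll are hypothetical and not part of the original test. With 65 one-second tasks and 6 threads, at least ceil(65 / 6) = 11 rounds are needed, which is consistent with the 11 seconds reported above.

```scala
import java.util.concurrent.Executors

// Hypothetical helper, not from the original test code.
object ExplicitSubmitSketch {

  // Submits every task individually to a fixed-size pool and waits for all
  // of them. Returns the number of tasks that completed.
  def runAll(tasks: Seq[() => Unit], poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
      // One Future per task: an idle worker takes the next task from the
      // shared queue, so no worker is stuck with a pre-assigned batch.
      val futures = tasks.map(t => pool.submit(new Runnable { def run(): Unit = t() }))
      futures.foreach(_.get()) // blocks until each task has finished
      futures.count(_.isDone)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the test above: 65 tasks of about 1 second on 6 threads.
    val tasks = Seq.fill(65)(() => Thread.sleep(1000))
    val start = System.nanoTime()
    val done = runAll(tasks, 6)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println("completed=" + done + " elapsedMs=" + elapsedMs)
  }
}
```

The difference from tasks.par is that nothing is split into batches up front here. Whether that batching is really what leaves threads idle at the end of the parallel-collection run is the open question of this post, not something the sketch proves.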