假设我需要办理两个函数f: String => A
和g: A => B
每行一个大的文本文件,最终创建的列表B
。
由于文件大和f
和g
很贵,我想作出处理并发。 我可以使用“平行集合”,并完成类似io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))
但它不执行读取文件, f
,和g
兼任。
什么是落实在这个例子中并发的最佳方式?
假设我需要办理两个函数f: String => A
和g: A => B
每行一个大的文本文件,最终创建的列表B
。
由于文件大和f
和g
很贵,我想作出处理并发。 我可以使用“平行集合”,并完成类似io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))
但它不执行读取文件, f
,和g
兼任。
什么是落实在这个例子中并发的最佳方式?
您可以使用map
上Future
:
val futures = io.Source.fromFile(fileName).getLines.map{ s => Future{ stringToA(s) }.map{ aToB } }.toIndexedSeq
val results = futures.map{ Await.result(_, 10 seconds) }
// alternatively:
val results = Await.result(Future.sequence(futures), 10 seconds)
首先,一个重要的注意事项:不要使用.par
的List
,因为它需要复制所有数据(因为List
只能顺序读取)。 取而代之的是,使用这样的Vector
,为此.par
转换,而不复制发生。
它看起来像你想的平行度的错误的方式。 这里会发生什么:
如果你有这样的文件:
0
1
2
3
4
5
6
7
8
9
和功能f
和g
:
def f(line: String) = {
println("running f(%s)".format(line))
line.toInt
}
def g(n: Int) = {
println("running g(%d)".format(n))
n + 1
}
然后,你可以这样做:
io.Source.fromFile("data.txt").getLines.toIndexedSeq[String].par.map(l => g(f(l)))
而得到的输出:
running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)
因此,即使整个g(f(l))
的操作在同一线程上发生,你可以看到, 每一行可被并行处理。 因此,许多f
和g
操作可以同时在单独的线程发生,但f
和g
对于特定线将在顺序地发生。
这是,毕竟,你应该期望,因为实际上没有办法,它可以读取行,运行方式f
,并运行g
并行。 例如,怎么会执行g
上的输出f
如果行尚未阅读?