Concurrent reading and processing of a file line by line

Posted 2019-03-30 17:59

Suppose I need to apply two functions, f: String => A and g: A => B, to each line of a large text file, eventually producing a list of B.

Since the file is large and f and g are expensive, I would like to make the processing concurrent. I can use "parallel collections" and do something like io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))), but that does not run the file reading, f, and g concurrently.

What is the best way to implement concurrency in this example?

2 answers
走好不送
Reply #2 · 2019-03-30 18:21

You can use map on Future:

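import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// One Future per line: each is submitted as soon as its line is read, so f and g overlap with reading.
val futures = io.Source.fromFile(fileName).getLines.map { s => Future { stringToA(s) }.map { aToB } }.toIndexedSeq

val results = futures.map { Await.result(_, 10.seconds) }
// alternatively, wait for all of them at once:
// val results = Await.result(Future.sequence(futures), 10.seconds)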
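To make the snippet above easier to try out, here is a minimal self-contained sketch of the same idea. The bodies of stringToA and aToB are hypothetical stand-ins for the expensive f and g (the question does not say what they do), and the object name, the temporary file, and the 10-second timeout are assumptions chosen only for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

object FuturePipelineSketch {
  // Hypothetical stand-ins for the expensive f: String => A and g: A => B.
  def stringToA(s: String): Int = s.trim.toInt
  def aToB(a: Int): Long = a.toLong * a

  // Reads the file on the calling thread and submits one Future per line
  // as soon as that line has been read; results come back in file order.
  def processFile(fileName: String): Vector[Long] = {
    val source = Source.fromFile(fileName, "UTF-8")
    try {
      // toVector forces the lazy iterator, so every line is read before close().
      val futures = source.getLines().map(s => Future(stringToA(s)).map(aToB)).toVector
      Await.result(Future.sequence(futures), 10.seconds)
    } finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample input: a small temporary file with one number per line.
    val tmp = Files.createTempFile("data", ".txt")
    Files.write(tmp, "1\n2\n3\n".getBytes(StandardCharsets.UTF_8))
    try println(processFile(tmp.toString))
    finally Files.delete(tmp)
  }
}
```

Note that this keeps one Future per line in memory until all of them complete, which may or may not be acceptable for a very large file.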
萌系小妹纸
Reply #3 · 2019-03-30 18:24

First, an important note: don't use .par on a List, because a List can only be read sequentially and the conversion therefore has to copy all the data. Instead, use something like Vector, for which the .par conversion can happen without copying.

It seems like you're thinking of the parallelism the wrong way. Here's what would happen:

If you have a file like this:

0
1
2
3
4
5
6
7
8
9

And functions f and g:

def f(line: String) = {
  println("running f(%s)".format(line))
  line.toInt
}

def g(n: Int) = {
  println("running g(%d)".format(n))
  n + 1
}

Then you can do:

io.Source.fromFile("data.txt").getLines.toIndexedSeq.par.map(l => g(f(l)))

And get output:

running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)

So even though the entire g(f(l)) operation for one line happens on a single thread, you can see that the lines are processed in parallel with each other. Many f and g operations can be running simultaneously on separate threads, but the f and g for a particular line always happen sequentially.

This is, after all, what you should expect, since there is no way to read a line, run f on it, and run g on the result all in parallel. For example, how could it execute g on the output of f if the line hasn't been read yet?
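The ordering described above can be checked with a small sketch. It uses Futures from the standard library rather than .par (a swapped-in technique, so the example does not depend on the parallel collections module), and the object name, the shared event log, and the record helper are assumptions introduced only for this illustration. The order of events across lines may differ from run to run, but f for a given line is always logged before g for that same line.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderingSketch {
  // Shared event log; guarded by synchronized because f and g run on pool threads.
  private val log = ArrayBuffer.empty[String]
  private def record(event: String): Unit = log.synchronized { log += event; () }

  // Same f and g as in the answer, but logging instead of printing.
  def f(line: String): Int = { record("f(" + line + ")"); line.toInt }
  def g(n: Int): Int = { record("g(" + n + ")"); n + 1 }

  // Runs g(f(line)) for every line on the global pool and returns
  // the results (in input order) together with the recorded events.
  def run(lines: Seq[String]): (Seq[Int], Vector[String]) = {
    log.synchronized { log.clear() }
    // g is chained onto f's Future, so it can only start once f has finished.
    val futures = lines.map(l => Future(f(l)).map(g))
    val results = Await.result(Future.sequence(futures), 10.seconds)
    (results, log.synchronized { log.toVector })
  }

  def main(args: Array[String]): Unit = {
    val (results, events) = run((0 to 9).map(_.toString))
    println(results.mkString(", "))
    events.foreach(println) // interleaving across lines varies between runs
  }
}
```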
