I have a large computation roughly based on the following pattern:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def f1(i: Int): Int = ???
def f2(i: Int): Int = ???

def processA(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f1(i)))

def processB(l: List[Int]): List[Future[Int]] = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main(): Unit = {
  val items = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.map(_.onComplete( ... ))
}
The problem I encounter, if my understanding is correct, is that the processing is breadth-first: processA starts thousands of Futures, and processB then enqueues thousands of new Futures that will only be processed after those of processA have finished. The onComplete callbacks start firing very late...
I would like to turn this depth-first: a few Futures of processA start, and then processB continues from there instead of the scheduler switching to other queued work.
Can it be done in vanilla Scala? Should I turn to some lib with an alternative to Futures and thread pools?
EDIT: a bit more detail. Rewriting into "f1 andThen f2", as has been suggested in answers, is not practicable at this time. Actually, processA and processB are doing a bunch of other things (incl. side effects). And the fact that processB relies on processA is private; it would break SoC if it were exposed.
EDIT 2: I think I'm going to relax the "vanilla" constraint a bit. Someone suggested Akka Streams, which would help. I'm currently having a look at scalaz.Task: any opinions?
I wasn't 100% sure I understood the question. Since processB (f2) runs on top of the results of processA (f1), you cannot call f2 on values which have not yet been computed by f1, so my answer is based on the assumption that you want f2 to run immediately after f1.
So here's one solution to that:
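The snippet that originally followed isn't shown here; a minimal sketch of the idea (with assumed placeholder bodies for f1 and f2) is to fuse both steps into a single Future per item, so each item is processed depth-first on one pool thread:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def f1(i: Int): Int = i + 1   // placeholder body
def f2(i: Int): Int = i * 2   // placeholder body

// One Future per item: f2 runs immediately after f1 on the same
// pool thread, instead of being re-queued as a separate task.
def process(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f2(f1(i))))
```
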
If you are willing and able to use a library which is better suited for the problem at hand, have a look at this reply.
Something like this, perhaps:
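The code that went with this sentence is missing; one sketch consistent with the surrounding text (assumed, with placeholder f1/f2) keeps the processA/processB split intact but runs the f2 step on a calling-thread ExecutionContext, so f2 fires on the thread that just completed f1 instead of being re-queued behind the remaining f1 tasks:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ExecutionContext, Future}

def f1(i: Int): Int = i + 1   // placeholder body
def f2(i: Int): Int = i * 2   // placeholder body

// An ExecutionContext that executes tasks on the calling thread.
// (On Scala 2.13+, ExecutionContext.parasitic does the same job.)
val sameThreadEc: ExecutionContext = new ExecutionContext {
  def execute(r: Runnable): Unit = r.run()
  def reportFailure(t: Throwable): Unit = t.printStackTrace()
}

def processA(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f1(i)))

def processB(l: List[Int]): List[Future[Int]] =
  processA(l).map(fut => fut.map(f2)(sameThreadEc)) // f2 piggybacks on f1's thread
```
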
Of course, this does not help very much if your f2 is significantly heavier than f1; you'll need some more explicit coordination in that case.

It is not clear from the question whether, from an asynchronous-computation point of view, there is a requirement that f1 and f2 be separated into processA and processB. If results from f1 are always and only passed to f2, this can be done more simply as part of a single computation, "f1 andThen f2", in a single Future.
If that is the case, then the problem is reduced to "how can an asynchronous computation be run on a potentially large input, limiting the number of in-flight, spawned Futures":
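The snippet that followed isn't included; a hedged sketch of that reduction (placeholder f1/f2, assumed helper name processAll) bounds the number of in-flight Futures by processing fixed-size batches sequentially, each item running "f1 andThen f2" to completion:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def f1(i: Int): Int = i + 1   // placeholder body
def f2(i: Int): Int = i * 2   // placeholder body

// At most `parallelism` Futures are in flight at a time: each batch
// is started only after the previous one has completed, and every
// item runs f1 andThen f2 inside a single Future.
def processAll(items: List[Int], parallelism: Int): Future[List[Int]] =
  items.grouped(parallelism).foldLeft(Future.successful(List.empty[Int])) {
    (acc, batch) =>
      acc.flatMap { done =>
        Future.traverse(batch)(i => Future(f2(f1(i)))).map(done ++ _)
      }
  }
```
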
Your problem is best expressed as a stream: jobs go into the stream and are processed, and backpressure ensures that only a limited amount of work is done at a time. In Akka Streams, it looks like this:
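The original Akka Streams snippet isn't included; a sketch of the shape described (assumed names and placeholder f1/f2; requires the akka-stream dependency) uses mapAsync, whose parallelism argument bounds the Futures in flight at each stage while backpressure keeps the stages in lockstep:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("jobs")
import system.dispatcher

def f1(i: Int): Int = i + 1   // placeholder body
def f2(i: Int): Int = i * 2   // placeholder body

// Each mapAsync stage runs at most `parallelism` Futures at once;
// downstream demand (backpressure) throttles upstream, so f1 cannot
// race thousands of items ahead of f2.
val resultsF: Future[Seq[Int]] =
  Source(List(1, 2, 3 /* ...thousands of items... */))
    .mapAsync(parallelism = 4)(i => Future(f1(i)))
    .mapAsync(parallelism = 4)(i => Future(f2(i)))
    .runWith(Sink.seq)
```
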
The parallelism will need to be carefully selected to match thread pool sizing, but what this will ensure is that the average rate of items going through f1 will equal the average rate of items going through f2.

What is wrong with this?