scala Future processing depth-first not breadth-first

Published 2019-06-14 14:55

Question:

I have a large computation roughly based on the following pattern :

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def f1(i: Int): Int = ???
def f2(i: Int): Int = ???

def processA(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f1(i)))

def processB(l: List[Int]): List[Future[Int]] = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main(): Unit = {
  val items = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.foreach(_.onComplete( ... ))
}

The problem I encounter, if my understanding is correct, is that the processing is breadth-first: processA starts thousands of Futures, and processB then enqueues thousands of new Futures that are only processed after those of processA have finished. As a result, the onComplete callbacks start firing very late...

I would like to turn this depth-first: a few Futures from processA start, and processB continues from there, instead of the scheduler switching to other tasks in the queue.

Can this be done in vanilla Scala? Or should I turn to a library that offers an alternative to Futures and thread pools?

EDIT: a bit more detail. Rewriting this as f1 andThen f2, as suggested in the answers, is not practicable at this time. processA and processB actually do a bunch of other things (including side effects), and the fact that processB relies on processA is private; exposing it would break separation of concerns.

EDIT 2: I think I'm going to relax the "vanilla" constraint a bit. Someone suggested Akka Streams, which would help. I'm currently having a look at scalaz.Task: does anyone have an opinion on it?

Answer 1:

I wasn't 100% sure I understood the question. Since processB (f2) runs on top of the results of processA (f1), you cannot call f2 on values which have not yet been computed by f1, so my answer is based on the assumption that:

  • You want to limit work-in-progress
  • You want to execute f2 immediately after f1

So here's one solution to that:

import scala.concurrent._
def process(noAtATime: Int, l: List[Int])(transform: Int => Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
  // define an inner async "loop" to process one chunk of numbers at a time
  def batched(i: Future[Iterator[List[Int]]], result: List[List[Int]]): Future[List[Int]] =
    i flatMap { it =>
      // if there are more chunks to process
      // we process all numbers in the chunk as parallel as possible,
      // then combine the results into a List again, then when all are done,
      // we recurse via flatMap+batched with the iterator
      // when we have no chunks left, then we un-chunk the results
      // reassemble it into the original order and return the result
      if(it.hasNext) Future.traverse(it.next)(n => Future(transform(n))).flatMap(re => batched(i, re :: result))
      else Future.successful(result.reverse.flatten) // Optimize this as needed
    }
  // Start the async "loop" over chunks of input and with an empty result
  batched(Future.successful(l.grouped(noAtATime)), List.empty)
}


scala> def f1(i: Int) = i * 2 // Dummy impl to prove it works
f1: (i: Int)Int

scala> def f2(i: Int) = i + 1 // Dummy impl to prove it works
f2: (i: Int)Int

scala> process(noAtATime = 100, (1 to 10000).toList)(n => f2(f1(n)))(ExecutionContext.global)
res0: scala.concurrent.Future[List[Int]] = Future(<not completed>)

scala> res0.foreach(println)(ExecutionContext.global)

scala> List(3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99, 101, 103, 105, 107, 109, 111, 113, 115, 117, 119 …

If you are willing and able to use a library which is better suited for the problem at hand, have a look at this reply



Answer 2:

Your problem is best expressed as a stream: jobs go into the stream and are processed, and backpressure ensures that only a limited amount of work is in flight at a time. In Akka Streams, it looks like this:

Source(items)
  .mapAsync(4)(i => Future(f1(i))) // mapAsync needs a Future-returning function; f1 returns Int
  .mapAsync(4)(i => Future(f2(i)))
  .<whatever you want to do with the result>

The parallelism will need to be carefully chosen to match the thread pool size, but what this ensures is that the average rate of elements going through f1 will equal the average rate of elements going through f2.
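A rough vanilla-Scala approximation of this pipelining (my own sketch, not part of this answer; the dummy f1/f2 and the pool size of 4 are assumptions) is to chain f2 directly onto each f1 future on a small fixed pool, so that each stage-two task is enqueued as soon as its stage-one task completes and the two stages interleave:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Dummy stand-ins for f1/f2, as in the other answers (assumptions).
def f1(i: Int): Int = i * 2
def f2(i: Int): Int = i + 1

// A 4-thread pool bounds how many tasks run at once. The .map(f2) task for an
// element is enqueued when its f1 task completes, so f2 work interleaves with
// the remaining f1 work instead of running only after all of it.
implicit val pool: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val futures = (1 to 100).toList.map(i => Future(f1(i)).map(f2))
val results = Await.result(Future.sequence(futures), 1.minute)
pool.shutdown()
```

Note this only bounds *execution* parallelism, not the number of queued tasks; Akka Streams' backpressure additionally bounds how many elements enter the pipeline at all.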



Answer 3:

What is wrong with this?

listOfInts.par.map(f1 andThen f2)


Answer 4:

It is not clear from the question whether, from an asynchronous-computation point of view, there is a requirement that f1 and f2 be separated into processA and processB. If results from f1 are always and only passed to f2, this can be done more simply as a single computation, "f1 andThen f2", in a single Future.

If that is the case, then the problem reduces to: how can an asynchronous computation be run over a potentially large input while limiting the number of in-flight, spawned Futures?

import scala.concurrent.Future
import java.util.concurrent.Semaphore
import scala.concurrent.ExecutionContext.Implicits.global

val f: (Int) => Int = i => f2(f1(i))

def process(concurrency: Int, input: List[Int], f: Int => Int): Future[List[Int]] = {
  val semaphore = new Semaphore(concurrency)
  Future.traverse(input) { i =>
    semaphore.acquire() // note: this blocks the calling thread until a permit is free
    Future(f(i)).andThen { case _ => semaphore.release() }
  }
}
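To sanity-check the sketch above end to end, it can be run against dummy implementations and compared with a plain sequential map (the f1/f2 bodies below are assumptions, mirroring the dummies used in the other answers):

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def f1(i: Int): Int = i * 2 // dummy impl (assumption)
def f2(i: Int): Int = i + 1 // dummy impl (assumption)

def process(concurrency: Int, input: List[Int], f: Int => Int): Future[List[Int]] = {
  val semaphore = new Semaphore(concurrency)
  Future.traverse(input) { i =>
    semaphore.acquire() // blocks the caller when `concurrency` Futures are in flight
    Future(f(i)).andThen { case _ => semaphore.release() }
  }
}

// At most 4 Futures in flight; Future.traverse preserves input order.
val out = Await.result(process(4, (1 to 100).toList, i => f2(f1(i))), 1.minute)
```

Because acquire() runs on the thread calling process, that thread stalls whenever all permits are taken; that is acceptable for a driver loop but worth knowing.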


Answer 5:

Something like this, perhaps:

items.foreach { it => processB(List(it)).head.onComplete(...) }

Of course, this does not help very much if your f2 is significantly heavier than f1. You'll need some more explicit coordination in that case:

val batchSize = 10
val sem = new Semaphore(batchSize)
items.foreach { it =>
  sem.acquire()            // blocks until one of the batchSize slots frees up
  processB(List(it)).head  // processB takes a List[Int] and returns List[Future[Int]]
    .andThen { case _ => sem.release() }
    .onComplete { ... }
}
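A self-contained version of this pattern can be run as below. The dummy f1/f2 and the simplified processB are assumptions standing in for the question's real code, and a CountDownLatch is added so the driver can wait for all callbacks:

```scala
import java.util.concurrent.{CountDownLatch, Semaphore}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def f1(i: Int): Int = i * 2 // dummy (assumption)
def f2(i: Int): Int = i + 1 // dummy (assumption)
def processB(l: List[Int]): List[Future[Int]] = // simplified stand-in for the question's processB
  l.map(i => Future(f1(i)).map(f2))

val items = (1 to 50).toList
val sem   = new Semaphore(10)                // at most 10 items in flight
val done  = new CountDownLatch(items.size)   // lets the driver wait for every onComplete
val sum   = new AtomicInteger(0)             // collects results thread-safely

items.foreach { it =>
  sem.acquire()                              // blocks until a slot frees up
  processB(List(it)).head
    .andThen { case _ => sem.release() }
    .onComplete { r => r.foreach(sum.addAndGet); done.countDown() }
}
done.await()
```

After done.await() returns, sum holds the total of f2(f1(i)) over all items, and the semaphore guaranteed that no more than 10 Futures were ever in flight at once.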