How to execute Futures sequentially in Scala

Posted 2020-02-02 07:39

Question:

I have a scenario where I walk through an iterator; for each item, a function f(item) is called and returns a Future[Unit].

However, the f(item) calls must execute sequentially; they cannot run in parallel.

for(item <- it)
  f(item)

won't work because this starts all the calls in parallel.

How do I do it so they follow in sequence?

Answer 1:

If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMap does the serialization):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}

In general, avoid Await operations: they block the calling thread, which defeats the point of asynchronous code, ties up resources and, in sloppy designs, can deadlock.
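
The var-based chaining above can be wrapped in a small helper. The following is only a sketch: runSequentially, demoTask and the event log are hypothetical names, not part of the original answer, and Await appears solely so that a script or REPL session does not exit before the chain finishes.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: chains the f(item) calls so that each one starts
// only after the Future returned by the previous call has completed.
def runSequentially[A](items: Iterator[A])(f: A => Future[Unit]): Future[Unit] = {
  var acc: Future[Unit] = Future.successful(())
  for (item <- items) {
    acc = acc.flatMap(_ => f(item)) // f(item) is called only once acc is done
  }
  acc
}

// Demo: record start/end events to check that the calls never overlap.
val events = ListBuffer.empty[String]
def record(e: String): Unit = events.synchronized { events += e }

def demoTask(n: Int): Future[Unit] = Future {
  record(s"start $n")
  Thread.sleep(50)
  record(s"end $n")
}

val all = runSequentially(Iterator(1, 2, 3))(demoTask)

// Blocking here is for the demo only; real code would keep composing the Future.
Await.result(all, 5.seconds)
println(events.synchronized(events.toList))
// List(start 1, end 1, start 2, end 2, start 3, end 3)
```

If any f(item) fails, flatMap skips the remaining calls and the returned Future fails with that exception.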


Cool Trick 1:

You can chain Futures together via that usual suspect, flatMap: it serializes asynchronous operations. Is there anything it can't do? ;-)

def f1 = Future { /* some background running logic here... */ }
def f2 = Future { /* other background running logic here... */ }

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

None of the above blocks: the main thread runs straight through without waiting for any of the work to finish. In all cases, Futures are used to run the work on other threads, to keep track of asynchronous state/results and to chain logic.

fSerialized represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1 (running asynchronously). f1 runs like any Future - when it eventually finishes, it calls its onComplete callback block. Here's the cool bit - flatMap installs its argument as the f1 onComplete callback block - so f2 is initiated as soon as f1 completes, with no blocking, polling or wasteful resource usage. When f2 is complete, then fSerialized is complete - so it runs the fSerialized.onComplete callback block - printing "Both Done".
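
The ordering described above depends on the second Future being created inside the function passed to flatMap, which is why f1 and f2 are defs. A minimal sketch that makes this visible (first, second and the trace log are hypothetical names used only for illustration):

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val trace = ListBuffer.empty[String]
def mark(e: String): Unit = trace.synchronized { trace += e }

// A def creates (and therefore starts) a new Future each time it is referenced.
def first: Future[Int] = Future { mark("first start"); Thread.sleep(100); mark("first end"); 1 }
def second: Future[Int] = Future { mark("second start"); 2 }

// `second` is referenced inside the function given to flatMap,
// so it is not created until `first` has completed.
val chained: Future[Int] = first.flatMap(_ => second)

Await.result(chained, 5.seconds) // demo only: wait so the trace can be printed
println(trace.synchronized(trace.toList))
// List(first start, first end, second start)
```

Had second been a val defined before the chain, it would already be running by the time flatMap was called, and the two bodies could overlap.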

Not only that, but you can chain flatMaps as much as you like with neat non-spaghetti code

 f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:

f1.onComplete{case res1Try =>
  f2.onComplete{case res2Try =>
    f3.onComplete{case res3Try =>
      f4.onComplete{ ...
      }
    }
  }
}

Not as nice.

Test to prove:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

Cool Trick 2:

for-comprehensions like this:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr

are nothing but syntactic-sugar for this:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }

that's a chain of flatMaps, followed by a final map.

That means that

f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

is identical to

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"

Test to Prove (following on from previous test):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
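
One consequence of this desugaring is that a for-comprehension only serializes Futures that are created inside it. The sketch below (overlaps and its inner names are hypothetical, and the 500 ms timeout is an arbitrary choice) uses a latch to detect whether the second task started while the first was still running:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Returns true if the second task started while the first was still running.
def overlaps(createBothUpFront: Boolean): Boolean = {
  val secondStarted = new CountDownLatch(1)
  // The first task waits up to 500 ms to see whether the second one has started.
  def first: Future[Boolean] = Future {
    blocking { secondStarted.await(500, TimeUnit.MILLISECONDS) }
  }
  def second: Future[Unit] = Future { secondStarted.countDown() }

  val result =
    if (createBothUpFront) {
      val a = first  // both Futures are already running at this point
      val b = second
      for { sawSecond <- a; _ <- b } yield sawSecond
    } else {
      // `second` is evaluated inside the flatMap that the comprehension
      // desugars to, so it starts only after `first` has completed
      for { sawSecond <- first; _ <- second } yield sawSecond
    }
  Await.result(result, 5.seconds) // blocking only to report the demo outcome
}

println(overlaps(createBothUpFront = true))  // expected: true (ran concurrently)
println(overlaps(createBothUpFront = false)) // expected: false (ran one after the other)
```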

Not-So-Cool Trick 3:

Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last

And nesting fors creates a challenge. The following doesn't serialize but runs the async blocks in parallel: the outer comprehension is over the Iterable, so it desugars to Iterable.map{item => f(item).map(...)} and calls f for every item straight away, instead of chaining each Future onto the previous one with Future.flatMap.

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

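Also, using foldLeft/foldRight with the serialize helper below doesn't work as you might expect: all async blocks are processed in parallel. This is not a bug in fold. The argument f(item) is evaluated before serialize is called, so each Future is already running by the time flatMap chains it; the fold only serializes if f(item) is called inside the function passed to flatMap.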

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
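
The fold in the snippet above starts every Future up front because f(item) is evaluated as an ordinary argument before serialize runs. A minimal sketch of one way around that, assuming the second parameter is made by-name (serializeLazily, task and the journal log are hypothetical names, not from the original answer):

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val journal = ListBuffer.empty[String]
def jot(e: String): Unit = journal.synchronized { journal += e }

def task(item: Int): Future[Unit] = Future {
  jot(s"start $item")
  Thread.sleep(20L * item)
  jot(s"end $item")
}

// The second parameter is by-name (=> Future[Unit]): task(item) is not
// evaluated at the call site but inside flatMap, after `prev` has completed.
def serializeLazily(prev: Future[Unit], next: => Future[Unit]): Future[Unit] =
  prev.flatMap(_ => next)

val items = List(4, 1, 3)
val done = items.foldLeft(Future.successful(())) { (acc, item) =>
  serializeLazily(acc, task(item))
}

Await.result(done, 5.seconds) // demo only: wait so the journal can be printed
println(journal.synchronized(journal.toList))
// List(start 4, end 4, start 1, end 1, start 3, end 3)
```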

But this works (var involved):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 


Answer 2:

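import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Runs yourfunction on each item in turn; the next call starts only after the previous Future completes.
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}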

If you are running sequentially because resource constraints prevent running more than one Future at a time, it may be easier to create and use a custom ExecutionContext with only a single thread.
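
A minimal sketch of the single-thread idea, assuming the work is done inside the Future bodies themselves (job, note and the order log are hypothetical names). All three Futures are created immediately, but the one worker thread runs their bodies one at a time, in submission order:

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A custom ExecutionContext backed by exactly one worker thread.
val executor = Executors.newSingleThreadExecutor()
implicit val singleThreaded: ExecutionContext = ExecutionContext.fromExecutor(executor)

val order = ListBuffer.empty[String]
def note(e: String): Unit = order.synchronized { order += e }

def job(n: Int): Future[Unit] = Future {
  note(s"start $n")
  Thread.sleep(30)
  note(s"end $n")
}

// The Futures are queued right away; the single thread works through the queue.
val jobs = List(1, 2, 3).map(job)

jobs.foreach(j => Await.result(j, 5.seconds)) // demo only: wait before printing
executor.shutdown()
println(order.synchronized(order.toList))
// List(start 1, end 1, start 2, end 2, start 3, end 3)
```

This only limits where the Future bodies run. If a function hands its work to another pool or to non-blocking I/O and returns early, a single-threaded context does not by itself keep those operations from overlapping.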



Answer 3:

Another option is to use Akka Streams (this needs an implicit Materializer in scope, or an implicit ActorSystem on Akka 2.6+):

val doneFuture = Source
  .fromIterator(() => it)
  .mapAsync(parallelism = 1)(f)
  .runForeach{identity}


Answer 4:

This code shows how to run futures in sequence, using a simple promise to accomplish it.

The code contains two sequencers: one executes the work items one by one, and the other lets you specify how many to run at the same time.

Exceptions are not handled, to keep it simple.

import scala.concurrent.{Await, Future, Promise}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

/**
  * Simple class to encapsulate work. The important element here is the future;
  * you can ignore the rest.
  */
case class Work(id:String, workTime:Long = 100) {
  def doWork(): Future[String] = Future {
    println(s"Starting $id")
    Thread.sleep(workTime)
    println(s"End $id")
    s"$id ready"
  }
}

/**
  * SimpleSequencer runs the work items one by one. The promise is the element
  * that allows the sequencer to work, so pay attention to it.
  *
  * Exceptions are ignored; this is not production code.
  */
object SimpleSequencer {
  private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work +: tail => work.doWork() map {
        result => sequence(tail, results :+ result, p)
      }
    }
  }

  def sequence(works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(works, Seq.empty, p)
    p.future
  }
}

/**
  * MultiSequencer fires N work items at the same time
  */
object MultiSequencer {
  private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
    works match {
      case Nil => p.tryComplete(Try(results))
      case work =>
        val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
        Future.sequence(parallelWorks) map {
          result => sequence(parallel, works.drop(parallel), results ++ result, p)
        }
    }
  }

  def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
    val p = Promise[Seq[String]]()
    sequence(parallel, works, Seq.empty, p)
    p.future
  }

}


object Sequencer {

  def main(args: Array[String]): Unit = {
    val works = Seq.range(1, 10).map(id => Work(s"w$id"))
    val p = Promise[Unit]()

    val f = MultiSequencer.sequence(4, works) map {
      resultFromMulti =>
        println(s"MultiSequencer Results: $resultFromMulti")
        SimpleSequencer.sequence(works) map {
          resultsFromSimple =>
            println(s"SimpleSequencer Results: $resultsFromSimple")
            p.success(())
        }
    }

    Await.ready(p.future, Duration.Inf)
  }
}
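
The batching idea behind MultiSequencer can also be written without a Promise, by folding over groups of items. This is a sketch rather than a drop-in replacement: inBatches is a hypothetical helper name, and the string lengths merely stand in for real work.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: runs `work` on at most `batchSize` items at a time and
// starts the next batch only after every Future in the current one has completed.
def inBatches[A, B](items: Seq[A], batchSize: Int)(work: A => Future[B]): Future[Vector[B]] =
  items.grouped(batchSize).foldLeft(Future.successful(Vector.empty[B])) { (acc, batch) =>
    acc.flatMap { done =>
      // batch.map(work) starts the whole batch; Future.sequence waits for all of it
      Future.sequence(batch.map(work)).map(results => done ++ results)
    }
  }

val lengths = inBatches(Seq("a", "bb", "ccc", "dddd", "eeeee"), batchSize = 2) { s =>
  Future(s.length)
}

println(Await.result(lengths, 5.seconds)) // Vector(1, 2, 3, 4, 5)
```

With batchSize = 1 this degenerates to strictly sequential execution. Because the chain is built from flatMap, a failed work item fails the resulting Future instead of leaving it uncompleted.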


Answer 5:

Perhaps a more elegant solution would be to use recursion, as detailed below.

This can be used as an example for a long operation returning a Future:

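import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def longOperation(strToReturn: String): Future[String] = Future {
  Thread.sleep(5000)
  strToReturn
}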

The following is the recursive function that traverses the items to be processed and processes them in sequence:

def processItems(strToReturn: Seq[String]): Unit = strToReturn match {
  case x +: xs => longOperation(x).onComplete {
    case Success(str) =>
      println("Got: " + str)
      processItems(xs)
    case Failure(_) =>
      println("Something went wrong")
      processItems(xs)
  }
  case Nil => println("Done")
}

This works by having the function call itself with the remaining items once the Future has either succeeded or failed.

To start this activity you call the 'processItems' function with a few items to process, like so:

processItems(Seq("item1", "item2", "item3"))
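
Because processItems returns Unit, the caller cannot tell when the whole run has finished or collect the results. A possible variant, sketched under the assumption that one Try per item is wanted (processAll and flaky are hypothetical names, not from the original answer):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical variant of processItems: returns a Future holding one Try per
// item, in input order. A failed item is recorded and processing continues.
def processAll[A, B](items: List[A])(op: A => Future[B]): Future[List[Try[B]]] =
  items match {
    case Nil => Future.successful(Nil)
    case head :: tail =>
      op(head)
        .map[Try[B]](Success(_))
        .recover { case e => Failure(e) }
        // the recursive call happens in the callback, after `head` is finished
        .flatMap(result => processAll(tail)(op).map(result :: _))
  }

// Stand-in operation: fails for empty strings, otherwise returns the length.
def flaky(s: String): Future[Int] =
  if (s.isEmpty) Future.failed(new IllegalArgumentException("empty item"))
  else Future(s.length)

val outcome = Await.result(processAll(List("ab", "", "cde"))(flaky), 5.seconds)
println(outcome.map(_.toOption)) // List(Some(2), None, Some(3))
```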


Answer 6:

Just expanding on @wingedsubmariner's answer since the .reverse at the end was bugging me (and added import statements for completeness)

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U])
                    (implicit ec: ExecutionContext): Future[List[U]] = {
  val resBase = Future.successful(mutable.ListBuffer.empty[U])
  xs
    .foldLeft(resBase) { (futureRes, x) =>
      futureRes.flatMap {
        res => f(x).map(res += _)
      }
    }
    .map(_.toList)
}

Note: ListBuffer has constant time += and .toList operations



Answer 7:

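You can use Await.result, which blocks the calling thread until each Future completes (code untested):

"Await: singleton object used for blocking on a future (transferring its result to the current thread)."

val result = it.map { item => Await.result(f(item), Duration.Inf) }.toList // .toList forces the lazy Iterator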


Tags: scala