I have a scenario where I need to use an iterator: for each item, a function f(item) is called and returns a Future[Unit].
However, each f(item) call must be executed sequentially; the calls cannot run in parallel.
for(item <- it)
f(item)
won't work because this starts all the calls in parallel.
How do I do it so they follow in sequence?
If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMap does the serialization):
val fSerialized = {
var fAccum = Future{()}
for(item <- it) {
println(s"Processing ${item}")
fAccum = fAccum flatMap { _ => f(item) }
}
fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}
In general, avoid Await operations: they block the calling thread, which largely defeats the point of async, consumes resources and, in sloppy designs, can deadlock.
Cool Trick 1:
You can chain together Futures via that usual suspect, flatMap - it serializes asynchronous operations. Is there anything it can't do? ;-)
def f1 = Future { /* some background running logic here... */ }
def f2 = Future { /* other background running logic here... */ }
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
None of the above blocks - the main thread runs straight through without waiting for either Future. Futures are used in all cases to run work on other threads, to keep track of asynchronous state/results and to chain logic.
fSerialized represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1 (running asynchronously). f1 runs like any Future - when it eventually finishes, it calls its onComplete callback block. Here's the cool bit - flatMap installs its argument as the f1 onComplete callback block - so f2 is initiated as soon as f1 completes, with no blocking, polling or wasteful resource usage. When f2 is complete, then fSerialized is complete - so it runs the fSerialized.onComplete callback block - printing "Both Done".
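One detail the walkthrough above depends on is that f1 and f2 are defs, so the second Future is only created when the flatMap callback calls it. The following is a minimal sketch of that difference, not part of the original answer; EventLog, ChainDemo, step, chained and eager are hypothetical names introduced for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper for illustration: a small thread-safe event log.
final class EventLog {
  private var events = Vector.empty[String]
  def add(e: String): Unit = synchronized { events = events :+ e }
  def snapshot: Vector[String] = synchronized { events }
}

object ChainDemo {
  // A def: every call creates and starts a new Future.
  def step(name: String, log: EventLog)(implicit ec: ExecutionContext): Future[Unit] =
    Future {
      log.add(s"start $name")
      Thread.sleep(50) // stand-in for real work
      log.add(s"end $name")
    }

  // step("f2", ...) is only called inside the flatMap callback, i.e. after f1
  // has completed, so the log order is: start f1, end f1, start f2, end f2.
  def chained(log: EventLog)(implicit ec: ExecutionContext): Future[Unit] =
    step("f1", log).flatMap(_ => step("f2", log))

  // Both Futures are created (and started) up front; flatMap here only
  // combines their completion, it does not delay the start of f2.
  def eager(log: EventLog)(implicit ec: ExecutionContext): Future[Unit] = {
    val f1 = step("f1", log)
    val f2 = step("f2", log)
    f1.flatMap(_ => f2)
  }
}
```

With chained, the two steps never overlap. With eager, they may overlap whenever the execution context has more than one thread available, which is the behaviour the question is trying to avoid.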
Not only that, but you can chain flatMaps as much as you like with neat non-spaghetti code:
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:
f1.onComplete{case res1Try =>
f2
f2.onComplete{case res2Try =>
f3
f3.onComplete{case res3Try =>
f4
f4.onComplete{ ...
}
}
}
}
Not as nice.
Test to prove:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep(item.seconds.toMillis)}
println("Done")
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Cool Trick 2:
for-comprehensions like this:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
are nothing but syntactic-sugar for this:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
that's a chain of flatMaps, followed by a final map.
That means that
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
is equivalent to
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Test to Prove (following on from previous test):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Not-So-Cool Trick 3:
Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
And nesting fors creates a challenge. The following doesn't serialize, but runs the async blocks in parallel: the outer comprehension desugars to Iterable.map{item => f(item).map(...)}, which calls f for every item up front instead of chaining the Futures with flatMap/map - not the same!
val fSerial = {for {nextItem <- itemIterable} yield
for {nextRes <- f(nextItem)} yield "Did It"}.last
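Using foldLeft/foldRight plus flatMap in the following form doesn't serialize either; all async blocks are processed in parallel. This is not a bug in Iterator.foldLeft/foldRight or Future.flatMap: serialize takes f2 as an ordinary (strict) parameter, so f(item) is evaluated, and its Future started, at every fold step, before any flatMap callback has run: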
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep(item.seconds.toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
But this works (var involved), because f(nextItem) is only called inside the flatMap callback:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep(item.seconds.toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))
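The fold version shown earlier starts everything at once because serialize receives an already-started f(item). As a rough sketch of the same fold without the var, the second parameter can be declared by-name so that f(item) is only evaluated inside the flatMap callback. FoldDemo and runAll are hypothetical names used for illustration; serialize mirrors the helper from the fold example.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FoldDemo {
  // f2 is by-name (=> Future[Unit]): the expression passed in is not evaluated
  // at the call site, only when the flatMap callback runs, i.e. after f1 has
  // completed successfully.
  def serialize(f1: Future[Unit], f2: => Future[Unit])
               (implicit ec: ExecutionContext): Future[Unit] =
    f1.flatMap(_ => f2)

  // Folds over the iterator, chaining one deferred f(item) call per element.
  // The fold itself finishes immediately; the calls to f happen one by one later.
  def runAll[T](items: Iterator[T])(f: T => Future[Unit])
               (implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(())) { (acc, item) => serialize(acc, f(item)) }
}
```

Under these assumptions, FoldDemo.runAll(it)(f) would return a Future[Unit] that completes after the last item, and a failure in any f(item) stops the chain at that point.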
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
items.foldLeft(Future.successful[List[U]](Nil)) {
(f, item) => f.flatMap {
x => yourfunction(item).map(_ :: x)
}
} map (_.reverse)
}
If you are running sequentially because resource constraints prevent running more than one Future at a time, it may be easier to create and use a custom ExecutionContext with only a single thread.
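A minimal sketch of that idea follows, assuming the work inside each Future body is synchronous (if f merely kicks off non-blocking work and returns, a single thread does not serialize it). SingleThreadDemo, newSingleThreadContext and runAll are hypothetical names; ExecutionContext.fromExecutorService and Executors.newSingleThreadExecutor are the standard library calls.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

object SingleThreadDemo {
  // One worker thread: at most one Future body runs at any moment, and
  // submitted bodies run in submission order.
  def newSingleThreadContext(): ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  // Creates all Futures at once; the execution context passed in decides how
  // many bodies actually run concurrently.
  def runAll[T, U](items: Seq[T])(work: T => U)
                  (implicit ec: ExecutionContext): Future[Seq[U]] =
    Future.sequence(items.map(item => Future(work(item))))
}
```

The context returned by newSingleThreadContext() owns a non-daemon thread, so it should be shut down with shutdown() once it is no longer needed.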
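Another option is using Akka Streams:
import akka.stream.scaladsl.Source
// needs an implicit Materializer in scope (Akka 2.6+ can derive one from an implicit ActorSystem)
val doneFuture = Source
.fromIterator(() => it)
.mapAsync(parallelism = 1)(f)
.runForeach{identity}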
This code shows how to run futures in sequence using a simple Promise.
The code contains two sequencers: one executes the work items one by one, the other lets you specify how many to run at the same time.
Exceptions are not handled, to keep it simple.
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
/**
* Simple class to encapsulate work. The important element here is the Future;
* you can ignore the rest.
*/
case class Work(id:String, workTime:Long = 100) {
def doWork(): Future[String] = Future {
println(s"Starting $id")
Thread.sleep(workTime)
println(s"End $id")
s"$id ready"
}
}
/**
* SimpleSequencer runs the work items one by one. The promise is what lets
* the sequencer report completion, so pay attention to it.
*
* Exceptions are ignored; this is not production code.
*/
object SimpleSequencer {
private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
works match {
case Seq() => p.tryComplete(Try(results))
case work +: tail => work.doWork() map { // +: matches any non-empty Seq, not only List
result => sequence(tail, results :+ result, p)
}
}
}
def sequence(works:Seq[Work]) : Future[Seq[String]] = {
val p = Promise[Seq[String]]()
sequence(works, Seq.empty, p)
p.future
}
}
/**
* MultiSequencer fires N work items at the same time
*/
object MultiSequencer {
private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
works match {
case Seq() => p.tryComplete(Try(results))
case _ =>
val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
Future.sequence(parallelWorks) map {
result => sequence(parallel, works.drop(parallel), results ++ result, p)
}
}
}
def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
val p = Promise[Seq[String]]()
sequence(parallel, works, Seq.empty, p)
p.future
}
}
object Sequencer {
def main(args: Array[String]): Unit = {
val works = Seq.range(1, 10).map(id => Work(s"w$id"))
val p = Promise[Unit]()
val f = MultiSequencer.sequence(4, works) map {
resultFromMulti =>
println(s"MultiSequencer Results: $resultFromMulti")
SimpleSequencer.sequence(works) map {
resultsFromSimple =>
println(s"SimpleSequencer Results: $resultsFromSimple")
p.success(())
}
}
Await.ready(p.future, Duration.Inf)
}
}
Perhaps a more elegant solution would be to use recursion, as detailed below.
This can be used as an example for a long operation returning a Future:
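import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def longOperation(strToReturn: String): Future[String] = Future {
Thread.sleep(5000) // simulate a slow operation
strToReturn
}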
The following is the recursive function that traverses the items to be processed, and processes them in sequence:
import scala.util.{Failure, Success}
def processItems(items: Seq[String]): Unit = items match {
case x +: xs => longOperation(x).onComplete {
case Success(str) =>
println("Got: " + str)
processItems(xs)
case Failure(_) =>
println("Something went wrong")
processItems(xs)
}
case _ => println("Done")
}
This works by having the function call itself recursively with the remaining items once the Future has either succeeded or failed.
To start this activity you call the 'processItems' function with a few items to process, like so:
processItems(Seq("item1", "item2", "item3"))
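Because processItems returns Unit, the caller has no handle on when the whole run has finished. A possible variant, sketched here under my own assumptions rather than taken from the answer, returns a Future of all results by recursing inside flatMap. RecursiveDemo and processAll are hypothetical names, and unlike processItems this version stops at the first failure instead of continuing.

```scala
import scala.concurrent.{ExecutionContext, Future}

object RecursiveDemo {
  // Processes the items one at a time. f(head) is called first; the recursive
  // call for the tail only happens inside the flatMap callback, after that
  // Future has succeeded. Results are returned in input order. If any call
  // fails, the returned Future fails and the remaining items are not processed.
  def processAll[T, U](items: List[T])(f: T => Future[U])
                      (implicit ec: ExecutionContext): Future[List[U]] =
    items match {
      case Nil => Future.successful(Nil)
      case head :: tail =>
        f(head).flatMap(result => processAll(tail)(f).map(result :: _))
    }
}
```

With the longOperation helper above, RecursiveDemo.processAll(List("item1", "item2", "item3"))(longOperation) would yield a Future[List[String]] that can be composed further or observed with onComplete.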
Just expanding on @wingedsubmariner's answer since the .reverse at the end was bugging me (and added import statements for completeness):
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U])
(implicit ec: ExecutionContext): Future[List[U]] = {
val resBase = Future.successful(mutable.ListBuffer.empty[U])
xs
.foldLeft(resBase) { (futureRes, x) =>
futureRes.flatMap {
res => f(x).map(res += _)
}
}
.map(_.toList)
}
Note: ListBuffer has constant-time += and .toList operations.
You can use Await.result (code untested):
"Await: singleton object used for blocking on a future (transferring its result to the current thread)."
val result = it.map { item => Await.result(f(item), Duration.Inf) }.toList // .toList forces the lazy Iterator.map