I have to run multiple futures in parallel and the program shouldn't crash or hang.
For now I wait on futures one by one, and use fallback value if there is TimeoutException.
val future1 = // start future1
val future2 = // start future2
val future3 = // start future3
// <- at this point all 3 futures are running
// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())
def toFallback[T](f: Future[T], to: Int, default: T) = {
Try(Await.result(f, to seconds))
.recover { case to: TimeoutException => default }
}
As I can see, maximum wait time of this snippet is timeout1 + timeout2 + timeout3
My question is: how can I wait on all of those futures at once, so I can reduce wait time to max(timeout1, timeout2, timeout3)
?
EDIT: In the end I used modification of @Jatin and @senia answers:
private def composeWaitingFuture[T](fut: Future[T],
timeout: Int, default: T) =
future { Await.result(fut, timeout seconds) } recover {
case e: Exception => default
}
and later it's used as follows:
// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]())
// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
for {
r1 <- res1
r2 <- res2
r3 <- res3
} yield (r1, r2, r3)
and later I use combinedFuture
as I see fit.
You can even make this block asynchronous and each request waits for its maximum time. If there are too many threads, probably have a single thread that keeps checking for other futures using Akka's
system scheduler
. @Senia has answered below on this one.Here's a longer (unakka) answer that addresses what might be the use case, namely, if one of the values "times out" you want to use the default value for that result and also do something with it (such as cancel the long-running calculation or i/o or whatever).
Needless to say, the other story is to minimize blocking.
The basic idea is to sit in a loop awaiting the
firstCompletedOf
the items which haven't yet completed. The timeout on theready
is the minimum remaining timeout.This code uses deadlines instead of durations, but using a duration as "time remaining" is easy.
Sample run:
Why not get the
Future
itself to perform the exception capture and return of the default ? Then you can simplyAwait
on each future in turn, and you don't have to worry about the exception handling outside the future.You could create
future
that returns results of all 3 futures usingflatMap
or for-comprehension:If you are using
akka
you could complete your future with default value after timeout:This is perhaps a bit hacky, but you can simply measure elapsed time and modify timeouts accordingly. Assuming
timeout1 <= timeout2 <= timeout3
:This way each timeout is based to the moment
start = now
was called, so the overall running time is at mosttimeout3
. If the timeouts aren't oredered, it still works, but some tasks can be left running longer than their designated timeout.I would avoid using
Await.result
since that uses a thread just for blocking. One option to implement timeout for futures would be this:This creates a promise which will be completed either by a future or by a the default result after the specified timeout - whichever comes first.
To run the queries concurrently you would do like so: