Question:
I have to run multiple futures in parallel, and the program shouldn't crash or hang.
For now I wait on the futures one by one and use a fallback value if there is a TimeoutException.
val future1 = // start future1
val future2 = // start future2
val future3 = // start future3
// <- at this point all 3 futures are running
// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())
def toFallback[T](f: Future[T], to: Int, default: T) = {
Try(Await.result(f, to seconds))
.recover { case to: TimeoutException => default }
}
As far as I can see, the maximum wait time of this snippet is timeout1 + timeout2 + timeout3.
My question is: how can I wait on all of those futures at once, so that I can reduce the wait time to max(timeout1, timeout2, timeout3)?
EDIT: In the end I used a modification of @Jatin's and @senia's answers:
private def composeWaitingFuture[T](fut: Future[T],
timeout: Int, default: T) =
future { Await.result(fut, timeout seconds) } recover {
case e: Exception => default
}
and later it's used as follows:
// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]())
// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
for {
r1 <- res1
r2 <- res2
r3 <- res3
} yield (r1, r2, r3)
and later I use combinedFuture as I see fit.
Answer 1:
def toFallback[T](f: Future[T], to: Int, default: T) = {
  future {
    try {
      Await.result(f, to seconds)
    } catch {
      case e: TimeoutException => default
    }
  }
}
You can even make this block asynchronous, so that each request waits for at most its own timeout. If that needs too many threads, consider having a single thread that keeps checking the other futures using Akka's system scheduler. @Senia has answered below on this one.
Answer 2:
You could create a future that returns the results of all 3 futures using flatMap or a for-comprehension:
val combinedFuture =
for {
r1 <- future1
r2 <- future2
r3 <- future3
} yield (r1, r2, r3)
val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max)
If you are using Akka, you could complete your future with a default value after the timeout:
implicit class FutureHelper[T](f: Future[T]) extends AnyVal{
import akka.pattern.after
def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
val delayed = after(t.duration, system.scheduler)(Future.successful(default))
Future firstCompletedOf Seq(f, delayed)
}
}
val combinedFuture =
for {
r1 <- future1.orDefault(timeout1, Map())
r2 <- future2.orDefault(timeout2, List())
r3 <- future3.orDefault(timeout3, Map())
} yield (r1, r2, r3)
val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max)
Answer 3:
I would avoid using Await.result, since that uses a thread just for blocking. One option to implement a timeout for futures would be this:
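val timer = new Timer(true) // daemon timer thread, so it cannot keep the JVM alive
def toFallback[T](f: Future[T], timeout: Int, default: T) = {
  val p = Promise[T]()
  // whichever completion happens first wins; the later tryComplete is ignored
  f.onComplete(result => p.tryComplete(result))
  timer.schedule(new TimerTask {
    def run(): Unit = {
      p.tryComplete(Success(default))
    }
  }, timeout) // note: java.util.Timer delays are in milliseconds
  p.future
}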
This creates a promise which will be completed either by the future or by the default result after the specified timeout, whichever comes first.
To run the queries concurrently, you would do this:
val future1 = // start future1
val future2 = // start future2
val future3 = // start future3
val res1 = toFallback(future1, timeout1, Map[String, Int]())
val res2 = toFallback(future2, timeout2, List[Int]())
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())
val resultF = for {
r1 <- res1
r2 <- res2
r3 <- res3
} yield (r1, r2, r3)
val (r1, r2, r3) = Await.result(resultF, Duration.Inf)
println(s"$r1, $r2, $r3")
//or
resultF.onSuccess {
case (r1, r2, r3) => println(s"$r1, $r2, $r3")
}
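For reference, here is a self-contained sketch of the same promise-based idea that can be run as is. It swaps java.util.Timer for a ScheduledExecutorService and takes a FiniteDuration instead of an Int; the names FallbackDemo and withFallback are made up for this illustration and are not part of the answer above.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, Future, Promise, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object FallbackDemo {
  // One daemon thread that only fires timeouts; it never runs the actual work.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-scheduler")
      t.setDaemon(true)
      t
    }
  })

  // Completes with the result of `f`, or with `default` once `timeout` has elapsed,
  // whichever happens first. No thread is blocked while waiting.
  def withFallback[T](f: Future[T], timeout: FiniteDuration, default: => T): Future[T] = {
    val p = Promise[T]()
    f.onComplete(result => p.tryComplete(result))
    scheduler.schedule(new Runnable {
      def run(): Unit = { p.tryComplete(Success(default)); () }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  def main(args: Array[String]): Unit = {
    val fast = withFallback(Future { 1 }, 2.seconds, -1)
    val slow = withFallback(Future { blocking(Thread.sleep(3000)); 2 }, 200.millis, -2)
    val both = for { a <- fast; b <- slow } yield (a, b)
    // Returns after roughly 200 ms, not after the 3 s the slow computation needs.
    println(Await.result(both, 5.seconds)) // prints (1,-2)
  }
}
```

Note that the slow computation keeps running after its fallback has been used; this sketch only stops waiting for it and does not cancel it.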
Answer 4:
Here's a longer (unakka) answer that addresses what might be the use case, namely, if one of the values "times out" you want to use the default value for that result and also do something with it (such as cancel the long-running calculation or i/o or whatever).
Needless to say, the other story is to minimize blocking.
The basic idea is to sit in a loop awaiting the firstCompletedOf the items which haven't yet completed. The timeout on the ready is the minimum remaining timeout.
This code uses deadlines instead of durations, but using a duration as "time remaining" is easy.
import scala.language.postfixOps
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits._
import scala.reflect._
import scala.util._
import java.lang.System.{ nanoTime => now }
import Test.time
class Test {
type WorkUnit[A] = (Promise[A], Future[A], Deadline, A)
type WorkQ[A] = Seq[WorkUnit[A]]
def await[A: ClassTag](work: Seq[(Future[A], Deadline, A)]): Seq[A] = {
// check for timeout; if using Duration instead of Deadline, decrement here
def ticktock(w: WorkUnit[A]): WorkUnit[A] = w match {
case (p, f, t, v) if !p.isCompleted && t.isOverdue => p trySuccess v ; w
case _ => w
}
def await0(work: WorkQ[A]): WorkQ[A] = {
val live = work filterNot (_._1.isCompleted)
val t0 = (live map (_._3)).min
Console println s"Next deadline in ${t0.timeLeft.toMillis}"
val f0 = Future firstCompletedOf (live map (_._2))
Try(Await ready (f0, t0.timeLeft))
val next = work map (w => ticktock(w))
if (next exists (!_._1.isCompleted)) {
await0(next)
} else {
next
}
}
val wq = work map (_ match {
case (f, t, v) =>
val p = Promise[A]
p.future onComplete (x => Console println s"Value available: $x: $time")
f onSuccess {
case a: A => p trySuccess a // doesn't match on primitive A
case x => p trySuccess x.asInstanceOf[A]
}
f onFailure { case _ => p trySuccess v }
(p, f, t, v)
})
await0(wq) map (_ match {
case (p, f, t, v) => p.future.value.get.get
})
}
}
object Test {
val start = now
def time = s"The time is ${ Duration fromNanos (now - start) toMillis }"
def main(args: Array[String]): Unit = {
// #2 times out
def calc(i: Int) = {
val t = if (args.nonEmpty && i == 2) 6 else i
Thread sleep t * 1000L
Console println s"Calculate $i: $time"
i
}
// futures to be completed by a timeout deadline
// or else use default and let other work happen
val work = List(
(future(calc(1)), 3 seconds fromNow, 10),
(future(calc(2)), 5 seconds fromNow, 20),
(future(calc(3)), 7 seconds fromNow, 30)
)
Console println new Test().await(work)
}
}
Sample run:
apm@mara:~/tmp$ skalac nextcompleted.scala ; skala nextcompleted.Test
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 2: The time is 2019
Value available: Success(2): The time is 2020
Next deadline in 4999
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
List(1, 2, 3)
apm@mara:~/tmp$ skala nextcompleted.Test arg
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
Next deadline in 1998
Value available: Success(20): The time is 5020
List(1, 20, 3)
Answer 5:
Why not get the Future itself to perform the exception capture and return the default? Then you can simply Await on each future in turn, and you don't have to worry about the exception handling outside the future.
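One possible reading of this suggestion, as a minimal sketch: attach recover to the future so that any exception thrown inside it turns into the default value. The names RecoverDemo and orDefault are hypothetical. This only covers failures raised inside the future; a future that never completes would still make Await.result throw a TimeoutException, so a timeout fallback is still needed for that case.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverDemo {
  // Hypothetical helper: an exception thrown inside the future becomes `default`.
  def orDefault[T](f: Future[T], default: => T): Future[T] =
    f.recover { case _: Exception => default }

  def main(args: Array[String]): Unit = {
    val ok     = orDefault(Future { 42 }, 0)
    val failed = orDefault(Future[Int] { throw new IllegalStateException("boom") }, 0)
    println(Await.result(ok, 1.second))     // prints 42
    println(Await.result(failed, 1.second)) // prints 0
  }
}
```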
Answer 6:
This is perhaps a bit hacky, but you can simply measure elapsed time and modify timeouts accordingly. Assuming timeout1 <= timeout2 <= timeout3:
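def now = System.currentTimeMillis()
val start = now
// milliseconds left until `timeoutSeconds` after `start`, never negative
def remains(timeoutSeconds: Long): Long =
  math.max(0L, timeoutSeconds * 1000L + start - now)
def toFallback[T](f: Future[T], to: Int, default: T) = {
  // `to` is in seconds, as in the question; the clock is in milliseconds
  Try(Await.result(f, remains(to).millis))
    .recover { case _: TimeoutException => default }
}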
This way each timeout is measured from the moment start = now was evaluated, so the overall running time is at most timeout3. If the timeouts aren't ordered, it still works, but some tasks can be left running longer than their designated timeout.
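The same bookkeeping can be sketched with scala.concurrent.duration.Deadline, which tracks the remaining time itself. This is an illustration under assumptions, not the code from this answer: the object name SharedStartDemo is made up, and toFallback here takes a Deadline and returns the plain value rather than a Try.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SharedStartDemo {
  // Waits until `deadline` at the latest, then falls back to `default`.
  // Failures other than a timeout are rethrown by `.get`.
  def toFallback[T](f: Future[T], deadline: Deadline, default: => T): T =
    Try(Await.result(f, deadline.timeLeft.max(Duration.Zero)))
      .recover { case _: TimeoutException => default }
      .get

  def main(args: Array[String]): Unit = {
    val start = Deadline.now // every timeout is measured from this instant
    val f2 = Future { "fast" }
    val f1 = Future { blocking(Thread.sleep(300)); "slow" }
    val r1 = toFallback(f1, start + 100.millis, "default1")
    val r2 = toFallback(f2, start + 200.millis, "default2")
    // Total wait is bounded by the latest deadline (200 ms), not by the sum.
    println((r1, r2)) // prints (default1,fast)
  }
}
```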
Answer 7:
Use Monix Task; it is Future on steroids.
import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
val task1 = Task{Thread.sleep(1);"task1"}.timeoutTo(timeout1,Task.now("timeout1"))
val task2 = Task{Thread.sleep(2);"task2"}.timeoutTo(timeout2,Task.now("timeout2"))
Task.zipList(Seq(task1,task2)).runSyncUnsafe(Duration.Inf)