有期货,我不完全从官方教程裁判理解的一个方面。 http://docs.scala-lang.org/overviews/core/futures.html
不要在斯卡拉期货已经内置在某种超时机制? 比方说,下面的例子是5千兆字节的文本文件...没有“Implicits.global”的隐含范围最终导致onFailure处到无阻塞的方式解雇或可以是界定? 而如果没有一个默认的超时一番,那不就意味着它可能既不成功也不失败将永远火?
import scala.concurrent._
import ExecutionContext.Implicits.global
val firstOccurence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
case t => println("Could not process file: " + t.getMessage)
}
Answer 1:
你只有当你使用阻塞得到的结果超时行为Future
。 如果你想使用非阻塞回调onComplete
, onSuccess
或onFailure
,那么你就必须推出自己的超时处理。 阿卡已建成超时处理的请求/响应( ?
参与者之间)的消息,但不知道,如果你想开始使用阿卡。 FWIW,在阿卡,超时处理,他们撰写两封Futures
在一起,通过Future.firstCompletedOf
,一个代表实际的异步任务和一个表示超时。 如果超时计时器(通过HashedWheelTimer
)弹出第一个,你上了异步回调失败。
滚动自己的非常简单的例子可能会去这样的事情。 首先,用于调度超时的对象:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}
那么功能采取的未来,并添加超时行为吧:
import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration
def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}
需要注意的是HashedWheelTimer
我使用这里是Netty的。
Answer 2:
我刚刚创建了TimeoutFuture
一个同事类:
TimeoutFuture
package model
import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._
object TimeoutFuture {
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val prom = promise[A]
// timeout logic
Akka.system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
prom success block
}
prom.future
}
}
用法
val future = TimeoutFuture(10 seconds) {
// do stuff here
}
future onComplete {
case Success(stuff) => // use "stuff"
case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}
笔记:
- 假设玩! 框架(但它是很容易适应)
- 每一段代码在相同的运行
ExecutionContext
这可能不是理想的。
Answer 3:
所有这些问题的答案都需要额外的依赖。 我决定写使用java.util.Timer中这是运行在未来的功能,在这种情况下触发的超时的情况有效的方式版本。
博客文章在这里更多详情
使用此使用Scala的承诺,我们可以做一个未来的超时如下:
package justinhj.concurrency
import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
object FutureUtil {
// All Future's that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down
val timer: Timer = new Timer(true)
/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future's outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
// Promise will be fulfilled with either the callers Future or the timer task if it times out
val p = Promise[T]
// and a Timer task to handle timing out
val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException())
}
}
// Set the timeout to check in the future
timer.schedule(timerTask, timeout.toMillis)
future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}
p.future
}
}
Answer 4:
玩框架包含Promise.timeout这样你就可以像下面写代码
private def get(): Future[Option[Boolean]] = {
val timeoutFuture = Promise.timeout(None, Duration("1s"))
val mayBeHaveData = Future{
// do something
Some(true)
}
// if timeout occurred then None will be result of method
Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
Answer 5:
当你等待将来可以指定超时:
对于scala.concurrent.Future
,该result
的方法可以让你指定超时。
对于scala.actors.Future
, Futures.awaitAll
允许您指定超时。
我不认为这是内置在未来的执行超时。
Answer 6:
我很惊讶,这不是在斯卡拉标准。 我的版本是短暂的,并没有依赖关系
import scala.concurrent.Future
sealed class TimeoutException extends RuntimeException
object FutureTimeout {
import scala.concurrent.ExecutionContext.Implicits.global
implicit class FutureTimeoutLike[T](f: Future[T]) {
def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
Thread.sleep(ms)
throw new TimeoutException
}))
lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
}
}
用法示例
import FutureTimeout._
Future { /* do smth */ } withTimeout
Answer 7:
没有人提到的akka-streams
,但。 流有一个简单的completionTimeout
方法,以及应用该上一个单源数据流就像一个未来。
但是,阿卡流也不会消除,因此实际上可以从运行结束源,即它标志着超时源。
Answer 8:
如果你想作家(承诺持有人)是谁控制了超时逻辑的一个,使用akka.pattern.after ,以下列方式:
val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))
这样,如果你的承诺完成逻辑从来没有发生,您的来电者的未来依然会在某个时候以失败结束。
Answer 9:
Monix Task
已超时支持
import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException
val source = Task("Hello!").delayExecution(10.seconds)
// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)
timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)
Answer 10:
我使用这个版本(基于上面播放例子),它采用阿卡调度员:
object TimeoutFuture {
def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
implicit val executionContext = system.dispatcher
val prom = Promise[A]
// timeout logic
system.scheduler.scheduleOnce(timeout) {
prom tryFailure new java.util.concurrent.TimeoutException
}
// business logic
Future {
try {
prom success block
} catch {
case t: Throwable => prom tryFailure t
}
}
prom.future
}
}
Answer 11:
对未来IMO指定超时的最简单方法是Scala的使用内置机制scala.concurrent.Await.ready
这将抛出一个TimeoutException
,如果未来的时间超过指定的超时时间。 否则,它将返回未来本身。 下面是一个简单的人为的例子
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
Thread.sleep(1100)
5
}
val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)
val f: Future[Int] = Future {
Thread.sleep(1100)
5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)
Answer 12:
这个版本的作品,而无需使用超时
import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
object TimeoutFuture {
def apply[A](
timeout: FiniteDuration
)(block: => A)(implicit executor: ExecutionContext): Future[A] =
try {
Future { Await.result(Future { block }, timeout) }
} catch {
case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
}
}
文章来源: Scala Futures - built in timeout?