所述traverse
从方法Future
对象停止在第一次失败。 我想这个方法,对错误的发生与序列的其余部分进行的宽容/宽容的版本。
目前我们已经添加了以下方法给我们的utils的:
def traverseFilteringErrors[A, B <: AnyRef]
(seq: Seq[A])
(f: A => Future[B]): Future[Seq[B]] = {
val sentinelValue = null.asInstanceOf[B]
val allResults = Future.traverse(seq) { x =>
f(x) recover { case _ => sentinelValue }
}
val successfulResults = allResults map { result =>
result.filterNot(_ == sentinelValue)
}
successfulResults
}
有一个更好的方法吗?
(一般来说)一个真正有用的东西将能够推动未来的错误到一个合适的值。 或者换句话说,变换一个Future[T]
为Future[Try[T]]
在成功的返回值变为Success[T]
而失败的情况下成为一个Failure[T]
下面是我们如何实现它:
// Can also be done more concisely (but less efficiently) as:
// f.map(Success(_)).recover{ case t: Throwable => Failure( t ) }
// NOTE: you might also want to move this into an enrichment class
def mapValue[T]( f: Future[T] ): Future[Try[T]] = {
val prom = Promise[Try[T]]()
f onComplete prom.success
prom.future
}
现在,如果你做到以下几点:
Future.traverse(seq)( f andThen mapValue )
你会得到一个成功的Future[Seq[Try[A]]]
其最终的价值包含了Success
的每一个成功的未来实例,并且Failure
为每个实例失败的未来。 如果需要的话,你就可以使用collect
这个序列砸Failure
情况下,只保留SUCESSFUL值。
换句话说,你可以按照如下重写你的helper方法:
def traverseFilteringErrors[A, B](seq: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
Future.traverse( seq )( f andThen mapValue ) map ( _ collect{ case Success( x ) => x } )
}