如何做好失败的,尽管未来的执行顺序?(How to carry on executing Futur

2019-08-19 03:38发布

所述traverse从方法Future对象停止在第一次失败。 我想这个方法,对错误的发生与序列的其余部分进行的宽容/宽容的版本。

目前我们已经添加了以下方法给我们的utils的:

def traverseFilteringErrors[A, B <: AnyRef]
                           (seq: Seq[A])
                           (f: A => Future[B]): Future[Seq[B]] = {
  val sentinelValue = null.asInstanceOf[B]
  val allResults = Future.traverse(seq) { x =>
    f(x) recover { case _ => sentinelValue }
  }
  val successfulResults = allResults map { result =>
    result.filterNot(_ == sentinelValue)
  }
  successfulResults
}

有一个更好的方法吗?

Answer 1:

(一般来说)一个真正有用的东西将能够推动未来的错误到一个合适的值。 或者换句话说,变换一个Future[T]Future[Try[T]]在成功的返回值变为Success[T]而失败的情况下成为一个Failure[T] 下面是我们如何实现它:

// Can also be done more concisely (but less efficiently) as:
// f.map(Success(_)).recover{ case t: Throwable => Failure( t ) }
// NOTE: you might also want to move this into an enrichment class
def mapValue[T]( f: Future[T] ): Future[Try[T]] = {
  val prom = Promise[Try[T]]()
  f onComplete prom.success
  prom.future
}

现在,如果你做到以下几点:

Future.traverse(seq)( f andThen mapValue )

你会得到一个成功的Future[Seq[Try[A]]]其最终的价值包含了Success的每一个成功的未来实例,并且Failure为每个实例失败的未来。 如果需要的话,你就可以使用collect这个序列砸Failure情况下,只保留SUCESSFUL值。

换句话说,你可以按照如下重写你的helper方法:

def traverseFilteringErrors[A, B](seq: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
  Future.traverse( seq )( f andThen mapValue ) map ( _ collect{ case Success( x ) => x } )
}


文章来源: How to carry on executing Future sequence despite failure?