failure in Scala future's for comprehension

2019-03-14 05:08发布

问题:

I have three sequential Futures and use in the for comprehension like this

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { }
comF onFailure { 
  // ----------------      Here is the problem  --------------------------------
  //
  // How do I know which future failed(throw exception), when the callback comes here ?
  // Thanks for the help! Different futures using different exceptions can solve it.
}

Now I have a future list like List[Future[T]], and first I transfer it to Future[List[T]] using this method (Why does this list-of-futures to future-of-list transformation compile and work?). then I get the future

val fList: Future[List[T]]
fList on Failure {
  // 
  // How do I know which is Fail now >??
}

回答1:

Consider the code:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    result(x, y, x)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

The Future case works essentially in the same way.


Even grouping computations in List doesn't help:

def func = {
  try {
    val x = maybeThrows
    val y = maybeThrowsToo
    val z = maybeThrowsAsWell
    val list = List(x, y, z)
    result(list)
  } catch (RuntimeException e) {
    // How do I know which maybeThrows failed?
  }
}

Spoiler: you have to track explicityl which computation failed. It would result in some boilerplate if done with try/catch. But luckily with Future (and Try) the boilerplate isn't that bad:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}

Spoiler you have to track which computation failed explicitly. It would result in a lot of boilerplate if done with try/catch. But luckily there is Try, and Future behaves even more the same:

class TaggedException(val idx, exc: Exception)

def tagFailedWithIndex[T](idx: Int, f: Future[T]): Future[T] = 
  future recoverWith { case exc => Future.failed(new TaggedException(idx, exc)) }

val comF = for {
  f1 <- tagFailedWithIndex(0, future1)
  f2 <- tagFailedWithIndex(1, future2)
  f3 <- tagFailedWithIndex(2, future3)
} yield something(f1, f2, f3)

comF onFailure { 
  case exc: TaggedException => "%d computation failed".format(exc.idx)
}


回答2:

The problem: flatMap combines the Futures into one single Future

You are using flatMap, so the futures are nested into one single future.

Your code is

import scala.concurrent.Future

val future1, future2, future3 = Future[Any]()

val comF = for {
  f1 <- future1
  f2 <- future2
  f3 <- future3
} yield {
  // something
}

comF onSuccess { ??? }
comF onFailure { ??? }

When I apply the de-sugaring rules to your for-comprehension, I get

val comF = (future1).flatMap { case f1 => 
  (future2).flatMap { case f2 => 
    (future3).map { case f3 => {
        // something
      }
    }
  }
}

You can clearly see the use of flatMap here. In principle flatMap does two things: It applies a function to the result of the first Future. This must be a function that maps the result of the first Future to another Future, i.e. a Future nested in the first Future (this is the map part). Then it 'unnestes' the two Futures and merges them in one single Future (the flat part). On this point, the two Futures don't exist any more (from a conceptual point of view; technically they are still there). Instead there is only one 'merged' Future.

The two calls to flatMap create a new Future out of the original three. This is the reason why you cannot find out, which of the three original Futures raises the exception: It's none of them. Instead the newly created Future runs and raises the exception.

Solution: Track your progress separately

If you want to know, which steps of your calculation ran before one raised an exception, you must track the progress separately, e.g. through an additional parameter in the Exception. Alternatively you can remove flatMap and run the individual Futures one after the other.

An example on how to track your progress:

First we create a new exception class, that contains the real exception together with some information, where the exception came from

class ExceptionWithOrigin[T](val origin : T, nested : Throwable) extends Exception(nested)

object ExceptionWithOrigin {
  def wrapFuture[T,U](origin : T, f : Future[U]) : Future[U] = {
    f.transform(Predef.identity, new ExceptionWithOrigin[T](origin,_))
  }

  def wrapFuture[U](f : Future[U]) = wrapFuture(f,f)
}

For the Futures we have no special requirements.

val future1,future2,future3 = Future[Any]()

We then wrap up the given Futures using the helper method from the companion object of our newly created exception class.

import ExceptionWithOrigin._

val comF = for {
  result1 <- wrapFuture(future1)
  result2 <- wrapFuture(future2)
  result3 <- wrapFuture(future3)
} yield {
  // something
}

When you catch some exception ex, you can now just use ex.origin to find out where it came from. Of course, the origin is not quite correct. The original Futures future1, future2 and future3 are not really executed. Instead a newly created Future runs, created by the flatMap. But nevertheless the origin still works.

A hint about better naming

Btw, you should rename f1, f2 and f3 to result1, result2 and result3. They don't represent a Future, but the result of the calculation of each Future (the value each Future returns).



回答3:

The venerable @Rit (Brendan McAdams) finally convinced me to try Scalaz with disjunctions in his talk A Skeptic's Look at scalaz' "Gateway Drugs". Your code would be wrapped in a disjunction and look like this:

val comF = for {
  f1 <- future1 \/> "Future 1 failed"
  f2 <- future2 \/> "Future 2 failed"
  f3 <- future3 \/> "Future 3 failed"
} yield {
  // something
}

Related quesiton



回答4:

To be able to tell between individual failures, you have to handle failures individually. Something like this, perhaps:

Future[List[T]] = Future.sequence(
    futures.zipWithIndex.map { case (f, idx) => 
        f.onFailure { case ex => println(s"Future $idx failed with $ex") }
        f
    } 
)


标签: scala future