Future Recursion Patterns/Future Chaining of arbit

Posted 2019-05-11 01:39

I'm curious about the best way to recursively build a chain of Akka futures that run sequentially. If a doWork call in a future fails, that future should be retried up to 3 times, and the chain should fail if it runs out of retry attempts. The returned future, futChain, should complete successfully only if all doWork calls succeed.

object Main extends App {
  val futChain = recurse(2)

  def recurse(param: Int, retries: Int = 3): Future[String] = {
    Future {
      doWorkThatMayFailReturningString(param...)
    } recoverWith {
      case e =>
        if (retries > 0) recurse(param, retries - 1)
        else Future.failed(e)
    } flatMap {
      strRes => recurse(nextParam) //how should the res from the previous fut be passed?
    }
  }

  futChain onComplete {
    case res => println(res) //should print all the strings
  }
}
  1. How can I get the results as a collection, i.e. in this example each String returned from the doWork function? (I need to somehow modify the recurse function to return a Future[List[String]].)
  2. Should I use recover or recoverWith?
  3. Is it OK to call flatMap to chain these calls?
  4. Do I need to worry about tail recursion and stack overflows?
  5. Would it be better to recursively build a list of futures and reduce them?

1 answer
(This account has been banned)
Reply #2 · 2019-05-11 02:27

You can implement a retryable Future like this:

def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = {
    n match {
        case i if (i > 1) => f.recoverWith{ case t: Throwable => retry(f)(n - 1)}
        case _ => f
    }       
}

This isn't optimized for tail recursion, but if you only intend to retry a few times, you won't get a stack overflow (and I imagine that if it has failed the first few times, it's going to keep failing anyway).
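As a rough illustration of how retry behaves, here is a minimal sketch. The flaky function and the RetryDemo object are hypothetical and exist only for this example; retry is restated with the same shape as above so the block stands alone. Note that n counts total attempts, and that f is a by-name parameter, so every retry builds a fresh Future.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RetryDemo {
  // Same shape as the retry above: n is the total number of attempts.
  def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] =
    if (n > 1) f.recoverWith { case _: Throwable => retry(f)(n - 1) }
    else f

  def run(): String = {
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical flaky operation: fails on the first two calls, then succeeds.
    val calls = new AtomicInteger(0)
    def flaky(): String = {
      val attempt = calls.incrementAndGet()
      if (attempt < 3) throw new RuntimeException(s"attempt $attempt failed")
      else s"succeeded on attempt $attempt"
    }

    // Future(flaky()) is passed by name, so it is re-evaluated on each attempt.
    Await.result(retry(Future(flaky()))(3), 5.seconds)
  }
}
```

With three attempts allowed, the third call succeeds and the resulting future completes with its value. With only two attempts, the same setup would end in a failed future carrying the second exception.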

Then I would do the chaining separately. If you have a finite number of functions to chain together, each depending on the previous (and for some reason you want to aggregate the results) you can use for comprehensions (syntactic sugar for flatMap):

for {
    firstResult <- retry(Future(doWork(param)))(3)
    secondResult <- retry(Future(doWork(firstResult)))(3)
    thirdResult <- retry(Future(doWork(secondResult)))(3)
} yield List(firstResult, secondResult, thirdResult)
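If the number of dependent steps isn't known in advance, the same flatMap chaining can be written recursively while accumulating the results. The following is only a sketch under assumptions: chain, ChainDemo, and this toy doWork are hypothetical names for illustration, and in real code each Future(doWork(...)) could be wrapped in a retry helper.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChainDemo {
  // Hypothetical stand-in for doWork: derives the next value from the previous one.
  def doWork(param: String): String = param + "!"

  // Runs `steps` dependent calls one after another; each call receives the
  // previous result. Results are prepended to `acc` and reversed at the end.
  def chain(param: String, steps: Int, acc: List[String] = Nil)(
      implicit ec: ExecutionContext): Future[List[String]] =
    if (steps <= 0) Future.successful(acc.reverse)
    else Future(doWork(param)).flatMap(res => chain(res, steps - 1, res :: acc))

  def run(): List[String] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Await.result(chain("a", 3), 5.seconds)
  }
}
```

The recursive call sits inside the flatMap callback, which is scheduled on the execution context after the previous future completes, so as far as I can tell the recursion does not deepen the caller's stack the way a direct recursive call would.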

For an arbitrarily long list of calls that don't depend on each other, you can run them in parallel using Future.sequence (see Futures in the Akka library):

def doWork(param: String): String = ...

val parameters: List[String] = List(...)

// doWork returns a plain String, so each call is wrapped in a Future first.
val results: Future[List[String]] = Future.sequence(parameters.map(param => Future(doWork(param))))

This turns what would otherwise be a List[Future[String]] into a Future[List[String]].

Here's one way to do a similar thing in sequence:

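def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
    // Each f(next) starts only after the previous future has completed.
    // Results are prepended, then reversed to restore the input order.
    seq.foldLeft(Future.successful(List[B]())) { case (left, next) =>
        left.flatMap(list => f(next).map(_ :: list))
    }.map(_.reverse)
}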

def doWork(param: String): String = ...

val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param))) 

The right implementation of these functions depends heavily on your use case. Both functions above return a failed future if any future in the chain fails. Sometimes you'll want this, other times not. If you want to collect only the successful results and discard the failed ones without failing the entire result, you can add an extra step that recovers the failures.
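One possible way to add that extra recovery step is to lift each outcome into a Try before combining, then keep only the successes. This is a sketch, not the only approach; successesOnly, KeepSuccessesDemo, and this toy doWork are hypothetical names introduced for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object KeepSuccessesDemo {
  // Hypothetical stand-in for doWork: fails for empty input.
  def doWork(param: String): String =
    if (param.isEmpty) throw new IllegalArgumentException("empty parameter")
    else param.toUpperCase

  // Lifts each outcome into a Try so that one failure no longer fails
  // the combined future, then keeps only the successful values.
  def successesOnly(params: List[String])(
      implicit ec: ExecutionContext): Future[List[String]] = {
    val attempts: List[Future[Try[String]]] = params.map { p =>
      Future(doWork(p))
        .map[Try[String]](Success(_))
        .recover { case t => Failure(t) }
    }
    Future.sequence(attempts).map(_.collect { case Success(s) => s })
  }

  def run(): List[String] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Await.result(successesOnly(List("a", "", "b")), 5.seconds)
  }
}
```

Here the empty parameter fails and is dropped, while the other two results are kept in their original order.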

Additionally, the difference between recover and recoverWith is the type of PartialFunction each accepts. recover takes a partial function from Throwable to a plain value, so it replaces a failed future with a default value, while recoverWith takes a partial function from Throwable to another Future. In the case of my retry, recoverWith is more appropriate because I'm recovering the failed Future with another attempt at the same Future.
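A small side-by-side sketch of the two may make the difference concrete. The RecoverDemo object and the failing helper are hypothetical and only serve the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RecoverDemo {
  def run(): (String, String) = {
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical future that always fails.
    def failing: Future[String] = Future(throw new RuntimeException("boom"))

    // recover: the partial function returns a plain value.
    val withDefault: Future[String] =
      failing.recover { case _: RuntimeException => "default" }

    // recoverWith: the partial function returns another Future.
    val withFallback: Future[String] =
      failing.recoverWith { case _: RuntimeException => Future("fallback") }

    (Await.result(withDefault, 5.seconds), Await.result(withFallback, 5.seconds))
  }
}
```

Both results have type Future[String]; the only difference is whether the replacement is supplied directly or produced asynchronously.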
