I'm curious about the best way to recursively build a chain of Akka futures that run sequentially. If a doWork call in a future fails, it should be retried up to 3 times, and the chain should fail if it runs out of retry attempts. The returned future futChain should complete successfully only if all doWork calls succeed.
object Main extends App {
  val futChain = recurse(2)

  def recurse(param: Int, retries: Int = 3): Future[String] = {
    Future {
      doWorkThatMayFailReturningString(param...)
    } recoverWith {
      case e =>
        if (retries > 0) recurse(param, retries - 1)
        else Future.failed(e)
    } flatMap {
      strRes => recurse(nextParam) // how should the res from the previous fut be passed?
    }
  }

  futChain onComplete {
    case res => println(res) // should print all the strings
  }
}
- How can I get the results as a collection? i.e. in this example, each String returned from the doWork function (I need to somehow modify the recurse func to return a Future[List[String]]).
- Should I use recover or recoverWith?
- Is it OK to call flatMap to chain these calls?
- Should I make considerations about tail recursion and stack overflows?
- Would I be better off recursively building a list of futures and reducing them?
You can implement a retryable Future like this:
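def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = {
  n match {
    // Attempts remain: on failure, evaluate f again with one fewer attempt.
    case i if (i > 1) => f.recoverWith { case t: Throwable => retry(f)(n - 1) }
    // Last attempt: return the future as-is, whether it failed or not.
    case _ => f
  }
}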
This isn't optimized for tail recursion, but if you only intend on retrying a few times, you won't get a stack overflow (and I imagine if it's failed the first few, it's going to keep failing, anyway).
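To make the counting concrete, here is a minimal sketch of the same retry exercised against a task that fails twice before succeeding. The names RetryDemo, runFlaky and flaky are hypothetical, and the global execution context and Await are used only to keep the example self-contained. Note that n is the total number of attempts, not the number of extra retries.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RetryDemo {
  // Same shape as the retry above: n is the total number of attempts.
  def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] =
    if (n > 1) f.recoverWith { case _: Throwable => retry(f)(n - 1) }
    else f

  // Hypothetical flaky task: throws on the first two calls, succeeds on the third.
  // Returns the final outcome together with the number of calls that were made.
  def runFlaky(attempts: Int): (String, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val calls = new AtomicInteger(0)

    def flaky(): String =
      if (calls.incrementAndGet() < 3) throw new RuntimeException("transient failure")
      else "ok"

    // Future(flaky()) is passed by name, so every attempt starts a fresh Future.
    val outcome = retry(Future(flaky()))(attempts)
      .recover { case t => s"failed: ${t.getMessage}" }

    (Await.result(outcome, 5.seconds), calls.get())
  }

  def main(args: Array[String]): Unit = {
    println(runFlaky(3)) // three attempts are enough for this task
    println(runFlaky(2)) // two attempts are not
  }
}
```

Because f is a by-name parameter, passing an already created Future value (for example a val) would not rerun the work; the Future has to be constructed inside the argument expression.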
Then I would do the chaining separately. If you have a finite number of functions to chain together, each depending on the previous (and for some reason you want to aggregate the results), you can use for comprehensions (syntactic sugar for flatMap):
for {
  firstResult <- retry(Future(doWork(param)))(3)
  secondResult <- retry(Future(doWork(firstResult)))(3)
  thirdResult <- retry(Future(doWork(secondResult)))(3)
} yield List(firstResult, secondResult, thirdResult)
For arbitrarily long chains, you can run them in parallel using Future.sequence (Futures in the Akka library):
def doWork(param: String): String = ...
val parameters: List[String] = List(...)
val results: Future[List[String]] = Future.sequence(parameters.map(param => Future(doWork(param))))
This will unravel what would otherwise be List[Future[String]] to Future[List[String]].
Here's one way to do a similar thing in sequence:
def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
  seq.foldLeft(Future.successful(List[B]())) { case (left, next) =>
    // Prepending is cheap, but it builds the list in reverse order.
    left.flatMap(list => f(next).map(_ :: list))
  }.map(_.reverse) // restore the input order
}
def doWork(param: String): String = ...
val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param)))
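The two pieces compose: each step of the sequential fold can itself be wrapped in retry. The following is a rough sketch under a few assumptions: doWork is a hypothetical stand-in that upper-cases its input, SequentialRetryDemo and run are made-up names, and the accumulated list is reversed at the end so the results come back in input order.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialRetryDemo {
  // n is the total number of attempts for a single step.
  def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] =
    if (n > 1) f.recoverWith { case _: Throwable => retry(f)(n - 1) }
    else f

  // Starts f for an element only after the previous element's future has completed.
  def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
    val reversed = seq.foldLeft(Future.successful(List.empty[B])) { (left, next) =>
      left.flatMap(list => f(next).map(_ :: list))
    }
    reversed.map(_.reverse) // results in input order
  }

  // Hypothetical stand-in for the real doWork.
  def doWork(param: String): String = param.toUpperCase

  def run(parameters: List[String]): List[String] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // Each step gets up to 3 attempts; the whole chain fails if any step exhausts them.
    val results: Future[List[String]] =
      sequential(parameters)(param => retry(Future(doWork(param)))(3))
    Await.result(results, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(run(List("a", "b", "c")))
}
```

The recursion here lives inside flatMap callbacks rather than on the call stack, which is why a long parameter list does not by itself risk a stack overflow.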
The implementation of these functions is very sensitive to your use case. The two above functions will return failed futures if any of the futures in the chain failed. Sometimes you'll want this, other times not. If you want to collect only the successful futures, and discard failed ones without failing the entire result, you can add an extra step to recover the failures.
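One possible shape for that extra recovery step is sketched below: each future is mapped to an Option, failures are recovered to None, and the Nones are dropped at the end. The names KeepSuccessesDemo, successesOnly and run are hypothetical, and the example parses strings to integers purely as an illustration of a task that can fail. This version runs the futures in parallel via Future.sequence.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object KeepSuccessesDemo {
  // Turns every failure into None so that one bad element cannot fail the whole result.
  def successesOnly[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
    val attempts: List[Future[Option[B]]] =
      seq.map(a => f(a).map[Option[B]](Some(_)).recover { case _ => None })
    Future.sequence(attempts).map(_.flatten)
  }

  def run(inputs: List[String]): List[Int] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // toInt throws NumberFormatException for non-numeric input, failing that future.
    val parsed = successesOnly(inputs)(s => Future(s.toInt))
    Await.result(parsed, 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(run(List("1", "x", "3")))
}
```

Swallowing every exception this way also hides real bugs, so in practice you may prefer to match only the exception types you expect, or to keep the failures (for example as Try values) instead of discarding them.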
Additionally, the difference between recover and recoverWith is the type of PartialFunction each accepts. recover replaces failed futures with default values, while recoverWith does so using another Future. In the case of my retry, recoverWith is more appropriate because I'm trying to recover the failed Future with itself.
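A small side-by-side sketch of the two, using the standard scala.concurrent API. RecoverDemo and demo are hypothetical names, and Await is used only so the example can return plain values.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RecoverDemo {
  def demo(): (String, String) = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    val failed: Future[String] = Future.failed(new RuntimeException("boom"))

    // recover: the partial function returns a plain value.
    val withDefault: Future[String] =
      failed.recover { case _: RuntimeException => "default" }

    // recoverWith: the partial function returns another Future, e.g. a second attempt.
    val withFuture: Future[String] =
      failed.recoverWith { case _: RuntimeException => Future("second attempt") }

    (Await.result(withDefault, 5.seconds), Await.result(withFuture, 5.seconds))
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

If the replacement is itself asynchronous work that may fail, as a retry is, recoverWith avoids ending up with a nested Future[Future[T]].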