How to retrieve partial results from multiple Futures

Posted 2019-08-30 07:15

Question:

I am using Future in Scala on the Play framework, but I am having difficulty getting part of the final result when a timeout occurs while merging multiple futures. Here is what my code does. It creates two futures, each querying one provider, merges their results with a for/yield statement, and then awaits the merged result with a timeout. This works fine when both providers reply in time. However, if only one provider replies in time, the await times out, and in that case I still need to retrieve the data returned by the provider that did reply in time. How do I do that?

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val pool = Executors.newCachedThreadPool()
implicit val ec = ExecutionContext.fromExecutorService(pool)

// Both queries start immediately and run concurrently.
val future1 = Future(QueryProvider(provider1, request1))
val future2 = Future(QueryProvider(provider2, request2))

// The merged future completes only when both providers have replied.
val future = for {
  result1 <- future1
  result2 <- future2
} yield Merge(result1, result2)

val duration = Duration(60000, MILLISECONDS)

try {
  result = Await.result(future, duration).asInstanceOf[String]
} catch {
  case _: Exception => println("time out...")
  // Here, how can I retrieve provider1's result if only provider2 timed out?
}

Answer 1:

You could use after from Akka (akka.pattern.after) instead of the blocking Await.result:

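import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration._

// Completes with None after 60 seconds; assumes an ActorSystem named `system`
// and an implicit ExecutionContext are in scope.
val timeout =
  after(FiniteDuration(60000, MILLISECONDS), using = system.scheduler) {
    Future.successful(None)
  }

// Each provider's future races against the shared timeout.
val resFuture = for {
  result1 <- Future.firstCompletedOf(Seq(future1.map(Some(_)), timeout))
  result2 <- Future.firstCompletedOf(Seq(future2.map(Some(_)), timeout))
} yield (result1, result2)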

val result = resFuture map {
  case (Some(r1), Some(r2)) => Merge(r1, r2)
  case (Some(r1), None) => PartialResult1(r1)
  case (None, Some(r2)) => PartialResult2(r2)
  case _ => EmptyResult
}

In this case resFuture completes after 60 seconds at the latest (earlier if both providers reply in time), and you can process a partial result. Also, you don't need Await in Play; you could use Async instead.
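For readers who want to try the idea without an ActorSystem, here is a minimal sketch that uses only the Scala and Java standard libraries. The names completeAfter and orNoneAfter, the simulated provider strings, and the short delays are all hypothetical and chosen for illustration; completeAfter is only a rough stand-in for akka.pattern.after, not its implementation. Unlike the snippet above, each future here gets its own timer rather than a shared one.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PartialResultSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Single daemon timer thread, so it never keeps the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  })

  // Hypothetical helper: a future that completes with `value` once `delay` has elapsed.
  def completeAfter[T](delay: FiniteDuration)(value: => T): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = { p.complete(Try(value)); () }
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Some(result) if `f` finishes before `limit`, otherwise None.
  def orNoneAfter[T](f: Future[T], limit: FiniteDuration): Future[Option[T]] = {
    val found: Future[Option[T]] = f.map(v => Some(v))
    val timedOut: Future[Option[T]] = completeAfter[Option[T]](limit)(None)
    Future.firstCompletedOf(Seq(found, timedOut))
  }

  def demo(): (Option[String], Option[String]) = {
    val limit = 500.millis
    // Simulated providers: the first replies quickly, the second too late.
    val r1 = orNoneAfter(completeAfter(50.millis)("provider1-data"), limit)
    val r2 = orNoneAfter(completeAfter(3.seconds)("provider2-data"), limit)
    val both = for { a <- r1; b <- r2 } yield (a, b)
    // Await is used here only so the sketch can print something and exit.
    Await.result(both, 2.seconds)
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (Some(provider1-data),None)
}
```

Both wrapped futures are created before the for comprehension, so the two time limits run concurrently rather than one after the other.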

If you have many futures of the same type, you could apply the same pattern like this:

val futures: Seq[Future[Int]] = ???
val futureWithTimeout =
  futures.map{ f => Future firstCompletedOf Seq(f.map{Some(_)}, timeout) }

val result: Future[Seq[Option[Int]]] = Future.sequence(futureWithTimeout)

// In case you need to know the index of each completed or timed-out future
val indexedResults: Future[Seq[(Option[Int], Int)]] = result.map(_.zipWithIndex)

// In case you only need the completed results
val completedResults: Future[Seq[Int]] = result.map(_.flatten)

Types here are only for illustration.
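
As a small illustration of the post-processing step, the sketch below starts from an already completed Future[Seq[Option[Int]]] in which the second entry stands for a future that timed out. The object and method names and the sample values are hypothetical, and the completed future is only a stand-in for the output of Future.sequence.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CompletedResultsSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Positions of the futures that timed out (their slot holds None).
  def timedOutIndexes(result: Future[Seq[Option[Int]]]): Future[Seq[Int]] =
    result.map(_.zipWithIndex.collect { case (None, i) => i })

  // Only the values that arrived in time, in their original order.
  def completedValues(result: Future[Seq[Option[Int]]]): Future[Seq[Int]] =
    result.map(_.flatten)

  def main(args: Array[String]): Unit = {
    // Stand-in for Future.sequence(futureWithTimeout); index 1 timed out.
    val result: Future[Seq[Option[Int]]] =
      Future.successful(Seq(Some(10), None, Some(30)))

    println(Await.result(timedOutIndexes(result), 1.second)) // prints List(1)
    println(Await.result(completedValues(result), 1.second)) // prints List(10, 30)
  }
}
```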