Is there sequential Future.find?

2020-03-26 06:24发布

问题:

I have some side-effectful function,

def f(): Future[Int] = {
  val n = Random.nextInt()
  println(s"Generated $n")
  Future(n)
}

and I want to execute it repeatedly until predicate returns true.

def success(n: Int): Boolean = n % 2 == 0

My plan is to build Stream of results

val s = Stream.fill(10)(f)

and then use Future.find to get the first result that satisfies predicate.

Future.find(s)(success) map println

The problem is that Future.find runs all the futures in parallel and I want it to execute futures sequentially one after the other until predicate returns true.

scala> Future.find(s)(success) map println
Generated -237492703
Generated -935476293
Generated -1155819556
Generated -375506595
Generated -912504491
Generated -1307379057
Generated -1522265611
Generated 1163971151
Generated -516152076
res8: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@37d28f02
Some(-1155819556)

The question is how to execute stream of futures sequentially until predicate returns true? Are there any suitable functions in standard or third-party library?

回答1:

Instead of using Stream I suggest using another approach. Using The Future's filter and recoverWith recursively:

def findFirst[A](futureGen: => Future[A], predicate: A => Boolean): Future[A] = {
  futureGen.filter(predicate).recoverWith { case _ => findFirst(futureGen, predicate) }
}

findFirst(f, success)

This will call the Futures one after the other until 'success' will return true.



回答2:

First, let's make the futures we aren't interested in fail:

val s1 = s.map(_.filter(success))

Now you can combine two such futures and get the first successful value using fallbackTo. And just fold the stream, starting with a known-bad future:

def firstSuccess[T](stream: Stream[Future[T]]): Future[T] = 
  if (stream.isEmpty)
    Future.failed(new NoSuchElementException)
  else
    stream.head.fallbackTo(firstSuccess(stream.tail))


回答3:

If I understand the question, then you will have to block the thread to proceed sequentially. You can use Await to accomplish that.

scala> def f(): Future[Int] = {
 |   val n = Random.nextInt()
 |   println(s"Generated $n")
 |   Future(n)
 | }
f: ()scala.concurrent.Future[Int]

scala> def success(n: Int): Boolean = n % 2 == 0
success: (n: Int)Boolean

scala> val s = Stream.fill(10)(f)

Using your way, I get

scala> Future.find(s)(success) map println
Generated 551866055
Generated -561348666
Generated -1103407834
Generated -812310371
Generated -1544170923
Generated 2131361419
Generated -236722325
Generated -1473890302
Generated -82395856
Some(-561348666)
res16: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@15a2d71

I should get answer as Some(-561348666), which you can get as

scala> s.find(x => success(Await.result(x,1 seconds))).get onSuccess {case p=> println(p)}
-561348666