I have some side-effectful function,
def f(): Future[Int] = {
val n = Random.nextInt()
println(s"Generated $n")
Future(n)
}
and I want to execute it repeatedly until predicate returns true.
def success(n: Int): Boolean = n % 2 == 0
My plan is to build Stream
of results
val s = Stream.fill(10)(f)
and then use Future.find
to get the first result that satisfies predicate.
Future.find(s)(success) map println
The problem is that Future.find
runs all the futures in parallel and I want it to execute futures sequentially one after the other until predicate returns true.
scala> Future.find(s)(success) map println
Generated -237492703
Generated -935476293
Generated -1155819556
Generated -375506595
Generated -912504491
Generated -1307379057
Generated -1522265611
Generated 1163971151
Generated -516152076
res8: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@37d28f02
Some(-1155819556)
The question is how to execute stream of futures sequentially until predicate returns true? Are there any suitable functions in standard or third-party library?
Instead of using Stream I suggest using another approach. Using The Future's filter and recoverWith recursively:
def findFirst[A](futureGen: => Future[A], predicate: A => Boolean): Future[A] = {
futureGen.filter(predicate).recoverWith { case _ => findFirst(futureGen, predicate) }
}
findFirst(f, success)
This will call the Futures one after the other until 'success' will return true.
First, let's make the futures we aren't interested in fail:
val s1 = s.map(_.filter(success))
Now you can combine two such futures and get the first successful value using fallbackTo
. And just fold the stream, starting with a known-bad future:
def firstSuccess[T](stream: Stream[Future[T]]): Future[T] =
if (stream.isEmpty)
Future.failed(new NoSuchElementException)
else
stream.head.fallbackTo(firstSuccess(stream.tail))
If I understand the question, then you will have to block the thread to proceed sequentially. You can use Await to accomplish that.
scala> def f(): Future[Int] = {
| val n = Random.nextInt()
| println(s"Generated $n")
| Future(n)
| }
f: ()scala.concurrent.Future[Int]
scala> def success(n: Int): Boolean = n % 2 == 0
success: (n: Int)Boolean
scala> val s = Stream.fill(10)(f)
Using your way, I get
scala> Future.find(s)(success) map println
Generated 551866055
Generated -561348666
Generated -1103407834
Generated -812310371
Generated -1544170923
Generated 2131361419
Generated -236722325
Generated -1473890302
Generated -82395856
Some(-561348666)
res16: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@15a2d71
I should get answer as Some(-561348666), which you can get as
scala> s.find(x => success(Await.result(x,1 seconds))).get onSuccess {case p=> println(p)}
-561348666