How to invoke a method again and again until it re

2020-04-14 08:45发布

Given a method that returns a Future like this...

def remove(id: String): Future[Option[User]] = Future {
  // removes and returns the user identified by `id`
}

... how do I invoke it again and again until it returns a Future value containing None?

EDIT

Perhaps it is worth to mention that I don't need to collect the results. I just need to invoke the method as long as it finds an user to remove. The idea would be to have a loop that stops when remove returns Future[None].

标签: scala future
3条回答
女痞
2楼-- · 2020-04-14 09:15

Someone commented earlier that there's no point.

The surprise for me was that there's no quickie for lazily consuming the futures. Future.find is like firstCompletedOf, and doesn't mean find first in traversable order.

scala> import concurrent._, ExecutionContext.Implicits._
import concurrent._
import ExecutionContext.Implicits._

scala> import java.util.concurrent.atomic._
import java.util.concurrent.atomic._

scala> val count = new AtomicInteger(10)
count: java.util.concurrent.atomic.AtomicInteger = 10

scala> def f(s: String) = Future { if (count.decrementAndGet <= 0) None else Some(s) }
f: (s: String)scala.concurrent.Future[Option[String]]

scala> def g(ss: List[String]): Future[List[String]] = f("hello") flatMap { case None => Future.successful(ss) case Some(s) => g(s :: ss) }
g: (ss: List[String])scala.concurrent.Future[List[String]]

scala> g(Nil)
res0: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@65a15628

scala> .value
res1: Option[scala.util.Try[List[String]]] = Some(Success(List(hello, hello, hello, hello, hello, hello, hello, hello, hello)))

Illustrating the utility of not blocking:

scala> :pa
// Entering paste mode (ctrl-D to finish)

import scala.util._
import concurrent._, ExecutionContext.Implicits._
import java.util.concurrent.atomic._

class Work {
  val count = new AtomicInteger(10)
  def f(s: String) = Future {
    if (count.decrementAndGet <= 0) None else Some(s)
  } andThen {
    case Success(Some(x)) => Console println s"Calculated $x"
    case Success(None)    => Console println "Done."
    case _                => Console println "Failed."
  }
}

// Exiting paste mode, now interpreting.

import scala.util._
import concurrent._
import ExecutionContext.Implicits._
import java.util.concurrent.atomic._
defined class Work

Showing the Stream version, which won't calculate the prefix until the consuming thread steps through the blocking Awaits:

scala> val work = new Work
work: Work = Work@1b45c0e

scala> Stream continually work.f("hello") takeWhile { x => Await.result(x, duration.Duration.Inf).nonEmpty }
Calculated hello
res0: scala.collection.immutable.Stream[scala.concurrent.Future[Option[String]]] = Stream(scala.concurrent.impl.Promise$DefaultPromise@66629f63, ?)

scala> .toList
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Calculated hello
Done.
res1: List[scala.concurrent.Future[Option[String]]] = List(scala.concurrent.impl.Promise$DefaultPromise@66629f63, scala.concurrent.impl.Promise$DefaultPromise@610db97e, scala.concurrent.impl.Promise$DefaultPromise@6f0628de, scala.concurrent.impl.Promise$DefaultPromise@3fabf088, scala.concurrent.impl.Promise$DefaultPromise@1e392345, scala.concurrent.impl.Promise$DefaultPromise@12f3afb5, scala.concurrent.impl.Promise$DefaultPromise@4ced35ed, scala.concurrent.impl.Promise$DefaultPromise@2c22a348, scala.concurrent.impl.Promise$DefaultPromise@7bd69e82)

scala> .foreach (Console println _.value.get)
Success(Some(hello))
Success(Some(hello))
[snip]

The other behavior, probably more desirable, where you get a Future that holds the result of calculating the prefix:

scala> :pa
// Entering paste mode (ctrl-D to finish)

  val work = new Work
  def g(ss: List[String]): Future[List[String]] = work.f("hello") flatMap {
    case None => Future.successful(ss)
    case Some(s) => g(s :: ss)
  }

// Exiting paste mode, now interpreting.

work: Work = Work@796d3c9f
g: (ss: List[String])scala.concurrent.Future[List[String]]

scala> g(Nil)
Calculated hello
Calculated hello
res3: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@99a78d7
Calculated hello
Calculated hello
Calculated hello

scala> Calculated hello
Calculated hello
Calculated hello
Calculated hello
Done.

Use the future:

scala> .value
res5: Option[scala.util.Try[List[String]]] = Some(Success(List(hello, hello, hello, hello, hello, hello, hello, hello, hello)))
查看更多
够拽才男人
3楼-- · 2020-04-14 09:36

Here it is:

import concurrent._, ExecutionContext.Implicits._
import java.util.concurrent.atomic._

val count = new AtomicInteger(10)

def f(s: String) = Future {
  if (count.decrementAndGet <= 0) None else Some(s)
}

Iterator continually {
  f("hello")
} takeWhile {
  Await.result(_, duration.Duration.Inf).nonEmpty
} foreach { _.map { _.map {
  println
}}

I hope it helps.

查看更多
Lonely孤独者°
4楼-- · 2020-04-14 09:41

Stream#continually to do the same thing endlessly, and Stream#takeWhile to halt it at a certain point. http://www.scala-lang.org/api/2.11.0/index.html#scala.collection.immutable.Stream

Stream.continually(/*remove*/).takeWhile(/*not Future[None]*/)
查看更多
登录 后发表回答