阿卡演员的选择没有竞争条件(akka Actor selection without race co

2019-10-21 08:45发布

我有一个期货游泳池和未来的每个使用相同的阿卡演员系统工作 - 在系统中的一些演员应该是全球性的,有些是只用在一个未来。

  val longFutures = for (i <- 0 until 2 ) yield Future {
    val p:Page = PhantomExecutor(isDebug=true)
    Await.result( p.open("http://www.stackoverflow.com/") ,timeout = 10.seconds)
  }

PhantomExecutor tryes使用使用一个共享的全球参与者(简单的增量计数器) system.actorSelection

  def selectActor[T <: Actor  : ClassTag](system:ActorSystem,name:String) = {
    val timeout = Timeout(0.1 seconds)
    val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name)
    val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity],
      0.1 seconds)

    aid.ref match {
      case Some(cacher) =>
        cacher
      case None =>
        system.actorOf(Props[T],name)
    }
  }

但是,在并发环境下这种方法并没有因为比赛条件下工作。

我知道这个问题只有一个解决方案 - 分裂期货之前创建的全球参与者。 但是,这意味着我不能从封装顶部图书馆用户隐藏的工作很多。

Answer 1:

你说得对,确保全球参与者首先被初始化是正确的做法。 你不能配合他们同伴的对象,并从那里引用他们,你知道他们会永远只能一次初始化? 如果你真的不能用这样的方法去,那么你可以尝试这样的事情来查找或创建的演员。 它类似于你的代码,但它包含的逻辑通过查找回去/创建逻辑(递归)如果比赛条件被击中(只到次的最大数量):

  def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = {
    import system.dispatcher
    val timeout = 0.1 seconds

    def doFindOrCreate(depth:Int = 0):ActorRef = {
      if (depth >= maxAttempts) 
        throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") 

      val selection = system.actorSelection(s"/user/$name")
      val fut = selection.resolveOne(timeout).map(Some(_)).recover{
        case ex:ActorNotFound => None
      }
      val refOpt = Await.result(fut, timeout)

      refOpt match {
        case Some(ref) => ref
        case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1))
      }
    }

    doFindOrCreate()
  }

现在创建的演员时的重试逻辑将火任何异常,所以你可能想进一步指出(可能是通过其他recover组合子),只当它到达一个递归InvalidActorNameException ,但你的想法。



Answer 2:

您可能要考虑创建一个管理者的演员,将采取有关创建“计数器”演员的照顾。 这样,您将确保反演员创作的请求序列化。

object CounterManagerActor {
  case class SelectActorRequest(name : String)
  case class SelectActorResponse(name : String, actorRef : ActorRef)
} 

class CounterManagerActor extends Actor {
  def receive = {
    case SelectActorRequest(name) => {
      sender() ! SelectActorResponse(name, selectActor(name))
    } 
  }

  private def selectActor(name : String) = {
    // a slightly modified version of the original selectActor() method
    ???
  }
} 


文章来源: akka Actor selection without race condition