akka Actor selection without race condition

2019-07-29 01:14发布

问题:

I have a futures pool , and each future works with the same akka Actor System - some Actors in system should be global, some are used only in one future.

  val longFutures = for (i <- 0 until 2 ) yield Future {
    val p:Page = PhantomExecutor(isDebug=true)
    Await.result( p.open("http://www.stackoverflow.com/") ,timeout = 10.seconds)
  }

PhantomExecutor tryes to use one shared global actor (simple increment counter) using system.actorSelection

  def selectActor[T <: Actor  : ClassTag](system:ActorSystem,name:String) = {
    val timeout = Timeout(0.1 seconds)
    val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name)
    val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity],
      0.1 seconds)

    aid.ref match {
      case Some(cacher) =>
        cacher
      case None =>
        system.actorOf(Props[T],name)
    }
  }

But in concurrent environment this approach does not work because of race condition.

I know only one solution for this problem - create global actors before splitting to futures. But this means that I can't encapsulate alot of hidden work from top library user.

回答1:

You're right in that making sure the global actors are initialized first is the right approach. Can't you tie them to a companion object and reference them from there so you know they will only ever be initialized one time? If you really can't go with such an approach then you could try something like this to lookup or create the actor. It is similar to your code but it include logic to go back through the lookup/create logic (recursively) if the race condition is hit (only up to a max number of times):

  def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = {
    import system.dispatcher
    val timeout = 0.1 seconds

    def doFindOrCreate(depth:Int = 0):ActorRef = {
      if (depth >= maxAttempts) 
        throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") 

      val selection = system.actorSelection(s"/user/$name")
      val fut = selection.resolveOne(timeout).map(Some(_)).recover{
        case ex:ActorNotFound => None
      }
      val refOpt = Await.result(fut, timeout)

      refOpt match {
        case Some(ref) => ref
        case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1))
      }
    }

    doFindOrCreate()
  }

Now the retry logic would fire for any exception when creating the actor, so you might want to further specify that (probably via another recover combinator) to only recurse when it gets an InvalidActorNameException, but you get the idea.



回答2:

You may want to consider creating a manager actor that would take care about creating "counter" actors. This way you would ensure that counter actor creation requests are serialized.

object CounterManagerActor {
  case class SelectActorRequest(name : String)
  case class SelectActorResponse(name : String, actorRef : ActorRef)
} 

class CounterManagerActor extends Actor {
  def receive = {
    case SelectActorRequest(name) => {
      sender() ! SelectActorResponse(name, selectActor(name))
    } 
  }

  private def selectActor(name : String) = {
    // a slightly modified version of the original selectActor() method
    ???
  }
}