How to use custom Router in Akka 2.3?

Posted 2019-06-01 17:42

Question:

Update: The source code for the whole akka project is now available here, with this problem filed as an issue.

I am having trouble rewriting a custom router example from the Akka Concurrency book (section 10.6) for Akka 2.3. Here is the piece of code in question:

package zzz.akka.avionics

import akka.actor.{Props, SupervisorStrategy}
import akka.dispatch.Dispatchers
import akka.routing.{RouterConfig, RouteeProvider, Route, Destination}

class SectionSpecificAttendantRouter extends RouterConfig {
  this: FlightAttendantProvider =>

  // The RouterConfig requires us to fill out these two
  // fields. We know what the supervisorStrategy is but we're
  // only slightly aware of the Dispatcher, which will be
  // discussed in detail later
  def routerDispatcher: String = Dispatchers.DefaultDispatcherId
  def supervisorStrategy: SupervisorStrategy =
    SupervisorStrategy.defaultStrategy

  // The createRoute method is what invokes the decision
  // making code.  We instantiate the Actors we need and then
  // create the routing code
  def createRoute(routeeProvider: RouteeProvider): Route = {
    // Create 5 flight attendants
    val attendants = (1 to 5) map { n =>
      routeeProvider.context.actorOf(Props(newFlightAttendant), "Attendant-" + n)
    }

    // Register them with the provider - This is important.
    // If you forget to do this, nobody's really going to
    // tell you about it :)
    routeeProvider.registerRoutees(attendants)

    // Now the partial function that calculates the route.
    // We are going to route based on the name of the
    // incoming sender.  Of course, you would cache this or
    // do something slicker.
    {
      case (sender, message) =>
        import Passenger.SeatAssignment
        val SeatAssignment(_, row, _) = sender.path.name
        List(Destination(sender,
             attendants(math.floor(row.toInt / 11).toInt)))
    }
  }
}

My questions:

  1. Should I extend Pool, RouterConfig or CustomRouterConfig?
  2. How can I get a sender reference in order to calculate the index from the flight attendant's path?

Here are my broken beginnings:

class SpecificRoutingLogic extends RoutingLogic {
  override def select(message: Any, routees: IndexedSeq[Routee]): Routee = {
    ??? // no sender here!
  }
}
class SectionSpecificAttendantRouter extends CustomRouterConfig {
  this: FlightAttendantProvider =>

  override def routerDispatcher: String = Dispatchers.DefaultDispatcherId

  //override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  override def createRouter(system: ActorSystem): Router = {
    // Create 5 flight attendants
    val attendants = (1 to 5) map { n =>
      system.actorOf(Props(newFlightAttendant()), "Attendant-" + n)
    }
    new Router(new SpecificRoutingLogic())
  }
}

Answer 1:

The documentation does not mention that when you inherit from CustomRouterConfig, Akka does not create the routees for you, so nothing routes your messages. I would do it this way:

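import akka.actor.{ActorSystem, SupervisorStrategy}
import akka.dispatch.Dispatchers
import akka.routing.{NoRoutee, Pool, Resizer, Routee, Router, RoutingLogic, SeveralRoutees}
import scala.collection.immutable

class SpecificRoutingLogic extends RoutingLogic {
  // Akka 2.3 declares the parameter as immutable.IndexedSeq[Routee]
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
    val selected = routees.take(5)
    if (selected.isEmpty) NoRoutee
    else SeveralRoutees(selected) // wrap the sequence so the result is a single Routee
  }
}
class SectionSpecificAttendantRouter(override val nrOfInstances: Int) extends Pool {
  this: FlightAttendantProvider =>

  // Pool leaves these two members abstract, so they must be defined
  override def resizer: Option[Resizer] = None
  override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  override def routerDispatcher: String = Dispatchers.DefaultDispatcherId

  override def createRouter(system: ActorSystem): Router = {
    new Router(new SpecificRoutingLogic())
  }
}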

I changed SectionSpecificAttendantRouter to extend Pool instead of CustomRouterConfig because of the issue described above. With this implementation in place (I think that is all you need), you create the router with something like (new SectionSpecificAttendantRouter(5) with FlightAttendantProvider).props(Props[YourWorker]); the self-type requires mixing in FlightAttendantProvider. The resulting Props has to be passed to, for example, system.actorOf() to create the concrete router.

Remember that if you inherit directly from CustomRouterConfig, you have to create the routees yourself. Take a look at the class akka.routing.RoutedActorCell, particularly its start() method. If you use something other than Pool or Group, you have to do some of the initialization yourself.
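
To make the Pool-based approach concrete, here is a minimal sketch written against the Akka 2.3 routing API (later versions changed parts of Pool, so it may not compile there unchanged). SeatMessage, RowBasedLogic, RowBasedPool, and YourWorker are hypothetical names introduced only for illustration. Because RoutingLogic.select receives just the message and the routees, the sketch assumes the seat row travels inside the message instead of being parsed from the sender's path. That is one possible workaround for the missing sender, not something the answer above prescribes.

```scala
import akka.actor.{Actor, ActorSystem, Props, SupervisorStrategy}
import akka.dispatch.Dispatchers
import akka.routing.{NoRoutee, Pool, Resizer, Routee, Router, RoutingLogic}
import scala.collection.immutable

// Hypothetical message that carries the passenger's row explicitly.
final case class SeatMessage(row: Int, payload: Any)

// Hypothetical worker standing in for a flight attendant.
class YourWorker extends Actor {
  def receive = {
    case msg => println(s"${self.path.name} received $msg")
  }
}

object RowBasedLogic {
  // One attendant per block of 11 rows, clamped to the available routees.
  def indexFor(row: Int, routeeCount: Int): Int =
    math.max(0, math.min(row / 11, routeeCount - 1))
}

class RowBasedLogic extends RoutingLogic {
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    message match {
      case _ if routees.isEmpty => NoRoutee
      case SeatMessage(row, _)  => routees(RowBasedLogic.indexFor(row, routees.size))
      case _                    => routees.head // fallback for any other message
    }
}

// Extending Pool lets Akka create and register the routees from the Props
// handed to `props`, so no manual routee management is needed.
final case class RowBasedPool(nrOfInstances: Int) extends Pool {
  override def resizer: Option[Resizer] = None
  override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
  override def routerDispatcher: String = Dispatchers.DefaultDispatcherId
  override def createRouter(system: ActorSystem): Router = new Router(new RowBasedLogic)
}

object RowBasedPoolDemo extends App {
  val system = ActorSystem("demo")
  val router = system.actorOf(RowBasedPool(5).props(Props[YourWorker]), "attendants")
  router ! SeatMessage(23, "More coffee, please") // 23 / 11 selects index 2
  Thread.sleep(500)  // crude wait so the message is handled before shutdown
  system.shutdown()  // Akka 2.3 API
}
```

Keeping the index calculation in a plain function (indexFor) makes the routing decision easy to check without starting an actor system. The original sender is still available to the selected routee through sender(), so replies reach the passenger as before.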



Tags: scala akka