Controlling spawning of actors in Akka who consume

Posted 2019-05-10 19:17

Question:

I've built a distributed streaming machine learning model using Akka's actor model. Training happens asynchronously by sending a training instance (the data to train on) to an actor. Training on this data takes computation time and changes the state of the actor.

Currently I'm using historical data to train the model. I want to run a bunch of differently configured models that are trained on the same data and see how different ensemble metrics vary. Here is a much simpler simulation of what I have going on, with Thread.sleep(1) standing in for computation time and the data array standing in for state.

import akka.actor.{ActorRef, ActorSystem}
import akka.actor.ActorDSL._
import scala.util.Random

implicit val as = ActorSystem()

case object Report

case class Model(dataSize: Int) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        // Assignment (=), not comparison (==), so the state really changes
        data(Math.abs(Random.nextInt % dataSize)) = trainingData
      }
      case Report => {
        println(s"Finished $dataSize")
        context.stop(self)
      }
    }
  })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val trainingData = Array.fill(5000)(Random.nextInt)

val dataSizeParams = (1 to 500)

Next I use a for loop to vary the parameters (represented by the dataSizeParams range):

for {
  param <- dataSizeParams
} {
  // make model with params
  val model = Model(param)
  for {
    trainingInstance <- trainingData
  } {
    model.train(trainingInstance)
  }
  model.report
}

The for loop is definitely the WRONG WAY to do what I'm trying to do. It kicks off all the different models in parallel. That works well when dataSizeParams is in the 1 to 500 range, but if I bump the upper bound up to something high, EACH model starts to take up a noticeable chunk of memory. What I came up with is the code below. A master actor controls the number of models running at once, based on the number of Run messages it has received. Each Model now holds a reference to this master actor and sends it a message when it has finished processing:

// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run

case class Model(dataSize: Int, master: ActorRef) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        // Assignment (=), not comparison (==), so the state really changes
        data(Math.abs(Random.nextInt % dataSize)) = trainingData
      }
      case Report => {
        println(s"Finished $dataSize")
        master ! ImDone
        context.stop(self)
      }
    }
  })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val master: ActorRef = actor(new Act {
  val paramRuns = dataSizeParams.iterator
  become {
    case Run => {
      if (paramRuns.hasNext) {
        val model = Model(paramRuns.next(), self)
        for {
          trainingInstance <- trainingData
        } {
          model.train(trainingInstance)
        }
        model.report
      } else {
        println("No more to run")
        context.stop(self)
      }
    }
    case ImDone =>  {
      self ! Run
    }
  }
})

master ! Run

There isn't any problem with the master code (that I can see). I have tight control over the number of Models spawned at one time, but I feel like I'm missing a much easier, cleaner, out-of-the-box way of doing this. I was also wondering whether there are any neat ways to throttle the number of Models running at once, say by looking at the system's CPU and memory usage.

Answer 1:

You're looking for the work pulling pattern. I highly recommend this blog post by the Akka devs:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2

We use a variant of this on top of Akka's clustering features to avoid rogue concurrency. By having worker actors pull work instead of having a supervisor push work, you can gracefully control the amount of work (and therefore, CPU and memory usage) by simply limiting the number of worker actors.

This has a few advantages over pure routers: it's easier to track failures (as outlined in that post), and work won't languish in a mailbox, where it can potentially be lost.
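
To make the idea concrete, here is a minimal single-JVM sketch of work pulling applied to the question's simulation, using classic Akka actors. It is not the code from the blog post: the message names (GimmeWork, Work, WorkDone), the WorkPulling object and the run helper are all made up for illustration, and it assumes workerCount is at least 1 and every parameter is positive. A fixed pool of workers asks the master for one model configuration at a time, so at most workerCount models hold state in memory at once.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Random

object WorkPulling {
  // Hypothetical protocol for this sketch (names are illustrative).
  case object GimmeWork                      // worker -> master: "I am idle"
  final case class Work(dataSize: Int)       // master -> worker: one configuration to train
  final case class WorkDone(dataSize: Int)   // worker -> master: that configuration is finished

  def workerProps(master: ActorRef, trainingData: Array[Int]): Props =
    Props(new Worker(master, trainingData))

  // Trains one model at a time and only then asks for the next one.
  class Worker(master: ActorRef, trainingData: Array[Int]) extends Actor {
    override def preStart(): Unit = master ! GimmeWork

    def receive: Receive = {
      case Work(dataSize) =>
        val data = Array.fill(dataSize)(0)   // model state lives only for this run
        trainingData.foreach { instance =>
          data(Random.nextInt(dataSize)) = instance
        }
        master ! WorkDone(dataSize)
        master ! GimmeWork                   // pull the next configuration
    }
  }

  // Hands out work only on request; concurrency is bounded by workerCount.
  class Master(params: Iterator[Int], trainingData: Array[Int],
               workerCount: Int, result: Promise[Vector[Int]]) extends Actor {
    private var inFlight = 0
    private var finished = Vector.empty[Int]

    override def preStart(): Unit =
      (1 to workerCount).foreach(_ => context.actorOf(workerProps(self, trainingData)))

    def receive: Receive = {
      case GimmeWork =>
        if (params.hasNext) {
          inFlight += 1
          sender() ! Work(params.next())
        } else if (inFlight == 0) {
          result.trySuccess(finished)        // everything handed out has completed
          context.stop(self)                 // also stops the child workers
        }
      case WorkDone(dataSize) =>
        inFlight -= 1
        finished :+= dataSize
        println(s"Finished $dataSize")
    }
  }

  // Runs all configurations and blocks until every one has been trained.
  def run(params: Seq[Int], workerCount: Int): Vector[Int] = {
    val system = ActorSystem("work-pulling")
    val result = Promise[Vector[Int]]()
    val trainingData = Array.fill(5000)(Random.nextInt())
    system.actorOf(Props(new Master(params.iterator, trainingData, workerCount, result)))
    try Await.result(result.future, 5.minutes) finally system.terminate()
  }
}
```

Calling WorkPulling.run(1 to 500, workerCount = 4) would train all 500 configurations with no more than four models alive at any moment. Raising or lowering workerCount is the only knob needed to trade throughput against memory.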

Also, if you're using remoting, I recommend that you not send large amounts of data in the message. Let the worker nodes pull the data from another source themselves when triggered. We use S3.
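
As a rough illustration of that last point, the message can carry only a pointer to the data and the worker can load it on its side. The names below (DataByReference, TrainFrom, loadTrainingData) are hypothetical, and the loader reads a local file with one integer per line purely so the sketch is runnable; a real worker would use its own client for S3 or whatever store holds the data.

```scala
import java.io.File
import java.net.URI
import scala.io.Source

object DataByReference {
  // Small message: says where the training data lives instead of carrying it.
  final case class TrainFrom(location: URI, dataSize: Int)

  // Hypothetical loader for this sketch: expects a file: URI pointing at a
  // text file with one integer per line. Swap in a real storage client as needed.
  def loadTrainingData(location: URI): Array[Int] = {
    val source = Source.fromFile(new File(location))
    try source.getLines().map(_.trim).filter(_.nonEmpty).map(_.toInt).toArray
    finally source.close()
  }
}
```

A worker receiving TrainFrom(location, dataSize) would call loadTrainingData(location) itself, so the only thing crossing the wire is a short URI.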