I've built a distributed streaming machine learning model using Akka's actor model. Training happens asynchronously by sending a training instance (data to train on) to an actor; training on that data takes computation time and changes the actor's state.
Currently I'm training the model on historical data. I want to run a bunch of differently configured models trained on the same data and see how different ensemble metrics vary. Here is a much less complicated simulation of what I have going on, with Thread.sleep(1) and the data array standing in for computation time and state:
import scala.util.Random
import akka.actor._
import akka.actor.ActorDSL._

implicit val as = ActorSystem()

case object Report

case class Model(dataSize: Int) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) = trainingData
      }
      case Report => {
        println(s"Finished $dataSize")
        context.stop(self)
      }
    }
  })
  def train(trainingInstance: Int) = modelActor ! trainingInstance
  def report: Unit = modelActor ! Report
}

val trainingData = Array.fill(5000)(Random.nextInt)
val dataSizeParams = (1 to 500)
Next I use a for loop to vary the parameters (represented by the dataSizeParams range):
for {
  param <- dataSizeParams
} {
  // make a model with this param
  val model = Model(param)
  for {
    trainingInstance <- trainingData
  } {
    model.train(trainingInstance)
  }
  model.report
}
The for loop is definitely the WRONG WAY to do what I'm trying to do. It kicks off all the different models in parallel. That works well while dataSizeParams is in the 1-to-500 range, but if I bump it up to something higher, each of my Models starts to take up a noticeable chunk of memory. What I came up with is the code below. Essentially I have a Model master that controls the number of models running at once, based on the number of Run messages it has received. Each Model now holds a reference to this master actor and sends it a message when it has finished processing:
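(For comparison, the same "at most N models alive at once" idea can be sketched without actors at all, by running each model's training as a task on a fixed-size thread pool so the big per-model arrays are only allocated while the task runs. This is a minimal stdlib-only sketch; runModel is a hypothetical stand-in for one model's full training run, not my real code.)

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical stand-in for one model's full training run: allocate the
// model's state, consume every training instance, return something to report.
def runModel(dataSize: Int, trainingData: Array[Int]): Int = {
  val data = Array.fill(dataSize)(0)
  trainingData.foreach { t => data(Math.abs(Random.nextInt % dataSize)) = t }
  dataSize
}

// A fixed pool of 4 threads means at most 4 models are allocated and
// training at any moment; the rest wait in the pool's queue as cheap closures.
val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val trainingData = Array.fill(100)(Random.nextInt)
val results = Future.sequence((1 to 20).map(p => Future(runModel(p, trainingData))))
val finished = Await.result(results, 1.minute)
pool.shutdown()
```

The pool size is the knob: it bounds memory because each model's state lives only inside its task.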
// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run

case class Model(dataSize: Int, master: ActorRef) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) = trainingData
      }
      case Report => {
        println(s"Finished $dataSize")
        master ! ImDone
        context.stop(self)
      }
    }
  })
  def train(trainingInstance: Int) = modelActor ! trainingInstance
  def report: Unit = modelActor ! Report
}
val master: ActorRef = actor(new Act {
  var paramRuns = dataSizeParams.toIterator
  become {
    case Run => {
      if (paramRuns.hasNext) {
        val model = Model(paramRuns.next(), self)
        for {
          trainingInstance <- trainingData
        } {
          model.train(trainingInstance)
        }
        model.report
      } else {
        println("No more to run")
        context.stop(self)
      }
    }
    case ImDone => {
      self ! Run
    }
  }
})

master ! Run
There isn't any problem with the master code (that I can see). I have tight control over the number of Models spawned at a time, but I feel like I'm missing a much easier/cleaner/out-of-the-box way of doing this. Also, I was wondering whether there are any neat ways to throttle the number of Models running at once, say by looking at the system's CPU and memory usage.
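On the CPU/memory part of the question, the JVM does expose the relevant numbers out of the box, so the master could consult them before handing out the next Run. A small sketch of the readings I mean (the 0.8 thresholds and the okToSpawn rule are made-up illustrations, not a recommendation):

```scala
import java.lang.management.ManagementFactory

// System-wide load average over the last minute.
// Returns a negative value where unavailable (e.g. on Windows).
val os = ManagementFactory.getOperatingSystemMXBean
val load = os.getSystemLoadAverage
val cores = os.getAvailableProcessors

// JVM heap in use, as a fraction of the maximum heap the JVM may grow to.
val rt = Runtime.getRuntime
val usedFraction = (rt.totalMemory - rt.freeMemory).toDouble / rt.maxMemory

// Hypothetical gate a master actor could check before spawning another Model:
// skip the load check when it's unavailable, and keep heap below 80%.
val okToSpawn = (load < 0 || load < cores * 0.8) && usedFraction < 0.8
```

One caveat I'm aware of: freeMemory/totalMemory reflect the heap before a GC runs, so the fraction overestimates real usage between collections.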