I want to make concurrent requests to multiple data repositories and consolidate the results. I am trying to understand whether my approach is valid at all, or whether there is a better way to solve this problem. I am new to Akka / Spray / Scala and really want to get a better understanding of how to build these components properly. Any suggestions or tips would be greatly appreciated. I am still trying to wrap my head around the use of actors and futures for this type of implementation.
Spray Service:
/** Spray routing trait that exposes `summary/<dataset>/<timeslice>` and answers it by
  * asking a database master actor for the consolidated result.
  *
  * Must be mixed into an actor: the child actors are created from the actor `context`.
  */
trait DemoService extends HttpService with Actor with ActorLogging {

  // Needed for `?` below. Typed explicitly so implicit resolution never depends on inference.
  implicit val timeout: Timeout = Timeout(5.seconds)

  // NOTE(review): a MongoMasterActor registered under the name "redisactor" looks like a
  // copy/paste slip; the name is left untouched because it is part of the actor path.
  val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor")
  val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")

  val messageApiRouting =
    path("summary" / Segment / Segment) { (dataset, timeslice) =>
      onComplete(getDbResponses(dbMaster, dataset, timeslice)) {
        case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse")
        case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
      }
    }

  /** Asks the given actor for the summary of a dataset and timeslice.
    *
    * @param mongoActor the actor that receives the summary `DbMessage`
    * @param dataset The dataset for which the summary has been requested
    * @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested
    * @return the actor's reply, failed if it is not a `DbMessageResponse` or the ask times out
    */
  def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] =
    (mongoActor ? summaryMessage(dataset, timeslice)).mapTo[DbMessageResponse]

  /** Asks the database master actor for the consolidated summary.
    *
    * @param dbActor the actor that receives the summary `DbMessage`
    * @param dataset The dataset for which the summary has been requested
    * @param timeslice The timeslice for which the summary has been requested
    * @return the actor's reply, failed if it is not a `SummaryResponse` or the ask times out
    */
  def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] =
    (dbActor ? summaryMessage(dataset, timeslice)).mapTo[SummaryResponse]

  /** Concatenates the response payloads of the two summaries, Mongo first. */
  def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String =
    mongoSummary.response + redisSummary.response

  /** Logs the request and builds the "summary" message shared by both ask helpers. */
  private def summaryMessage(dataset: String, timeslice: String): DbMessage = {
    log.debug(s"dataset: $dataset timeslice: $timeslice")
    DbMessage("summary", dataset + timeslice)
  }
}
Akka Actor / Future mock db requests:
/** Actor that answers `DbMessage` queries by running the (mock) database lookups
  * concurrently and replying to the requester with the combined result.
  */
class DbMasterActor extends Actor with ActorLogging {

  //TODO: Need to add routing to the config to limit instances
  val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")

  def receive = {
    case msg: DbMessage =>
      // Capture the sender in a local val: the Future callback below runs later on another
      // thread, and a shared mutable field would be overwritten by the next incoming
      // message, sending this reply to the wrong requester.
      val replyTo = sender()
      msg.query match {
        case "summary" =>
          getDbResults().onComplete {
            case Success(result) => replyTo ! result
            case Failure(ex) =>
              log.error(ex, "Failed to retrieve db results: {}", ex.getMessage)
              // Propagate the failure so an `ask` on the other side fails right away
              // instead of waiting for its timeout.
              replyTo ! akka.actor.Status.Failure(ex)
          }
        case other =>
          // Previously an unmatched query threw a MatchError inside the actor.
          log.error("Unsupported query: {}", other)
          replyTo ! akka.actor.Status.Failure(
            new IllegalArgumentException(s"Unsupported query: $other"))
      }
    //If not match log an error
    case other => log.error("Received unknown message: {}", other)
  }

  /** Runs the two mock lookups and combines their results.
    *
    * Both futures are created before the for-comprehension, so they run concurrently;
    * the comprehension only sequences the combination of their results.
    *
    * @return the combined `SummaryResponse`, failed if either lookup fails
    */
  def getDbResults(): Future[SummaryResponse] = {
    log.debug("hitting db results")
    val mongoResult = Future { Thread.sleep(500); "Mongo" }
    val redisResult = Future { Thread.sleep(800); "redis" }
    for {
      mResult <- mongoResult
      rResult <- redisResult
    } yield SummaryResponse(mResult, rResult)
  }
}
After reading Effective Akka by Jamie Allen, I am going to try applying the "Cameo" pattern he suggests.
Slideshare:
http://www.slideshare.net/shinolajla/effective-akka-scalaio
Github:
https://github.com/jamie-allen/effective_akka
I think what I created will work, but based on Jamie's comments in his talks it does not sound like the best approach. I will edit this post to add what I end up implementing (or at least try to).
Summary Actor (Cameo Actor):
/** Companion of the cameo actor: holds its timeout marker message and its Props factory. */
object SummaryResponseHandler {

  /** Marker message used when the database replies do not arrive in time. */
  case object DbRetrievalTimeout

  /** Creates the Props for a handler bound to one request.
    *
    * @param mongoDb reference to the Mongo proxy actor
    * @param redisDb reference to the Redis proxy actor
    * @param originalSender the actor that will receive the final response
    */
  def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props =
    Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender))
}
/** Per-request "cameo" actor: collects one `MongoSummary` and one `RedisSummary`, replies to
  * the original requester with a `DataSetSummary`, then stops itself. If both replies have
  * not arrived within 250 milliseconds it replies with `DbRetrievalTimeout` instead.
  *
  * @param mongoDb reference to the Mongo proxy (not used by the handler itself)
  * @param redisDb reference to the Redis proxy (not used by the handler itself)
  * @param originalSender the actor that receives the final response
  */
class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef,
    originalSender: ActorRef) extends Actor with ActorLogging {
  import SummaryResponseHandler._
  import context.dispatcher

  // A value counts as "received" only once it is a Some; a proxy replying with None
  // therefore leaves the handler waiting until the timeout fires.
  var mongoSummary, redisSummary: Option[String] = None

  // One-shot timer that delivers DbRetrievalTimeout to this actor.
  val timeoutMessager = context.system.scheduler.scheduleOnce(
    250.milliseconds, self, DbRetrievalTimeout)

  def receive = LoggingReceive {
    case MongoSummary(summary) =>
      log.debug(s"Received mongo summary: $summary")
      mongoSummary = summary
      collectSummaries
    case RedisSummary(summary) =>
      log.debug(s"Received redis summary: $summary")
      redisSummary = summary
      collectSummaries
    case DbRetrievalTimeout =>
      log.debug("Timeout occurred")
      sendResponseAndShutdown(DbRetrievalTimeout)
  }

  /** Replies and stops once both summaries are present; otherwise keeps waiting. */
  def collectSummaries: Unit = (mongoSummary, redisSummary) match {
    case (Some(_), Some(_)) =>
      log.debug("Values received for both databases")
      timeoutMessager.cancel()
      sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary))
    case _ => // still waiting for the other database
  }

  /** Sends `response` to the original requester and stops this actor. */
  def sendResponseAndShutdown(response: Any): Unit = {
    originalSender ! response
    log.debug("Stopping context capturing actor")
    context.stop(self)
  }

  /** Cancels the timer on every stop path (for example when the parent stops this actor),
    * so a pending timeout message is not left scheduled for a dead actor.
    */
  override def postStop(): Unit = {
    timeoutMessager.cancel()
    ()
  }
}
/** Entry-point actor: for every `GetSummary` it spawns a dedicated response handler and
  * forwards the request to both database proxies with that handler as the sender, so the
  * proxies reply to the handler rather than to this actor.
  *
  * @param mongoDb reference to the Mongo proxy actor
  * @param redisDb reference to the Redis proxy actor
  */
class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case request @ GetSummary(_) =>
      log.debug("received dataSet")
      val originalSender = sender()
      // Child names must be unique among living siblings. A fixed name makes a second
      // request fail with InvalidActorNameException while an earlier handler is still
      // alive, so a unique suffix is appended.
      val handler = context.actorOf(
        SummaryResponseHandler.props(mongoDb, redisDb, originalSender),
        s"cameo-message-handler-${java.util.UUID.randomUUID()}")
      mongoDb.tell(request, handler)
      redisDb.tell(request, handler)
    // Log the message that was actually received.
    case other => log.debug(s"Unknown result $other")
  }
}
Common:
/** Request for the summary of the named data set. */
case class GetSummary(dataSet: String)

/** Consolidated reply carrying the summary obtained from each database. */
case class DataSetSummary(mongo: Option[String], redis: Option[String])

/** Reply carrying the Mongo summary. */
case class MongoSummary(summary: Option[String])

/** Reply carrying the Redis summary. */
case class RedisSummary(summary: Option[String])
/** Base trait for Mongo proxy actors (presumably; judged from the name). Adds no members
  * beyond `Actor` and `ActorLogging`.
  */
trait MongoProxy extends Actor with ActorLogging
/** Base trait for Redis proxy actors (presumably; judged from the name). Adds no members
  * beyond `Actor` and `ActorLogging`.
  */
trait RedisProxy extends Actor with ActorLogging
Mock Stubs:
/** In-memory stand-in for the Mongo proxy: answers `GetSummary` from a fixed map.
  *
  * Extends `MongoProxy` (it previously extended `RedisProxy`, which only compiled because
  * the two traits are structurally identical).
  */
class MongoProxyStub extends MongoProxy {
  val summaryData = Map[String, String](
    "dataset1" -> "MongoData1",
    "dataset2" -> "MongoData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      // Unknown data sets are answered with Some("") rather than None: the response
      // handler only treats a Some as "reply received".
      sender() ! MongoSummary(Some(summaryData.getOrElse(dataSet, "")))
  }
}
/** In-memory stand-in for the Redis proxy: answers `GetSummary` from a fixed map.
  *
  * Extends `RedisProxy` (it previously extended `MongoProxy`, which only compiled because
  * the two traits are structurally identical).
  */
class RedisProxyStub extends RedisProxy {
  val summaryData = Map[String, String](
    "dataset1" -> "RedisData1",
    "dataset2" -> "RedisData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      // Unknown data sets are answered with Some("") rather than None: the response
      // handler only treats a Some as "reply received".
      sender() ! RedisSummary(Some(summaryData.getOrElse(dataSet, "")))
  }
}
Boot (this should really be a test, but I just wanted to run it from Boot):
/** Manual smoke run: wires the stub proxies to a `SummaryRetriever`, asks for "dataset1",
  * prints the outcome and shuts the actor system down.
  */
object Boot extends App {
  val system = ActorSystem("DbSystem")

  // Actor names and constructor argument order now match the stub each value holds
  // (SummaryRetriever takes the Mongo proxy first, then the Redis proxy).
  val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-mongo")
  val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-redis")
  val summaryRetrieverActor =
    system.actorOf(Props(new SummaryRetriever(mongoProxy, redisProxy)), "cameo-retriever1")

  implicit val timeout: Timeout = Timeout(5.seconds)
  val future = summaryRetrieverActor ? GetSummary("dataset1")

  try {
    // Blocking is acceptable here only because this is the edge of a throwaway program.
    // Match on the reply instead of casting: the handler may answer with
    // DbRetrievalTimeout, which a cast to DataSetSummary would turn into a
    // ClassCastException.
    Await.result(future, timeout.duration) match {
      case DataSetSummary(mongo, redis) =>
        println(mongo)
        println(redis)
      case SummaryResponseHandler.DbRetrievalTimeout =>
        println("Timed out waiting for the database summaries")
      case other =>
        println(s"Unexpected reply: $other")
    }
  } finally {
    // Always stop the actor system, even when Await throws; otherwise its threads keep
    // the JVM alive.
    system.shutdown()
  }
}
Application Config:
# Log everything from DEBUG level upwards.
akka.loglevel = "DEBUG"
# Send Akka log events to the SLF4J event handler.
# NOTE(review): newer Akka releases configure this through `akka.loggers`; confirm
# against the Akka version in use.
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
# Actor debug switches: auto-received messages, lifecycle, receive and event-stream
# logging are all turned on.
akka.actor.debug.autoreceive = on
akka.actor.debug.lifecycle = on
akka.actor.debug.receive = on
akka.actor.debug.event-stream = on