I wasn't sure how to title this thread, but I'll try to explain the problem in a few lines.
I have a command that needs to calculate the price for a given date range. To calculate it, the system needs to fetch the price for every day individually (from a DB, config, or cache; the source doesn't matter).
My suggestion was to have one PriceRangeActor which will have a pool of DailyPriceActors and will send them commands like CalculateDailyPrice.
But how do I assemble all that data in PriceRangeActor?
1. Having some big map with complex keys smells a lot. How would I then determine whether the range is completely calculated? Is there an easier way of doing this?
2. Create a new PriceRangeActor for every command and use the ask pattern to query the list of DailyPriceActors?
Because you aren't utilizing any message passing or queuing, I suggest Futures rather than Actors as your concurrency abstraction. This blog entry makes a very compelling argument that Actors are for state and Futures are for computation.
With either Futures or the Actor ? operator (ask, which returns a Future), you can use Future.sequence to bundle together all of the separate querying Futures into a single Future that only completes once all the sub-queries are complete.
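To make the Future.sequence step concrete, here is a minimal sketch using only the standard library. The dailyPrice function and its arithmetic are hypothetical stand-ins for a real per-day lookup; the point is only that a Seq of Futures becomes one Future of a Seq, with results in the same order as the inputs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical stand-in for a per-day price lookup
  def dailyPrice(day: Int): Future[Double] = Future { day * 1.5 }

  // One Future per day, combined into a single Future that
  // completes only after every per-day Future has completed
  def pricesFor(days: Seq[Int]): Future[Seq[Double]] =
    Future.sequence(days.map(dailyPrice))

  def main(args: Array[String]): Unit = {
    // Blocking with Await is for demonstration only
    val prices = Await.result(pricesFor(Seq(1, 2, 3)), 5.seconds)
    println(prices)
  }
}
```

Note that the combined Future fails if any one of the per-day Futures fails, so a failed lookup surfaces as a failure of the whole range.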
USING FUTURES (recommended)
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Foo extends App {
  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  //expensive query function
  def fetchPrices(date : Date) : Prices = ???

  //the Dates to query Prices for
  val datesToQuery : Seq[Date] = ???

  //runs one query on the implicit ExecutionContext
  def concurrentQuery(date : Date) : Future[Prices] = Future { fetchPrices(date) }

  //launches a Future per date query, D Dates => D Futures
  //Future.sequence converts the D Futures into 1 Future
  val dates2PricesFuture : Future[PriceMap] =
    Future.sequence(datesToQuery map concurrentQuery)
      .map(datesToQuery zip _)
      .map(_.toMap)

  //foreach runs only on success (onSuccess was removed in Scala 2.13)
  dates2PricesFuture foreach { priceMap =>
    //process the price data which is now completely available
  }
}//end object Foo
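The question mentioned a pool of DailyPriceActors. With Futures, a rough equivalent is to bound concurrency through the ExecutionContext rather than through a pool of actors. The sketch below assumes a fixed-size thread pool is what you want; fetchPrices is again a hypothetical stand-in, and queryRange and poolSize are names made up for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedPoolSketch {
  type Date = Int
  type Prices = Seq[Float]

  // Hypothetical stand-in for the expensive per-day query
  def fetchPrices(date: Date): Prices = Seq(date.toFloat)

  // Runs at most poolSize queries at the same time
  def queryRange(dates: Seq[Date], poolSize: Int): Map[Date, Prices] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Pair each result with its date so the map can be built directly
      val all = Future.sequence(
        dates.map(d => Future(fetchPrices(d)).map(prices => d -> prices))
      )
      // Blocking here keeps the sketch short; real code would stay asynchronous
      Await.result(all, 10.seconds).toMap
    } finally pool.shutdown()
  }
}
```

In a long-running service the pool would normally be created once and shared, not built and shut down per request as it is here.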
USING ACTORS
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

object Foo extends App {
  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  def fetchPrices(date : Date) : Prices = ???
  val datesToQuery : Seq[Date] = ???

  //answers a single query, then stops so one-shot actors do not accumulate
  class QueryActor() extends Actor {
    def receive = {
      case date : Date =>
        sender() ! fetchPrices(date)
        context stop self
    }
  }

  implicit val as : ActorSystem = ActorSystem()
  implicit val queryTimeout : Timeout = Timeout(1.second)
  import as.dispatcher

  //one short-lived actor per date; ask returns a Future[Any], narrowed by mapTo
  def concurrentQuery(date : Date) : Future[Prices] =
    ask(as actorOf Props[QueryActor], date).mapTo[Prices]

  val dates2PricesFuture : Future[PriceMap] =
    Future.sequence(datesToQuery map concurrentQuery)
      .map(datesToQuery zip _)
      .map(_.toMap)

  //consume dates2PricesFuture exactly as in the first example
}//end object Foo
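If you do go with a long-lived aggregating actor and plain tells instead of ask, the bookkeeping from point 1 of the question does not need a big map with complex keys. A rough sketch of per-request state, written as plain Scala so it can live inside whatever actor holds it: RangeAggregation, record and isComplete are hypothetical names, and the sketch assumes each daily reply carries the date it belongs to.

```scala
object RangeAggregationSketch {
  type Date = Int
  type Prices = Seq[Float]

  // Immutable bookkeeping for one range request (hypothetical helper)
  final case class RangeAggregation(pending: Set[Date], results: Map[Date, Prices]) {
    // Store a daily result; replies for unknown or already-seen dates are ignored
    def record(date: Date, prices: Prices): RangeAggregation =
      if (pending.contains(date)) copy(pending - date, results + (date -> prices))
      else this

    // The range is fully calculated once no date is still outstanding
    def isComplete: Boolean = pending.isEmpty
  }

  object RangeAggregation {
    def start(dates: Seq[Date]): RangeAggregation =
      RangeAggregation(dates.toSet, Map.empty)
  }
}
```

The aggregating actor would keep one such value per in-flight command, replace it on every daily reply, and send the final result once isComplete turns true.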