We observed strange behavior when starting several futures from within an actor's receive method. If we use one of our configured dispatchers as the ExecutionContext, the futures run sequentially on the same thread. If we use ExecutionContext.Implicits.global, the futures run in parallel as expected.
We boiled down the code to the following example (a more complete example is below):
    implicit val ec = context.dispatcher

    Future { doWork() } // <-- all running in parallel
    Future { doWork() }
    Future { doWork() }
    Future { doWork() }

    Future {
      Future { doWork() }
      Future { doWork() } // <-- NOT RUNNING IN PARALLEL!!! WHY!!!
      Future { doWork() }
      Future { doWork() }
    }
A compilable example would be like this:
    import akka.actor.ActorSystem
    import scala.concurrent.{ExecutionContext, Future}

    object WhyNotParallelExperiment extends App {
      val actorSystem = ActorSystem("Experimental")

      // Futures not started inside a future: they run in parallel
      startFutures(runInFuture = false)(actorSystem.dispatcher)
      Thread.sleep(5000)

      // Futures started inside a future: they run sequentially. Why?
      startFutures(runInFuture = true)(actorSystem.dispatcher)
      Thread.sleep(5000)

      actorSystem.terminate()

      private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
        if (runInFuture) {
          Future {
            println(s"Start Futures on thread ${Thread.currentThread().getName()}")
            (1 to 9).foreach(startFuture)
            println(s"Started Futures on thread ${Thread.currentThread().getName()}")
          }
        } else {
          (11 to 19).foreach(startFuture)
        }
      }

      // Each future sleeps for 500 ms and logs the thread it runs on
      private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
        println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
        Thread.sleep(500)
        println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
      }
    }
We tried both thread-pool-executor and fork-join-executor, with the same result.
Are we using futures in the wrong way? How should you then spawn parallel tasks?
From the description of Akka's internal `BatchingExecutor` (emphasis mine):

If you're using a dispatcher that mixes in `BatchingExecutor` (namely, a subclass of `MessageDispatcher`), you could use the `scala.concurrent.blocking` construct to enable parallelism with nested Futures:

In your example, you would add `blocking` in the `startFuture` method:

Sample output from running `startFutures(true)(actorSystem.dispatcher)` with the above change:

After some research I found out that the `Dispatcher` class implements `akka.dispatch.BatchingExecutor`. For performance reasons, this class checks which tasks should be batched on the same thread. `Future.map` internally creates a `scala.concurrent.OnCompleteRunnable`, which is batched in the `BatchingExecutor`.

This seems to be reasonable for `map()`/`flatMap()`, where one task generates one subsequent task, but not for explicit new Futures which are used to fork work. Internally, `Future.apply` is implemented by `Future.successful().map` and is thus batched. My workaround is now to create futures in a different way:

The `FutureStarter` runnables are not batched and thus run in parallel.

Can anybody confirm that this solution is okay? Are there better ways to resolve this issue? Is the current implementation of `Future`/`BatchingExecutor` intended, or is it a bug?

It has to do with the "throughput" setting for the dispatcher. I added a "fair-dispatcher" to application.conf to demonstrate this:
Here is your example with a few modifications to use the fair dispatcher for Futures and print the current value of the throughput setting:
Output:
As you can see, fair-dispatcher uses different threads for most of the futures.
The default dispatcher is optimized for actors, so its throughput is set to 5 to minimize context switches and improve message-processing throughput while maintaining some degree of fairness.
The only change in my fair-dispatcher is throughput: 1, i.e. each async execution request is given its own thread if possible (up to parallelism-max).
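The setup described above can be sketched roughly as follows. The original configuration and modified example are not preserved in this thread, so this is only an approximation: `throughput = 1` and the `fair-dispatcher` name come from the answer, while the `fork-join-executor` choice (inferred from the mention of parallelism-max), the inline `ConfigFactory.parseString` instead of application.conf, and the object name `FairDispatcherSketch` are assumptions.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FairDispatcherSketch {
  // Assumed dispatcher definition: only `throughput = 1` is stated in the answer.
  // Keys that are not set here fall back to Akka's default-dispatcher values.
  val config: Config = ConfigFactory.parseString(
    """
      |fair-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  throughput = 1
      |}
      |""".stripMargin)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Experimental", config.withFallback(ConfigFactory.load()))

    // Print the throughput of both dispatchers for comparison.
    val settings = system.settings.config
    val defaultThroughput = settings.getInt("akka.actor.default-dispatcher.throughput")
    val fairThroughput = settings.getInt("fair-dispatcher.throughput")
    println(s"default-dispatcher throughput: $defaultThroughput")
    println(s"fair-dispatcher throughput: $fairThroughput")

    // Look up the custom dispatcher by its config path and use it for the futures.
    implicit val ec: ExecutionContext = system.dispatchers.lookup("fair-dispatcher")

    // Futures started from inside another future, as in the question.
    val nested: Future[List[String]] = Future {
      (1 to 9).toList.map { id =>
        Future {
          Thread.sleep(500)
          s"Future $id ran on ${Thread.currentThread().getName}"
        }
      }
    }.flatMap(futures => Future.sequence(futures))

    Await.result(nested, 30.seconds).foreach(println)
    Await.result(system.terminate(), 10.seconds)
  }
}
```

Comparing the printed thread names against a run that uses `system.dispatcher` instead is a simple way to check how the futures are distributed in your own environment.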
I'd recommend creating separate dispatchers for futures used for different purposes, e.g. one dispatcher (i.e. thread pool) for calling web services, another for blocking DB access, and so on. This gives you more precise control, because you can tune each custom dispatcher's settings independently.
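As an illustration of that recommendation, here is a minimal sketch with two dedicated dispatchers. The dispatcher names (`web-service-dispatcher`, `blocking-db-dispatcher`), the pool sizes, and the placeholder workloads are all hypothetical and would need tuning for a real application.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object DedicatedDispatchersSketch {
  // Hypothetical dispatcher definitions; names and sizes are illustrative only.
  val config: Config = ConfigFactory.parseString(
    """
      |web-service-dispatcher {
      |  type = Dispatcher
      |  executor = "fork-join-executor"
      |  fork-join-executor {
      |    parallelism-min = 2
      |    parallelism-max = 8
      |  }
      |}
      |blocking-db-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    fixed-pool-size = 16
      |  }
      |  throughput = 1
      |}
      |""".stripMargin)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Experimental", config.withFallback(ConfigFactory.load()))

    // One ExecutionContext per kind of work, passed explicitly to each Future.
    val webServiceEc: ExecutionContext = system.dispatchers.lookup("web-service-dispatcher")
    val dbEc: ExecutionContext = system.dispatchers.lookup("blocking-db-dispatcher")

    // Placeholder for a web service call.
    val response: Future[String] = Future { "response" }(webServiceEc)

    // Placeholder for a blocking database query.
    val rowCount: Future[Int] = Future {
      blocking {
        Thread.sleep(100)
        42
      }
    }(dbEc)

    println(Await.result(response, 5.seconds))
    println(Await.result(rowCount, 5.seconds))
    Await.result(system.terminate(), 10.seconds)
  }
}
```

Passing the execution context explicitly, rather than through a single implicit, makes it harder to run blocking work on the wrong pool by accident.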
Take a look at https://doc.akka.io/docs/akka/current/dispatchers.html, it is really useful for understanding the details.
Also check out the Akka reference settings (default-dispatcher in particular), there are a bunch of useful comments over there: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf
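For reference, the two workarounds mentioned earlier in this thread (wrapping the body in `blocking`, and submitting a plain `Runnable` instead of calling `Future.apply`) can be sketched as follows. The original snippets are not preserved here, so the helper names `startBlocking` and `startUnbatched` are hypothetical, and the sketch uses the global execution context only to stay self-contained. Any effect on batching applies only when an Akka dispatcher is used as the ExecutionContext.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._
import scala.util.Try

object NestedFutureWorkarounds {
  // Workaround 1 (assumed shape): mark the body as blocking so that a
  // batching executor can hand its remaining batched tasks to other threads.
  def startBlocking[T](body: => T)(implicit ec: ExecutionContext): Future[T] =
    Future { blocking { body } }

  // Workaround 2 (assumed shape): bypass Future.apply and submit a plain
  // Runnable, which according to the answer above is not batched.
  def startUnbatched[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    ec.execute(new Runnable {
      override def run(): Unit = promise.complete(Try(body))
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    // Futures started from inside another future, as in the question.
    val nested: Future[List[Int]] = Future {
      (1 to 4).toList.map { id =>
        startUnbatched {
          Thread.sleep(100)
          id
        }
      }
    }.flatMap(futures => Future.sequence(futures))

    println(Await.result(nested, 5.seconds))
  }
}
```

To try either helper against the batching behavior itself, replace the global import with an implicit Akka dispatcher such as `actorSystem.dispatcher` and compare the thread names that get logged.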