scala.concurrent.blocking - what does it actually do?

Posted 2019-01-10 07:13

Question:

I have spent a while learning about Scala execution contexts, the underlying threading models, and concurrency. Can you explain in what ways scala.concurrent.blocking can "adjust the runtime behavior" and "may improve performance or avoid deadlocks", as described in the scaladoc?

In the documentation, it is presented as a means of awaiting an API that doesn't implement Awaitable. (Should long-running computations be wrapped in it too?)

What is it that it actually does?

Following through the source doesn't easily betray its secrets.

Answer 1:

blocking is meant to act as a hint to the ExecutionContext that the contained code is blocking and could lead to thread starvation. This will give the thread pool a chance to spawn new threads in order to prevent starvation. This is what is meant by "adjust the runtime behavior". It's not magic though, and won't work with every ExecutionContext.

Consider this example:

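import scala.concurrent._
val ec = scala.concurrent.ExecutionContext.Implicits.global

(1 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) } // tells the global pool that this task is about to block
        println("ending Future: " + n)
    }(ec)
}
// In a standalone program (not the REPL), keep the main thread alive until the Futures finish: the global pool's threads are daemons.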

This uses the default global ExecutionContext. Running the code as-is, you will notice that all 100 Futures start almost immediately, but if you remove blocking, they run only a few at a time (roughly one per available processor, which is the global pool's default parallelism). The global ExecutionContext reacts to blocking calls that are marked as such by spawning additional threads (up to a limit), so the blocked Futures don't starve the rest.
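To see the difference without reading printed output, here is a minimal sketch that records the peak number of tasks sleeping at the same moment on the global pool. `PeakConcurrency` and `peak` are hypothetical names for this illustration, and the exact figures depend on your core count and JDK version, so treat them as indicative only.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object PeakConcurrency {
  // Hypothetical helper: runs `n` tasks that each sleep for `sleepMs`,
  // optionally wrapped in `blocking`, and returns the highest number of
  // tasks observed inside the sleep at the same time.
  def peak(n: Int, sleepMs: Long, useBlocking: Boolean)(implicit ec: ExecutionContext): Int = {
    val running = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)
    val tasks = (1 to n).map { _ =>
      Future {
        val now = running.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        if (useBlocking) blocking { Thread.sleep(sleepMs) } else Thread.sleep(sleepMs)
        running.decrementAndGet()
      }
    }
    Await.result(Future.sequence(tasks), 1.minute)
    maxSeen.get
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    println(s"cores: ${Runtime.getRuntime.availableProcessors}")
    // Run the unmarked version first so spare threads from the other run can't skew it.
    println(s"peak without blocking: ${peak(50, 500, useBlocking = false)}")
    println(s"peak with blocking:    ${peak(50, 500, useBlocking = true)}")
  }
}
```

On a machine with fewer than 50 cores you would typically expect the first figure to sit near the core count and the second to be close to 50, though the JDK's ForkJoinPool compensation policy decides the details.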

Now look at this example with a fixed pool of 4 threads:

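import java.util.concurrent.Executors
import scala.concurrent._

val executorService = Executors.newFixedThreadPool(4)
val ec = ExecutionContext.fromExecutorService(executorService)

(1 to 100) foreach { n =>
    Future {
        println("starting Future: " + n)
        blocking { Thread.sleep(3000) } // the hint is ignored: these pool threads are not BlockContexts
        println("ending Future: " + n)
    }(ec)
}
executorService.shutdown() // already-submitted tasks still run; the non-daemon threads exit afterwards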

This ExecutionContext isn't built to spawn new threads, so even with the sleeping code wrapped in blocking, it still executes at most 4 Futures at a time. That is why the documentation only says blocking "may improve performance or avoid deadlocks": whether it helps depends entirely on the ExecutionContext, and with this one it does nothing at all.
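The cap can be checked directly by counting how many tasks are inside the sleep at once. In this sketch (the names `FixedPoolCap` and `peakOnFixedPool` are hypothetical), the reported peak should never exceed the pool size even though every task uses `blocking`; the executor is shut down at the end so its non-daemon threads don't keep the JVM running.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object FixedPoolCap {
  // Runs `n` tasks on a fixed pool of `poolSize` threads, each wrapped in
  // `blocking`, and returns the peak number of tasks seen running at once.
  def peakOnFixedPool(poolSize: Int, n: Int, sleepMs: Long): Int = {
    val executor = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    val running = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)
    try {
      val tasks = (1 to n).map { _ =>
        Future {
          maxSeen.accumulateAndGet(running.incrementAndGet(), (a: Int, b: Int) => math.max(a, b))
          blocking { Thread.sleep(sleepMs) } // plain pool threads: falls back to the default BlockContext
          running.decrementAndGet()
        }
      }
      Await.result(Future.sequence(tasks), 1.minute)
      maxSeen.get
    } finally executor.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(s"peak on a 4-thread pool: ${peakOnFixedPool(4, 20, 300)}")
}
```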

How does it work? The implementation of blocking is a single call:

BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
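Since `AwaitPermission` is package-private, user code cannot make that call itself, but it can install its own `BlockContext` and watch `blocking` dispatch to it. A minimal sketch: `CountingBlockContext` is a hypothetical class, while `BlockContext.withBlockContext` is part of the standard library.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{BlockContext, CanAwait, blocking}

// Hypothetical BlockContext that counts how often it is asked to block,
// then simply evaluates the body on the calling thread.
final class CountingBlockContext extends BlockContext {
  val calls = new AtomicInteger(0)
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    calls.incrementAndGet()
    thunk
  }
}

object BlockingDispatch {
  def main(args: Array[String]): Unit = {
    val ctx = new CountingBlockContext
    // withBlockContext makes ctx the BlockContext.current for the duration of the body.
    val result = BlockContext.withBlockContext(ctx) {
      blocking { 21 * 2 }
    }
    // Outside withBlockContext, `blocking` no longer reaches ctx.
    blocking { 0 }
    println(s"result = $result, blockOn calls = ${ctx.calls.get}") // result = 42, blockOn calls = 1
  }
}
```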

BlockContext.current retrieves the BlockContext for the current thread. A BlockContext is usually just a Thread with the BlockContext trait mixed in. In the source, it is first looked up in a ThreadLocal; if none is found there, it is pattern matched out of the current thread. If the current thread is not a BlockContext either, the DefaultBlockContext is used instead.
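You can check which threads are BlockContexts by asking from inside a Future. This sketch (the object name is hypothetical) relies on an implementation detail of current Scala 2.12/2.13 releases: the global pool's worker threads mix in BlockContext, whereas threads from `Executors.newFixedThreadPool` are plain `Thread`s.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, BlockContext, ExecutionContext, Future}
import scala.concurrent.duration._

object WhoIsABlockContext {
  // Reports whether the thread that runs a Future on `ec` implements BlockContext.
  def workerIsBlockContext(ec: ExecutionContext): Boolean =
    Await.result(Future(Thread.currentThread.isInstanceOf[BlockContext])(ec), 10.seconds)

  def main(args: Array[String]): Unit = {
    val fixed = Executors.newFixedThreadPool(4)
    try {
      println(s"main thread:       ${Thread.currentThread.isInstanceOf[BlockContext]}")
      println(s"global worker:     ${workerIsBlockContext(ExecutionContext.global)}")
      println(s"fixed-pool worker: ${workerIsBlockContext(ExecutionContext.fromExecutorService(fixed))}")
    } finally fixed.shutdown()
  }
}
```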

Next, blockOn is called on the current BlockContext. blockOn is an abstract method in BlockContext, so its implementation depends on how the ExecutionContext handles it. If we look at the implementation for DefaultBlockContext (used when the current thread is not a BlockContext), we see that blockOn simply evaluates the body and does nothing else. So using blocking outside a BlockContext means that nothing special happens at all: the code runs as-is, with no extra behavior.
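The fallback behavior is easy to observe from an ordinary thread: the body runs right away, on the calling thread, and its value is returned unchanged. A minimal sketch; the object name is hypothetical.

```scala
import scala.concurrent.blocking

object DefaultBlockContextDemo {
  def main(args: Array[String]): Unit = {
    // The main thread is not a BlockContext and none was installed via
    // BlockContext.withBlockContext, so `blocking` falls back to the default,
    // which just evaluates the body here, on this same thread.
    val caller = Thread.currentThread
    val (value, ranOn) = blocking { (6 * 7, Thread.currentThread) }
    println(value)           // 42
    println(ranOn eq caller) // true
  }
}
```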

What about threads that are BlockContexts? For instance, in the global context, blockOn does quite a bit more. Digging deeper, you can see that the global context uses a ForkJoinPool under the hood, and the DefaultThreadFactory defined alongside it creates the pool's worker threads, which are BlockContexts. Their blockOn runs the body through ForkJoinPool.managedBlock, which tells the pool that the worker is about to block so it can add a compensating thread. Without this blockOn implementation on the thread, the ForkJoinPool doesn't know you're blocking and won't try to spawn more threads in response.
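The same mechanism can be imitated outside the standard library. Below is a simplified sketch, not Scala's actual source (which, among other details, limits how many extra threads blocking may create): a ForkJoinPool whose worker threads mix in BlockContext and route `blockOn` through `ForkJoinPool.managedBlock`. `ManagedBlockingWorker` and `ManagedBlockingPool` are hypothetical names.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import scala.concurrent.{Await, BlockContext, CanAwait, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical worker thread that is also a BlockContext. When code on this
// thread calls `blocking`, the body runs inside managedBlock, so the pool
// knows this worker is blocked and may start a spare thread meanwhile.
final class ManagedBlockingWorker(pool: ForkJoinPool)
    extends ForkJoinWorkerThread(pool) with BlockContext {
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    var result: Option[T] = None
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      // Runs the blocking body once; returning true means no further blocking is needed.
      override def block(): Boolean = { result = Some(thunk); true }
      override def isReleasable(): Boolean = result.isDefined
    })
    result.get
  }
}

object ManagedBlockingPool {
  private val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
    override def newThread(p: ForkJoinPool): ForkJoinWorkerThread = new ManagedBlockingWorker(p)
  }

  // A pool whose workers are ManagedBlockingWorker instances.
  def create(parallelism: Int): ForkJoinPool =
    new ForkJoinPool(parallelism, factory, null, false) // null: default uncaught-exception handling

  def main(args: Array[String]): Unit = {
    val pool = create(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = (1 to 8).map(i => Future(blocking { Thread.sleep(300); i }))
      println(Await.result(Future.sequence(tasks), 30.seconds).sum) // 36
    } finally pool.shutdown()
  }
}
```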

Scala's Await also uses blocking in its implementation.
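This can be observed with the same thread-local technique: install a BlockContext that records calls, then await a future that never completes. A sketch; `RecordingBlockContext` is a hypothetical name. Recent Scala versions may skip `blocking` when the future is already completed, which is why this example waits on one that never is.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, BlockContext, CanAwait, Promise}
import scala.concurrent.duration._

// Hypothetical BlockContext that only records that it was consulted.
final class RecordingBlockContext extends BlockContext {
  val calls = new AtomicInteger(0)
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    calls.incrementAndGet()
    thunk
  }
}

object AwaitGoesThroughBlocking {
  def main(args: Array[String]): Unit = {
    val ctx = new RecordingBlockContext
    val neverCompleted = Promise[Int]().future
    BlockContext.withBlockContext(ctx) {
      try Await.ready(neverCompleted, 50.millis)
      catch { case _: TimeoutException => () } // expected: nothing ever completes the promise
    }
    println(s"blockOn calls: ${ctx.calls.get}")
  }
}
```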