Which Java synchronization construct is likely to provide the best performance for a concurrent, iterative processing scenario with a fixed number of threads like the one outlined below? After experimenting on my own for a while (using ExecutorService and CyclicBarrier) and being somewhat surprised by the results, I would be grateful for some expert advice and maybe some new ideas. Existing questions here do not seem to focus primarily on performance, hence this new one. Thanks in advance!
The core of the app is a simple iterative data processing algorithm, parallelized to spread the computational load across 8 cores on a Mac Pro, running OS X 10.6 and Java 1.6.0_07. The data to be processed is split into 8 blocks and each block is fed to a Runnable to be executed by one of a fixed number of threads. Parallelizing the algorithm was fairly straightforward, and it functionally works as desired, but its performance is not yet what I think it could be. The app seems to spend a lot of time in system calls synchronizing, so after some profiling I wonder whether I selected the most appropriate synchronization mechanism(s).
A key requirement of the algorithm is that it needs to proceed in stages, so the threads need to sync up at the end of each stage. The main thread prepares the work (very low overhead), passes it to the threads, lets them work on it, then proceeds when all threads are done, rearranges the work (again very low overhead) and repeats the cycle. The machine is dedicated to this task, Garbage Collection is minimized by using per-thread pools of pre-allocated items, and the number of threads can be fixed (no incoming requests or the like, just one thread per CPU core).
V1 - ExecutorService
My first implementation used an ExecutorService with 8 worker threads. The program creates 8 tasks holding the work and then lets them work on it, roughly like this:
    // create one thread per CPU
    executorService = Executors.newFixedThreadPool( 8 );
    ...
    // now process data in cycles
    while( ... ) {
        // package data into 8 work items
        ...
        // create one Callable task per work item
        ...
        // submit the Callables to the worker threads
        executorService.invokeAll( taskList );
    }
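For readers who want to reproduce the measurements, the loop above can be fleshed out into a minimal self-contained sketch. The class name `InvokeAllSketch`, the method `runCycles`, and the per-item work (summing a slice of an `int` array) are placeholders chosen for illustration; they are not the real algorithm. The code avoids lambdas and the diamond operator so that it should also compile on Java 6.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    static final int THREADS = 8; // one worker per core, as in the question

    // Runs `cycles` iterations; each one splits `data` into THREADS blocks and
    // sums them in parallel (placeholder work). Returns the total over all cycles.
    static long runCycles(final int[] data, int cycles) {
        ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
        try {
            long total = 0;
            final int blockSize = data.length / THREADS;
            for (int c = 0; c < cycles; c++) {
                // package data into work items, one Callable per item
                List<Callable<Long>> taskList = new ArrayList<Callable<Long>>();
                for (int t = 0; t < THREADS; t++) {
                    final int from = t * blockSize;
                    final int to = (t == THREADS - 1) ? data.length : from + blockSize;
                    taskList.add(new Callable<Long>() {
                        public Long call() {
                            long sum = 0;
                            for (int i = from; i < to; i++) sum += data[i];
                            return sum;
                        }
                    });
                }
                // invokeAll() blocks until every task has completed
                for (Future<Long> f : executorService.invokeAll(taskList)) {
                    total += f.get();
                }
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[1024];
        Arrays.fill(data, 1);
        System.out.println(runCycles(data, 10)); // prints 10240
    }
}
```

Note that this sketch allocates a fresh task list in every cycle, which the real app would presumably avoid given its pre-allocated pools.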
This works well functionally (it does what it should), and for very large work items indeed all 8 CPUs become highly loaded, as much as the processing algorithm would be expected to allow (some work items will finish faster than others, then idle). However, as the work items become smaller (and this is not really under the program's control), the user CPU load shrinks dramatically:
blocksize | system | user | cycles/sec
256k      | 1.8%   | 85%  | 1.30
64k       | 2.5%   | 77%  | 5.6
16k       | 4%     | 64%  | 22.5
4096      | 8%     | 56%  | 86
1024      | 13%    | 38%  | 227
256       | 17%    | 19%  | 420
64        | 19%    | 17%  | 948
16        | 19%    | 13%  | 1626
Legend:
- blocksize = size of the work item (= computational steps)
- system = system load, as shown in OS X Activity Monitor (red bar)
- user = user load, as shown in OS X Activity Monitor (green bar)
- cycles/sec = iterations through the main while loop, more is better
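To make the cycles/sec column concrete, here is a small sketch of how such a figure could be computed with `System.nanoTime()`. The class and method names are made up for illustration, and the actual measurements in the tables may have been taken differently.

```java
class ThroughputSketch {
    // cycles/sec = completed loop iterations divided by elapsed wall-clock seconds
    static double cyclesPerSecond(long cycles, long elapsedNanos) {
        return cycles / (elapsedNanos / 1e9);
    }

    // Average wall-clock time per cycle in microseconds (inverse of cycles/sec)
    static double microsPerCycle(double cyclesPerSecond) {
        return 1e6 / cyclesPerSecond;
    }

    public static void main(String[] args) {
        long cycles = 0;
        long start = System.nanoTime();
        while (System.nanoTime() - start < 100000000L) { // run for about 0.1 s
            // one iteration of the main while loop would go here
            cycles++;
        }
        long elapsed = System.nanoTime() - start;
        System.out.println(cyclesPerSecond(cycles, elapsed) + " cycles/sec");
    }
}
```

Read the other way round, the 1626 cycles/sec measured at block size 16 correspond to roughly 615 microseconds per cycle, almost all of which must be synchronization overhead at that block size.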
The primary area of concern here is the high percentage of time spent in the system, which appears to be driven by thread synchronization calls. As expected, for smaller work items, ExecutorService.invokeAll() will require relatively more effort to sync up the threads versus the amount of work being performed in each thread. But since ExecutorService is more generic than it would need to be for this use case (it can queue tasks for threads if there are more tasks than cores), I thought maybe there would be a leaner synchronization construct.
V2 - CyclicBarrier
The next implementation used a CyclicBarrier to sync up the threads before receiving work and after completing it, roughly as follows:
    main() {
        // create the barrier
        barrier = new CyclicBarrier( 8 + 1 );
        // create Runnable for thread, tell it about the barrier
        Runnable task = new WorkerThreadRunnable( barrier );
        // start the threads
        for( int i = 0; i < 8; i++ )
        {
            // create one thread per core
            new Thread( task ).start();
        }
        while( ... ) {
            // tell threads about the work
            ...
            // N threads + this will call await(), then system proceeds
            barrier.await();
            // ... now worker threads work on the work...
            // wait for worker threads to finish
            barrier.await();
        }
    }

    // (handling of the checked exceptions thrown by await() omitted)
    class WorkerThreadRunnable implements Runnable {
        CyclicBarrier barrier;

        WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

        public void run()
        {
            while( true )
            {
                // wait for work
                barrier.await();
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
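The outline above leaves out the checked exceptions thrown by `await()` and any way to stop the workers. A self-contained sketch of the same two-barrier scheme might look like the following. The `done` flag, the `partial` result array, the helper `awaitUnchecked`, and the array-summing work are assumptions added for illustration only.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierSketch {
    static final int THREADS = 8; // one worker per core, as in the question

    // await() with the checked exceptions wrapped, to keep the loops readable
    static void awaitUnchecked(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }

    // Runs `cycles` stages; in each, every worker sums its block of `data` (placeholder work).
    static long runCycles(final int[] data, int cycles) {
        final CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); // workers + main
        final long[] partial = new long[THREADS]; // one result slot per worker
        final boolean[] done = { false };         // only written by main before an await()
        final int blockSize = data.length / THREADS;

        Thread[] workers = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            final int id = t;
            workers[t] = new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        awaitUnchecked(barrier);   // wait for work
                        if (done[0]) return;       // main asked us to stop
                        int from = id * blockSize;
                        int to = (id == THREADS - 1) ? data.length : from + blockSize;
                        long sum = 0;
                        for (int i = from; i < to; i++) sum += data[i];
                        partial[id] = sum;
                        awaitUnchecked(barrier);   // wait for everyone else to finish
                    }
                }
            });
            workers[t].start();
        }

        long total = 0;
        for (int c = 0; c < cycles; c++) {
            awaitUnchecked(barrier);               // release the workers
            awaitUnchecked(barrier);               // wait for all workers to finish
            for (long p : partial) total += p;
        }
        done[0] = true;
        awaitUnchecked(barrier);                   // final release so workers see `done` and exit
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return total;
    }

    public static void main(String[] args) {
        int[] data = new int[1024];
        Arrays.fill(data, 1);
        System.out.println(runCycles(data, 10)); // prints 10240
    }
}
```

The plain (non-volatile) `done` and `partial` fields rely on the memory consistency guarantee documented for `CyclicBarrier`: actions before `await()` in one thread happen-before actions after the corresponding `await()` returns in the others.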
Again, this works well functionally (it does what it should), and for very large work items indeed all 8 CPUs become highly loaded, as before. However, as the work items become smaller, the load still shrinks dramatically:
blocksize | system | user | cycles/sec
256k      | 1.9%   | 85%  | 1.30
64k       | 2.7%   | 78%  | 6.1
16k       | 5.5%   | 52%  | 25
4096      | 9%     | 29%  | 64
1024      | 11%    | 15%  | 117
256       | 12%    | 8%   | 169
64        | 12%    | 6.5% | 285
16        | 12%    | 6%   | 377
For large work items, synchronization is negligible and the performance is identical to V1. But unexpectedly, the results of the (highly specialized) CyclicBarrier seem MUCH WORSE than those for the (generic) ExecutorService: for the smallest work items, throughput (cycles/sec) is only about a quarter of V1's. A preliminary conclusion would be that even though this seems to be the advertised ideal use case for CyclicBarrier, it performs much worse than the generic ExecutorService.
V3 - Wait/Notify + CyclicBarrier
It seemed worth a try to replace the first cyclic barrier await() with a simple wait/notify mechanism:
    main() {
        // create the barrier
        // create Runnable for thread, tell it about the barrier
        // start the threads
        while( ... ) {
            // tell threads about the work
            // for each: workerThreadRunnable.setWorkItem( ... );
            // ... now worker threads work on the work...
            // wait for worker threads to finish
            barrier.await();
        }
    }

    class WorkerThreadRunnable implements Runnable {
        CyclicBarrier barrier;
        @NotNull volatile private Callable<Integer> workItem;

        WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

        final protected void setWorkItem( @NotNull final Callable<Integer> callable )
        {
            synchronized( this )
            {
                workItem = callable;
                notify();
            }
        }

        public void run()
        {
            while( true )
            {
                // wait for work
                while( true )
                {
                    synchronized( this )
                    {
                        if( workItem != NO_WORK ) break;
                        try
                        {
                            wait();
                        }
                        catch( InterruptedException e ) { e.printStackTrace(); }
                    }
                }
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
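Since the outline above does not show where the work item is reset, here is one possible self-contained version of the wait/notify handoff, written as a sketch rather than as the code that produced the measurements. It uses `null` instead of a `NO_WORK` sentinel, takes and clears the item under the lock, and adds a hypothetical `stop()` method so the threads can exit. The trivial work items (each returns a number) are placeholders.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

class WaitNotifySketch {
    static final int THREADS = 8; // one worker per core, as in the question

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;
        private Callable<Integer> workItem; // guarded by `this`; null means "no work"
        private boolean stopped;            // guarded by `this`
        int result;                         // published to main via barrier.await()

        Worker(CyclicBarrier barrier) { this.barrier = barrier; }

        synchronized void setWorkItem(Callable<Integer> item) { workItem = item; notify(); }

        synchronized void stop() { stopped = true; notify(); }

        // Blocks until work arrives, then takes it and empties the slot; null means "stop".
        private synchronized Callable<Integer> takeWorkItem() throws InterruptedException {
            while (workItem == null && !stopped) wait(); // loop guards against spurious wakeups
            Callable<Integer> item = workItem;
            workItem = null;
            return item;
        }

        public void run() {
            try {
                Callable<Integer> item;
                while ((item = takeWorkItem()) != null) {
                    result = item.call();   // do the work
                    barrier.await();        // wait for everyone else to finish
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    // In cycle c, worker t gets a placeholder item returning t + c; returns the grand total.
    static long runCycles(int cycles) {
        CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); // workers + main
        Worker[] workers = new Worker[THREADS];
        Thread[] threads = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            workers[t] = new Worker(barrier);
            threads[t] = new Thread(workers[t]);
            threads[t].start();
        }
        long total = 0;
        try {
            for (int c = 0; c < cycles; c++) {
                for (int t = 0; t < THREADS; t++) {
                    final int value = t + c;
                    workers[t].setWorkItem(new Callable<Integer>() {
                        public Integer call() { return value; }
                    });
                }
                barrier.await();            // wait for worker threads to finish
                for (Worker w : workers) total += w.result;
            }
            for (Worker w : workers) w.stop();
            for (Thread th : threads) th.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(runCycles(3)); // prints 108 (28 + 36 + 44)
    }
}
```

Main can safely hand out the next items right after the barrier, because every worker has already emptied its slot before starting the work that led up to that barrier.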
Again, this works well functionally (it does what it should).
blocksize | system | user  | cycles/sec
256k      | 1.9%   | 85%   | 1.30
64k       | 2.4%   | 80%   | 6.3
16k       | 4.6%   | 60%   | 30.1
4096      | 8.6%   | 41%   | 98.5
1024      | 12%    | 23%   | 202
256       | 14%    | 11.6% | 299
64        | 14%    | 10.0% | 518
16        | 14.8%  | 8.7%  | 679
The throughput for small work items is still much worse than that of the ExecutorService, but about 2x that of the CyclicBarrier. Eliminating one CyclicBarrier eliminates half of the gap.
V4 - Busy wait instead of wait/notify
Since this app is the primary one running on the system and the cores idle anyway if they're not busy with a work item, why not try a busy wait for work items in each thread, even if that spins the CPU needlessly? The worker thread code changes as follows:
    class WorkerThreadRunnable implements Runnable {
        // as before

        final protected void setWorkItem( @NotNull final Callable<Integer> callable )
        {
            workItem = callable;
        }

        public void run()
        {
            while( true )
            {
                // busy-wait for work
                while( true )
                {
                    if( workItem != NO_WORK ) break;
                }
                // do the work
                ...
                // wait for everyone else to finish
                barrier.await();
            }
        }
    }
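As with the wait/notify variant, the outline does not show the work item being reset, so the following is only a sketch of how a busy-wait handoff on a `volatile` field could be made self-contained. The `STOP` sentinel, the use of `null` for "no work", and the trivial placeholder work items are assumptions for illustration. Clearing the slot with a plain volatile write is safe here only because main writes the next item after the barrier, so producer and consumer strictly alternate.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

class BusyWaitSketch {
    static final int THREADS = 8; // one worker per core, as in the question

    // Sentinel telling a worker to exit (hypothetical; not in the original code)
    static final Callable<Integer> STOP = new Callable<Integer>() {
        public Integer call() { return 0; }
    };

    static class Worker implements Runnable {
        private final CyclicBarrier barrier;
        // null means "no work"; volatile so the spinning thread sees main's write
        private volatile Callable<Integer> workItem;
        int result; // published to main via barrier.await()

        Worker(CyclicBarrier barrier) { this.barrier = barrier; }

        void setWorkItem(Callable<Integer> item) { workItem = item; }

        public void run() {
            try {
                while (true) {
                    Callable<Integer> item;
                    while ((item = workItem) == null) { /* busy-wait for work */ }
                    if (item == STOP) return;
                    workItem = null;        // empty the slot before working
                    result = item.call();   // do the work
                    barrier.await();        // wait for everyone else to finish
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    // In cycle c, worker t gets a placeholder item returning t + c; returns the grand total.
    static long runCycles(int cycles) {
        CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); // workers + main
        Worker[] workers = new Worker[THREADS];
        Thread[] threads = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            workers[t] = new Worker(barrier);
            threads[t] = new Thread(workers[t]);
            threads[t].start();
        }
        long total = 0;
        try {
            for (int c = 0; c < cycles; c++) {
                for (int t = 0; t < THREADS; t++) {
                    final int value = t + c;
                    workers[t].setWorkItem(new Callable<Integer>() {
                        public Integer call() { return value; }
                    });
                }
                barrier.await();            // wait for worker threads to finish
                for (Worker w : workers) total += w.result;
            }
            for (Worker w : workers) w.setWorkItem(STOP);
            for (Thread th : threads) th.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(runCycles(3)); // prints 108 (28 + 36 + 44)
    }
}
```

On a machine with fewer cores than spinning threads, this variant will still finish but may be very slow, since idle workers compete with busy ones for CPU time.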
Also works well functionally (it does what it should).
blocksize | system | user  | cycles/sec
256k      | 1.9%   | 85%   | 1.30
64k       | 2.2%   | 81%   | 6.3
16k       | 4.2%   | 62%   | 33
4096      | 7.5%   | 40%   | 107
1024      | 10.4%  | 23%   | 210
256       | 12.0%  | 12.0% | 310
64        | 11.9%  | 10.2% | 550
16        | 12.2%  | 8.6%  | 741
For small work items, this increases throughput by a further 10% over the CyclicBarrier + wait/notify variant, which is not insignificant. But its throughput is still much lower than that of V1 with the ExecutorService.
V5 - ?
So what is the best synchronization mechanism for such a (presumably not uncommon) problem? I am wary of writing my own sync mechanism to completely replace ExecutorService (assuming that it is too generic and there has to be something that can still be taken out to make it more efficient). It is not my area of expertise and I'm concerned that I'd spend a lot of time debugging it (since I'm not even sure my wait/notify and busy wait variants are correct) for uncertain gain.
Any advice would be greatly appreciated.