I am trying to understand Phaser in Java. I wrote an example which gets stuck in arriveAndAwaitAdvance(), waiting for other parties to arrive.
As far as I understand, a Phaser is a reusable thread synchronization barrier (unlike CountDownLatch, which is not reusable) with a barrier action (unlike CyclicBarrier, whose barrier action is used to share state, a Phaser doesn't have to share state in its barrier action). Correct me if I am wrong.
So, in my example, I am trying to execute some random addition and subtraction code in each thread after a certain number of parties/threads reach the barrier. What am I doing wrong?
import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
        Phaser phaser = new Phaser(1);
        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
        execService.shutdown();
        try {
            while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
            }
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Registering...%s",threadName));
        phaser.register();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
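You initialize your Phaser with one registered party: new Phaser(1).
This means that your main thread is one of the parties being waited for, but it never calls arrive(). Each of the 10 tasks then registers itself as well, so the phaser expects 11 arrivals and only ever gets 10.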
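To make the counting concrete, here is a small single-threaded sketch. The class name PhaserPartyCount and the simulate() helper are made up for illustration; it replays what the program does to the phaser, using the non-blocking arrive() as a stand-in for the tasks' arriveAndAwaitAdvance():

```java
import java.util.concurrent.Phaser;

public class PhaserPartyCount {

    // Returns {registered, unarrived, phase before main arrives, phase after}.
    static int[] simulate() {
        Phaser phaser = new Phaser(1);          // one party: the main thread
        for (int i = 0; i < 10; i++) {
            phaser.register();                  // what each task does in call()
        }
        for (int i = 0; i < 10; i++) {
            phaser.arrive();                    // the 10 tasks arrive (non-blocking here)
        }
        int registered = phaser.getRegisteredParties();
        int unarrived = phaser.getUnarrivedParties();
        int phaseBefore = phaser.getPhase();
        phaser.arrive();                        // the arrival the main thread never makes
        int phaseAfter = phaser.getPhase();
        return new int[] {registered, unarrived, phaseBefore, phaseAfter};
    }

    public static void main(String[] args) {
        int[] r = simulate();
        System.out.println("registered = " + r[0]);                // 11
        System.out.println("unarrived  = " + r[1]);                // 1
        System.out.println("phase      = " + r[2]);                // 0
        System.out.println("phase after main arrives = " + r[3]);  // 1
    }
}
```

The phase only advances once the eleventh party has arrived, which is why all ten tasks stay blocked in the original program.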
As the number of threads is fixed, you should initialize the Phaser with the number of threads and remove the register() calls.
Here is working code with no phaser.register():
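A minimal sketch of that approach, assuming the pool has at least as many threads as there are parties so that all tasks can reach the barrier at the same time. The class name PhaserFixedParties and the runAll() helper are hypothetical, not taken from the question. The sketch also collects exactly THREAD_POOL_SIZE results instead of polling isTerminated(), because take() can block forever if the pool has not yet terminated when the last result is consumed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserFixedParties implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    PhaserFixedParties(Phaser phaser) {
        this.phaser = phaser;
    }

    // Submits all tasks and returns their results in completion order.
    static List<String> runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        // All parties are declared up front; the main thread is not one of them.
        Phaser phaser = new Phaser(THREAD_POOL_SIZE);
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            cs.submit(new PhaserFixedParties(phaser));
        }
        pool.shutdown();
        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < THREAD_POOL_SIZE; i++) {
                results.add(cs.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        runAll().forEach(r -> System.out.println("Result is: " + r));
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        phaser.arriveAndAwaitAdvance();         // wait until all tasks have started
        int a = 0, b = 1;
        int iterations = new Random().nextInt(10_000_000);
        for (int i = 0; i < iterations; i++) {
            a = a + b;
            b = a - b;
        }
        phaser.arriveAndDeregister();           // no register() call anywhere in the task
        return String.format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
```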
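The problem is that you cannot call phaser.register() from within the task being registered. When using phasers, always follow these two rules:
1. A task should not register itself. Register it from a party that is already registered (here, the main thread) before the task starts.
2. Every registered task must deregister when it is done. A good practice is to wrap the code that uses the phaser in a try block with a finally block that deregisters at the end (see example).
Here is your fixed program (notice the line that creates the phaser):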
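A minimal sketch of what such a fix could look like. This is an assumption about the shape of the fix, not a definitive listing: the class name PhaserRegisterFromMain and the runAll() helper are hypothetical. The main thread stays registered via new Phaser(1), registers each task before submitting it, and then arrives and deregisters itself; each task deregisters in a finally block:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserRegisterFromMain implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    PhaserRegisterFromMain(Phaser phaser) {
        this.phaser = phaser;
    }

    static List<String> runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        Phaser phaser = new Phaser(1);          // the main thread is the first party
        for (int i = 0; i < THREAD_POOL_SIZE; i++) {
            phaser.register();                  // registered by main, before the task starts
            cs.submit(new PhaserRegisterFromMain(phaser));
        }
        pool.shutdown();
        phaser.arriveAndDeregister();           // main arrives without blocking and leaves
        List<String> results = new ArrayList<>();
        try {
            for (int i = 0; i < THREAD_POOL_SIZE; i++) {
                results.add(cs.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        runAll().forEach(r -> System.out.println("Result is: " + r));
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        try {
            phaser.arriveAndAwaitAdvance();     // wait for main and all other tasks
            int a = 0, b = 1;
            int iterations = new Random().nextInt(10_000_000);
            for (int i = 0; i < iterations; i++) {
                a = a + b;
                b = a - b;
            }
            return String.format("Thread %s results: a = %s, b = %s", threadName, a, b);
        } finally {
            phaser.arriveAndDeregister();       // always leave the phaser, even on failure
        }
    }
}
```

Because the main thread holds a registration until every task has been registered, no task can pass the barrier before all parties are known.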