Understanding phaser in java with an example

2019-02-15 18:03发布

I am trying to understand Phaser in java. I wrote an example which is stuck at advance waiting for other parties to arrive.

As far as I understand, phaser is used as a reusable thread synchronization (unlike CountdownLatch which is not reusable) barrier with a barrier action (unlike Cyclicbarrier which is used to share state, Phaser doesn't have to share state in barrier action). Correct me if I am wrong.

So, in my example, I am trying to execute some random addition and subtraction code in each thread after certain number of parties/threads reach the barrier. What am I doing wrong?

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        Phaser phaser = new Phaser(1);
        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Registering...%s",threadName));
        phaser.register();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}

标签: java-8 phaser
3条回答
Explosion°爆炸
2楼-- · 2019-02-15 18:26

You initialize your Phaser with a value of 1:

Phaser phaser = new Phaser(1);

This means that your main thread is one of the threads you are waiting for, but it never calls arrive().

As the number of your threads is fixed, you should initialize the Phaser with the thread number, and remove the register() calls.

查看更多
\"骚年 ilove
3楼-- · 2019-02-15 18:27

Here is working code with no phaser.register():

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        Phaser phaser = new Phaser(1);
        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Registering...%s",threadName));
        //phaser.register();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
查看更多
4楼-- · 2019-02-15 18:35

The problem is that you are cannot call phaser.register() from within the task being registered. When using phasers always follow these two rules:

  1. Only registered tasks can register other tasks. This means that a task cannot register itself.
  2. All registered tasks must deregister before ending. A good practice is to wrap the code using a phaser around a finally block that deregisters at the end (see example).

Here is your fixed program (notice the line that creates the phaser):

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        // since we know beforehand how many tasks we have, initialize the
        // number of participants in the constructor; other wise register
        // *before* launching the task
        Phaser phaser = new Phaser(THREAD_POOL_SIZE);

        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
查看更多
登录 后发表回答