Java: tutorials/explanations of jsr166y Phaser

2019-07-23 11:45发布

问题:

This question was asked two years ago, but the resources it mentions are either not very helpful (IMHO) or links are no longer valid.

There must be some good tutorials out there to understand Phaser. I've read the javadoc, but my eyes glaze over, since in order to really understand the javadoc you kind of have to know how these classes are supposed to be used.

Anyone have any suggestions?

回答1:

For Phaser I have answered a few questions. Seeing them may help in understanding their applications. They are linked at the bottom. But to understand what the Phaser does and why its useful its important to know what it solves.

Here are attributes of a CountdownLatch and CyclicBarrier

Note:

  • Number of parties is another way of saying # of different threads
  • Not Reusable means you will have to create a new instance of the barrier before reusing
  • A barrier is advanceable if a thread can arrive and continue doing work without waiting for others or can wait for all threads to complete

CountdownLatch

  • Fixed number of parties
  • Not resuable
  • Advanceable (look at latch.countDown(); advanceable latch.await(); must wait )

CyclicBarrier

  • Fixed number of parties
  • Reusable
  • Not advanceable

So the CountdownLatch is not reusable, you must create a new instance each time, but is avanceable. CylicBarrier can be re used but all threads must wait for each party to arrive at the barrier.

Phaser

  • Dynamic number of parties
  • Reusable
  • Advanceable

When a thread wants to be known to the Phaser they invoke phaser.register() when the thread arrives at the barrier they invoke phaser.arrive() and here is where it is advanceable. If the thread wants to await for all registered tasks to complete phaser.arriveAndAwaitAdvance()

There is also the concept of a phase in which threads can wait on a completion of a other operations that may have not completed. Once all threads arrive at the phaser's barrier a new phase is created (an increment of one).

You can take a look at my other answers, maybe it will help:

Java ExecutorService: awaitTermination of all recursively created tasks

Flexible CountDownLatch?



回答2:

For Phaser at least, I think the JavaDoc offers a fairly clear explanation. This is a class that you would use to synchronize a batch of threads, in the sense that your can register each thread in the batch with a Phaser and then use the Phaser to have them block until every thread in the batch has notified the Phaser, at which point any blocked thread(s) will begin executing. This wait/notify cycle can repeat over and over again, as desired/required.

Their sample code gives a reasonable example (though I very much dislike their 2-character indentation style):

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 } 

This sets up a Phaser with a registration count of tasks.size() + 1, and for each task creates a new Thread which will block until the next advance of the Phaser (i.e. the time at which tasks.size() + 1 arrivals have been recorded) and then run its associated task. Each Thread that is created is also instantly started, so the Phaser comes out of the loop with tasks.size() arrivals recorded.

The final call to phaser.arriveAndDeregister() will record the final arrival, and also decrement the registration count so that it now equals tasks.size(). This causes the Phaser to advance, which in effect allows all the tasks to start running at the same time. This could be repeated by doing something like:

void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         while (true) {
           phaser.arriveAndAwaitAdvance(); // await all creation
           task.run();
         }
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

...this is the same as before, except with the addition of a loop that causes the task to be run repeatedly. Because each iteration calls phaser.arriveAndAwaitAdvance() the execution of the task threads will be synchronized such that task-0 does not begin its second iteration until every other task has completed its first iteration and notified the Phaser that is is ready to begin its second iteration.

This may be useful if the tasks you are running vary greatly in the amount of time they take to execute and if you want to ensure that faster threads do not get out of sync with slower ones.

For a possible real-world application, consider a game that runs separate graphics and physics threads. You don't want to have the physics thread computing data for frame 100 if the graphics thread is stuck on frame 6, and using a Phaser is one possible approach to ensuring that the graphics and physics threads are always working on the same frame at the same time (and also that if one thread is significantly slower than the other the faster thread gracefully yields CPU resources so that hopefully the slower thread can catch up quicker).



回答3:

Phaser is somewhat similar in functionality of CyclicBarrier and CountDownLatch but it provides more flexibility than both of them.

In CyclicBarrier we used to registering parties in constructor but Phaser provides us flexibility of registering and deRegistering parties at any time. For registering parties, we may use any of the following -

  • constructors
  • register, or
  • bulkRegister

For deRegistering parties, we may use -

  • arriveAndDeregister, or


register- Adds/Registers a new unarrived party to this phaser. It returns

  • the arrival phase number to which this registration applied.
  • If phaser has terminated then value is negative and registration has no effect.

If invocation of onAdvance() method is in progress than before returning this method may await its completion. If this phaser has a parent, and there were no registered parties with this phaser, this child phaser is also registered with its parent.
Program to demonstrate usage of Phaser>

import java.util.concurrent.Phaser;

public class PhaserTest {
public static void main(String[] args) {

       /*Creates a new phaser with 1 registered unArrived parties 
        * and initial phase number is 0
        */
       Phaser phaser=new Phaser(1);
       System.out.println("new phaser with 1 registered unArrived parties"
                    + " created and initial phase  number is 0 ");

       //Create 3 threads
       Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1");
       Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2");
       Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3");

       System.out.println("\n--------Phaser has started---------------");
       //Start 3 threads
       thread1.start();
       thread2.start();
       thread3.start();

       //get current phase
       int currentPhase=phaser.getPhase();
       /*arriveAndAwaitAdvance() will cause thread to wait until current phase
        * has been completed i.e. until all registered threads
        * call arriveAndAwaitAdvance()
        */
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       //------NEXT PHASE BEGINS------

       currentPhase=phaser.getPhase();
       phaser.arriveAndAwaitAdvance();
       System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");

       /* current thread Arrives and deRegisters from phaser.
        * DeRegistration reduces the number of parties that may
        * be required to advance in future phases.
        */
       phaser.arriveAndDeregister();

       //check whether phaser has been terminated or not.
       if(phaser.isTerminated())
              System.out.println("\nPhaser has been terminated");

} 
}





class MyRunnable implements Runnable{

Phaser phaser;

MyRunnable(Phaser phaser,String name){
       this.phaser=phaser;
       this.phaser.register(); //Registers/Add a new unArrived party to this phaser.
       System.out.println(name +" - New unarrived party has "
                    + "been registered with phaser");
}

@Override
public void run() {
       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();

       //Sleep has been used for formatting output
       try {
              Thread.sleep(1000);
       } catch (InterruptedException e) {
              e.printStackTrace();
       }

       //------NEXT PHASE BEGINS------

       System.out.println(Thread.currentThread().getName() +
                    " - party has arrived and is working in "
                    + "Phase-"+phaser.getPhase());
       phaser.arriveAndAwaitAdvance();  

       phaser.arriveAndDeregister();
}

}


bulkRegister - Adds parties number of new unarrived parties to this phaser. It returns

  • the arrival phase number to which this registration applied.
  • If phaser has terminated then value is negative and registration has no effect.

If invocation of onAdvance() method is in progress than before returning this method may await its completion.

arriveAndDeregister- Current thread (Party) Arrives and deRegisters from phaser. DeRegistration reduces the number of parties that may be required in future to move to next phase.

If this phaser has a parent, and there were no registered parties with this phaser, this child phaser is also registered with its parent. Program to demonstrate Parent and child Phaser

import java.util.concurrent.Phaser;

public class PhaserParentChildTest {
public static void main(String[] args) {


    /*
  * Creates a new phaser with no registered unArrived parties.
  */
    Phaser parentPhaser = new Phaser();

    /*
  * Creates a new phaser with the given parent &
  * no registered unArrived parties.
  */
    Phaser childPhaser = new Phaser(parentPhaser,0);

    childPhaser.register();

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

    childPhaser.arriveAndDeregister();
    System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n");

    System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
    System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());

}
}

Read more about Phaser on http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html