Phaser in Java concurrency

JDK 7 introduced an interesting concurrency construct that mixes the CountDownLatch and CyclicBarrier mechanisms. Its name: Phaser.

This article describes this class in more detail. At the beginning, we'll focus on Phaser's specifics: its available methods and the differences between it and the CountDownLatch and CyclicBarrier classes. The second part shows an example of code using Phaser.

What is Phaser?

According to its Javadoc, Phaser works similarly to CyclicBarrier and CountDownLatch. Like these two classes, Phaser allows several threads to wait until a condition is met before continuing. A condition can be, for example, that all "phased" threads have finished their tasks. CountDownLatch works in a similar way: it relies on the await() and countDown() methods to achieve the same effect.
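To make the comparison concrete, here is a minimal sketch that waits for a group of worker threads twice: once with CountDownLatch and once with Phaser. The class and method names (LatchVersusPhaser, waitWithLatch, waitWithPhaser) are made up for this illustration, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of the article's test case.
class LatchVersusPhaser {

  // Waits for `workers` threads with a CountDownLatch; returns how many finished.
  static int waitWithLatch(int workers) {
    CountDownLatch latch = new CountDownLatch(workers);
    AtomicInteger done = new AtomicInteger();
    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        done.incrementAndGet();
        latch.countDown();            // signal that this worker has finished
      }).start();
    }
    try {
      latch.await();                  // block until the count reaches zero
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return done.get();
  }

  // Same idea with a Phaser: the caller is one party, each worker is another.
  static int waitWithPhaser(int workers) {
    Phaser phaser = new Phaser(1);    // 1 = the calling thread
    AtomicInteger done = new AtomicInteger();
    for (int i = 0; i < workers; i++) {
      phaser.register();              // one more party per worker
      new Thread(() -> {
        done.incrementAndGet();
        phaser.arriveAndDeregister(); // arrive and leave the phaser
      }).start();
    }
    phaser.arriveAndAwaitAdvance();   // block until every party has arrived
    return done.get();
  }

  public static void main(String[] args) {
    System.out.println(waitWithLatch(3));   // prints 3
    System.out.println(waitWithPhaser(3));  // prints 3
  }
}
```

In the Phaser variant the caller registers itself through the constructor argument, so the phase cannot advance before the loop has registered every worker.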

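So, what are the differences between Phaser and CyclicBarrier or CountDownLatch? The main ones are that Phaser is reusable across successive phases and that the number of participating threads can change dynamically.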
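Two of these differences, reuse across phases and a party count that changes at run time, can be sketched as follows. A CountDownLatch cannot be reset once it reaches zero, and a CyclicBarrier keeps the number of parties fixed at construction. The class ReusablePhaserSketch and its method runRounds are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical helper class, not part of the article's test case.
class ReusablePhaserSketch {

  // Runs two rounds on ONE phaser, each with its own number of workers,
  // and records the phase number observed after each round.
  static List<Integer> runRounds(int firstRound, int secondRound) {
    Phaser phaser = new Phaser(1);            // the coordinating thread
    List<Integer> phases = new ArrayList<>();
    for (int workers : new int[] {firstRound, secondRound}) {
      phaser.bulkRegister(workers);           // party count changes at run time
      for (int i = 0; i < workers; i++) {
        // each worker arrives and leaves the phaser
        new Thread(phaser::arriveAndDeregister).start();
      }
      phaser.arriveAndAwaitAdvance();         // wait for this round's workers
      phases.add(phaser.getPhase());
    }
    phaser.arriveAndDeregister();             // last party leaves: phaser terminates
    return phases;
  }

  public static void main(String[] args) {
    System.out.println(runRounds(2, 4));      // prints [1, 2]
  }
}
```

The coordinator stays registered for the whole run, which is what keeps the phaser alive between rounds.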

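If these concepts seem clear, we can explore the Phaser methods used in this article: register(), arrive(), arriveAndAwaitAdvance(), arriveAndDeregister(), awaitAdvanceInterruptibly() and getPhase().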
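As a quick tour, the sketch below calls several Phaser methods from a single thread and logs what each one does to the phaser's state. A phaser does not track which thread a party belongs to, so one thread can play both parties here. PhaserMethodsTour and tour are hypothetical names, and the 100 ms timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the article's test case.
class PhaserMethodsTour {

  static String tour() {
    StringBuilder log = new StringBuilder();
    Phaser phaser = new Phaser();             // no parties yet
    phaser.register();                        // party A
    phaser.register();                        // party B
    log.append("parties=").append(phaser.getRegisteredParties());
    phaser.arrive();                          // A arrives without waiting
    log.append(" unarrived=").append(phaser.getUnarrivedParties());
    try {
      // phase 0 is still open (B hasn't arrived), so this call times out
      phaser.awaitAdvanceInterruptibly(0, 100, TimeUnit.MILLISECONDS);
      log.append(" advanced");
    } catch (TimeoutException e) {
      log.append(" timeout");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      log.append(" interrupted");
    }
    phaser.arriveAndDeregister();             // B arrives and leaves: phase 0 ends
    log.append(" phase=").append(phaser.getPhase());
    log.append(" parties=").append(phaser.getRegisteredParties());
    phaser.arriveAndDeregister();             // A leaves: no parties remain
    log.append(" terminated=").append(phaser.isTerminated());
    return log.toString();
  }

  public static void main(String[] args) {
    // prints: parties=2 unarrived=1 timeout phase=1 parties=1 terminated=true
    System.out.println(tour());
  }
}
```

Note the last line of the log: with the default onAdvance behavior, a phaser terminates when a phase completes and no registered parties remain.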

Example of Phaser usage

In our example, we'll look at Phaser's behavior from several angles. The first one shows how to run custom code depending on the phase that has just been reached; you'll see it in the awaitAdvanceInterruptibly calls. The second aspect is reusability. In our testBasics case we use one Phaser object to run two groups of tasks, each executed by 3 workers. In addition, the main thread takes part in the phaser and blocks on arriveAndAwaitAdvance until the workers of the current phase have arrived. Finally, we'll see that reusability relies on the arriveAndDeregister method, which decrements the number of registered parties. Otherwise, for example with a plain arrive(), that number wouldn't decrease and the program could wait forever for parties that never arrive again.

This is the test case:

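// Imports assume JUnit 4, which matches the assertTrue(message, condition) calls below.
import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

public class PhaserTest {

  @Test
  public void testBasics() throws InterruptedException {
    // The main thread is registered as one party (the 1 below) because it calls
    // arriveAndAwaitAdvance later; calling it while unregistered is a usage error.
    final Phaser phaser = new Phaser(1);
    /**
      * Current phase is 0 because there were no arrive invocations yet.
      */
    assertTrue("Current phase should be 0 but is "+phaser.getPhase(), phaser.getPhase() == 0);

    /**
      * This block expects a TimeoutException. Why? Because phase 0 has not advanced yet.
      * It advances once all registered parties have invoked an arrive* method.
      */
    boolean wasTe = false;
    try {
      phaser.awaitAdvanceInterruptibly(0, 2, TimeUnit.SECONDS);
    } catch (TimeoutException te) {
      wasTe = true;
    }
    assertTrue("TimeoutException should be thrown but it wasn't", wasTe);

    // Each Worker registers itself with the phaser in its constructor
    Thread worker1 = new Thread(new Worker(phaser, "#1"));
    Thread worker2 = new Thread(new Worker(phaser, "#2"));
    Thread worker3 = new Thread(new Worker(phaser, "#3"));
    worker1.start();

    // Written by the observer threads and read by the main thread, hence a concurrent map
    final Map<String, String> crashContainer = new ConcurrentHashMap<String, String>();
    /**
      * Note that all workers call arrive* after 2 seconds. Here, we wait
      * up to 3 seconds for phase 0 to complete. This code shouldn't fail
      * because phase 0 should be done before this timeout.
      */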
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          phaser.awaitAdvanceInterruptibly(0, 3, TimeUnit.SECONDS);
          crashContainer.put("testOk3Sec", "OK");
        } catch (Exception e) {
          crashContainer.put("testOk3Sec", "NOK");
        }
      }
    }).start();
    worker2.start();
    worker3.start();
    phaser.arriveAndAwaitAdvance();

    /**
      * Because a phaser can be reused, we can run our workers once again
      * with the same Phaser instance. However, be careful about the number
      * of registered parties. If you register new parties without deregistering
      * the old ones (already finished), the phase will never advance. That's
      * why in this sample we invoke arriveAndDeregister in the
      * workers' run() method.
      */
    Thread worker4 = new Thread(new Worker(phaser, "#4"));
    Thread worker5 = new Thread(new Worker(phaser, "#5"));
    Thread worker6 = new Thread(new Worker(phaser, "#6"));
    worker4.start();
    worker5.start();
    worker6.start();

    /**
      * Once again, we run specific code after a phase completes, this time phase 1.
      */
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          phaser.awaitAdvanceInterruptibly(1, 3, TimeUnit.SECONDS);
          crashContainer.put("testOk3SecP1", "OK");
        } catch (Exception e) {
          crashContainer.put("testOk3SecP1", "NOK");
        }
      }
    }).start();
    phaser.arriveAndAwaitAdvance();

    // Sleep as a safety margin so that both observer threads can record their result
    TimeUnit.SECONDS.sleep(2);

    // "OK".equals(...) also covers the case where no result was recorded at all
    assertTrue("Phase 0: the 3-second wait shouldn't have failed but it did",
      "OK".equals(crashContainer.get("testOk3Sec")));
    assertTrue("Phase 1: the 3-second wait shouldn't have failed but it did",
      "OK".equals(crashContainer.get("testOk3SecP1")));
  }
}


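class Worker implements Runnable {

  private final Phaser phaser;
  private final String name;

  public Worker(Phaser phaser, String name) {
    this.phaser = phaser;
    this.name = name;
    // Register in the constructor, so the party is counted before the thread starts
    phaser.register();
  }

  @Override
  public void run() {
    try {
      // Simulate 2 seconds of work
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag, then still arrive so that waiters aren't blocked
      Thread.currentThread().interrupt();
    }
    // Arrive and leave the phaser, so a finished worker isn't awaited in the next phase
    this.phaser.arriveAndDeregister();
  }

}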
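The registration pitfall mentioned in the test's comments can be reproduced in isolation. The sketch below simulates two rounds from a single thread: in the first round the workers either deregister or simply arrive, and the method reports whether the second round completes. DeregisterPitfall and secondRoundCompletes are hypothetical names, and the 200 ms timeout is an arbitrary value for the example.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the article's test case.
class DeregisterPitfall {

  // Returns true if phase 1 completes when only one new worker takes part in it.
  static boolean secondRoundCompletes(boolean deregister) {
    Phaser phaser = new Phaser(1);            // the coordinator
    phaser.bulkRegister(2);                   // two first-round workers
    for (int i = 0; i < 2; i++) {
      if (deregister) {
        phaser.arriveAndDeregister();         // worker arrives and leaves
      } else {
        phaser.arrive();                      // worker arrives but stays registered
      }
    }
    phaser.arrive();                          // coordinator arrives: phase 0 ends

    phaser.register();                        // one second-round worker
    phaser.arriveAndDeregister();             // it finishes its task
    phaser.arrive();                          // coordinator arrives again
    try {
      // returns at once if phase 1 is already over, otherwise waits and times out
      phaser.awaitAdvanceInterruptibly(1, 200, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;                           // stale parties never arrived
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(secondRoundCompletes(true));   // prints true
    System.out.println(secondRoundCompletes(false));  // prints false
  }
}
```

With plain arrive(), the two first-round workers are still counted in phase 1, so the phase waits for arrivals that never come.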

Phaser proves to be a great mix of CyclicBarrier and CountDownLatch. It's a reusable barrier whose number of participating threads can change dynamically. With the awaitAdvanceInterruptibly method, we can also run specific code once a given phase has completed.
