Barriers in Java concurrency

In the real world, a barrier separates two things, for example two countries. In programming, it's a technique for synchronizing threads. Java provides its own implementation of barriers in the java.util.concurrent package.

The first part of this article covers the theory behind barriers: we'll define them and then describe Java's implementation. The second part presents sample code that uses barriers.

Barriers in Java

A barrier is a kind of meeting point for several threads. Until every thread concerned has reached this point, none of them continues past it. In other words, a barrier is assigned to a group of threads and is considered reached when all threads in the group have arrived at it. If even one thread runs into trouble while waiting (for example, it's interrupted), the barrier is considered broken.

In Java, barriers are represented by the java.util.concurrent.CyclicBarrier class. The barrier is called cyclic because it can be reused once the waiting threads have been released, so the same instance can serve several successive rounds of waiting. When constructing a CyclicBarrier, we must pass the number of threads (parties) that make up the group. We can also pass a Runnable, called the barrier action, which is invoked once each time the barrier is reached. As mentioned earlier, the barrier is considered broken when one of the waiting threads fails (timeout, interruption, or an exception thrown by the barrier action). Which exception a thread sees depends on the cause: the interrupted thread gets InterruptedException, the thread whose timed wait expired gets TimeoutException, and all other threads waiting at the barrier get BrokenBarrierException. Note that a thread that fails before it calls await() doesn't break the barrier; the other threads simply keep waiting for it.
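To make the constructor and the barrier action more concrete, here is a minimal sketch. The class name BarrierActionSketch and the helper runOnce are made up for this illustration; only CyclicBarrier, AtomicInteger and Thread come from the JDK.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierActionSketch {

    // Hypothetical helper: sends `parties` threads to one barrier and
    // returns how many times the barrier action was executed.
    static int runOnce(int parties) {
        AtomicInteger actionRuns = new AtomicInteger();
        // The Runnable passed as 2nd argument is the barrier action. It runs
        // once per trip, in the last thread to arrive, before the waiting
        // threads are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await(); // blocks until all parties have arrived
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed while this thread was waiting
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier action runs: " + runOnce(3));
    }
}
```

With 3 parties and 3 threads, the action should run exactly once, whichever thread happens to arrive last.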

CyclicBarrier implements 6 methods:
- await, with or without a timeout (2 overloads): signals that one of the threads in the group has reached the barrier. If the calling thread isn't the last one in the group, it waits until the last one calls await().

All waiting threads wake up when the last thread reaches the barrier or when one of the threads (also called parties) in the group fails. Another special case of wake-up is a call to the reset method by some thread.

The timed overload takes a duration and its time unit as the maximum waiting time. When that time elapses, TimeoutException is thrown in the waiting thread and the barrier becomes broken.
- getNumberWaiting: returns the number of threads currently waiting at the barrier.
- getParties: returns the number of threads required to trip the barrier.
- isBroken: returns true if the barrier is in the broken state.
- reset: resets the barrier to its initial state. Any threads waiting at that moment receive BrokenBarrierException.
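The timeout, isBroken and reset behaviour listed above can be observed with a single thread. The following is only a sketch: the class BarrierStateSketch, the helper timeoutThenReset and the 100 ms timeout are choices made for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BarrierStateSketch {

    // Hypothetical helper: one thread waits on a 2-party barrier that nobody
    // else will ever reach.
    // Returns {timedOut, brokenAfterTimeout, brokenAfterReset}.
    static boolean[] timeoutThenReset() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean timedOut = false;
        try {
            barrier.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedOut = true; // this thread gave up, which breaks the barrier
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // not expected here: no other party uses this barrier
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        barrier.reset(); // back to the initial, unbroken state
        boolean brokenAfterReset = barrier.isBroken();
        return new boolean[] {timedOut, brokenAfterTimeout, brokenAfterReset};
    }

    public static void main(String[] args) {
        boolean[] result = timeoutThenReset();
        System.out.println("timed out: " + result[0]);
        System.out.println("broken after timeout: " + result[1]);
        System.out.println("broken after reset: " + result[2]);
    }
}
```

Once a barrier is broken, every later await() call fails immediately with BrokenBarrierException until reset() is called, which is why the reset at the end matters if the instance is going to be used again.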

So far, CyclicBarrier seems to behave like CountDownLatch, explained in the article about CountDownLatch in Java. There are differences, though. First, a CountDownLatch can be used only once, while a barrier can be reused: it returns to its initial state automatically each time it trips, and the reset method is only needed to recover a broken barrier or to abandon a round in progress. Second, we can have a Runnable invoked when all threads reach the barrier, which we can't do with CountDownLatch. Third, CountDownLatch's counter is decremented every time the countDown() method is invoked, no matter which thread makes the call, so a single thread can call it 5 times and open the latch alone. With CyclicBarrier that isn't possible: await() blocks the caller until the barrier trips, so each thread contributes at most one arrival per round, and the barrier is reached only when as many distinct threads as there are parties are waiting in await().
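The counting and reuse differences can be sketched as follows. LatchVersusBarrierSketch and both helper methods are hypothetical names introduced for this illustration; the barrier is deliberately used for several rounds with no reset() call in between.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchVersusBarrierSketch {

    // countDown() never blocks, so one thread alone can bring the latch to 0.
    static long countDownFromOneThread(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            latch.countDown();
        }
        return latch.getCount();
    }

    // Every thread calls await() `rounds` times on the same barrier.
    // Returns how many times the barrier tripped.
    static int tripsWithoutReset(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // blocks until the current round is complete
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // a party failed; give up on the remaining rounds
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Latch count left: " + countDownFromOneThread(5));
        System.out.println("Barrier trips: " + tripsWithoutReset(3, 4));
    }
}
```

With 3 threads calling await() 4 times each, the barrier should trip 4 times, once per round, because no thread can start its next await() before the current round has completed.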

Example of barriers in Java

As an example, we'll use a running race. We define several runners, each with its own characteristics: Runner is a normal runner, SprintRunner is a sprinter (it waits at the barrier for at most 1 second), FourLegsRunner is a runner with 4 legs (it calls await() 4 times) and HoledShoesRunner is a runner in shoes with holes (it fails before reaching the barrier). All of them try to reach the barrier represented by the meta variable.

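import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class BarrierTest {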

  @Test
  public void testSimpleBarrier() {
    Referee referee = new Referee();
    CyclicBarrier meta = new CyclicBarrier(3, referee);
    Thread runner1 = new Thread(new Runner(meta, "Runner#1"));
    Thread runner2 = new Thread(new Runner(meta, "Runner#2"));
    Thread runner3 = new Thread(new Runner(meta, "Runner#3"));

    runner1.start();
    runner2.start();
    runner3.start();

    try {
      Thread.sleep(7000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    assertTrue("Referee should be notified about the run end but he wasn't", referee.isRunEnd());
    assertTrue("3 threads should participate in the barrier reaching, but only "+ meta.getParties() + 
      " were", meta.getParties() == 3);
    assertTrue("All threads should reach the barrier but "+meta.getNumberWaiting()+" didn't",  
      meta.getNumberWaiting() == 0);
  }

  @Test
  public void testBarrierMultipleAwaits() {
    Referee referee = new Referee();
    CyclicBarrier meta = new CyclicBarrier(3, referee);
    FourLegsRunner runner1Obj = new FourLegsRunner(meta, "Runner#1");
    runner1Obj.setSleeping(2000);
    FourLegsRunner runner2Obj = new FourLegsRunner(meta, "Runner#2");
    runner2Obj.setSleeping(4000);
    FourLegsRunner runner3Obj = new FourLegsRunner(meta, "Runner#3");
    runner3Obj.setSleeping(6000);
    Thread runner1 = new Thread(runner1Obj);
    Thread runner2 = new Thread(runner2Obj);
    Thread runner3 = new Thread(runner3Obj);
    runner1.start();
    runner2.start();
    runner3.start();
    // check every 1.5 seconds how many runners are waiting at the barrier
    /*
    * The assertTrue concerns waiting threads. A waiting 
    * thread is a thread that has already reached the barrier. 
    * Although each runner's code calls await() 4 times, a runner
    * is counted only once per round: await() blocks, so its 2nd
    * call can't happen before all 3 runners have completed the
    * 1st one. It's different in CountDownLatch, where countDown()
    * never blocks and every invocation is counted, even when the
    * same thread makes several of them.
    */
    for (int i = 0; i < 3; i++) {
      try {
        Thread.sleep(1500);
      } catch (Exception e) {
        e.printStackTrace();
      }
      int waitingThreads = meta.getNumberWaiting();
      assertTrue(i+" waiting threads expected, but "+waitingThreads+" were detected as 'waiting'",     
          waitingThreads == i);
    }
    try {
      // Give some time to notify Referee object before check if was notified.
      Thread.sleep(2000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    assertTrue("Referee should be notified about the run end but he wasn't", referee.isRunEnd());
    assertTrue("3 threads should participate in the barrier reaching, but only "+ meta.getParties() + 
      " were", meta.getParties() == 3);
    assertTrue("All threads should reach the barrier but "+meta.getNumberWaiting()+" didn't",   
      meta.getNumberWaiting() == 0);
  }

  @Test
  public void testBarrierReuse() {
    Referee referee = new Referee();
    CyclicBarrier meta = new CyclicBarrier(3, referee);
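    // Here, we run two races on the same barrier. A CyclicBarrier re-arms itself after each trip,
    // so the reset() call at the end of the loop isn't required; it's harmless because no thread is waiting.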
    for (int i = 0; i < 2; i++) {
      Thread runner1 = new Thread(new Runner(meta, "Runner#1"));
      Thread runner2 = new Thread(new Runner(meta, "Runner#2"));
      Thread runner3 = new Thread(new Runner(meta, "Runner#3"));

      runner1.start();
      runner2.start();
      runner3.start();

      try {
        Thread.sleep(3000);
      } catch (Exception e) {
        e.printStackTrace();
      }
      assertTrue("Referee should be notified about the run end but he wasn't", referee.isRunEnd());
      assertTrue("3 threads should participate in the barrier reaching, but only "+ meta.getParties() + 
        " were", meta.getParties() == 3);
      assertTrue("All threads should reach the barrier but "+meta.getNumberWaiting()+" didn't", 
        meta.getNumberWaiting() == 0);
      meta.reset();
    }
  }

  @Test
  public void testBarrierTimeout() {
    Referee referee = new Referee();
    CyclicBarrier meta = new CyclicBarrier(3, referee);
    SprintRunner sprinter1 = new SprintRunner(meta, "Runner#1");
    sprinter1.setSleeping(2000);
    SprintRunner sprinter2 = new SprintRunner(meta, "Runner#2");
    sprinter2.setSleeping(2000);
    SprintRunner sprinter3 = new SprintRunner(meta, "Runner#3");
    sprinter3.setSleeping(10000);
    Thread runner1 = new Thread(sprinter1);
    Thread runner2 = new Thread(sprinter2);
    Thread runner3 = new Thread(sprinter3);

    runner1.start();
    runner2.start();
    runner3.start();

    /*
    * Proof of TimeoutException thrown on await() method. 
    * Depending on thread execution order, you should see 
    * TimeoutException and BrokenBarrierException:
    * <pre>
    * Runner#1 reached the barrier
    * Runner#2 reached the barrier
    * java.util.concurrent.TimeoutException
    * at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    * at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    * at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:180)
    * at java.lang.Thread.run(Unknown Source)
    * java.util.concurrent.BrokenBarrierException
    * at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    * at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    * at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:180)
    * at java.lang.Thread.run(Unknown Source)
    * Runner#3 reached the barrier
    * java.util.concurrent.BrokenBarrierException
    * at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    * at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    * at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:180)
    * at java.lang.Thread.run(Unknown Source)
    * </pre>
    */
    try {
      Thread.sleep(8000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    assertFalse("Referee shouldn't be notified about the run end but he was (sprinter3 shouldn't terminate)", referee.isRunEnd());
    assertTrue("3 threads should participate in the barrier reaching, but only "+ meta.getParties() + " were", meta.getParties() == 3);
    assertTrue("They're no threads that should be waiting because of thrown exceptions, but "+meta.getNumberWaiting()+" are still waiting", meta.getNumberWaiting() == 0);
  }

  @Test
  public void testBarrierPrematureException() {
    Referee referee = new Referee();
    CyclicBarrier meta = new CyclicBarrier(3, referee);
    Thread runner1 = new Thread(new Runner(meta, "Runner#1"));
    Thread runner2 = new Thread(new HoledShoesRunner(meta, "Runner#2"));
    Thread runner3 = new Thread(new HoledShoesRunner(meta, "Runner#3"));

    runner1.start();
    runner2.start();
    runner3.start();

    try {
      Thread.sleep(7000);
    } catch (Exception e) {
      e.printStackTrace();
    }
    /*
     * NPE will be thrown in 2 threads before they call await(). 
     * So the barrier won't be reached (and it won't be broken either), 
     * and one thread, called 'Runner#1', will keep waiting for the other threads.
     * 
     * Sample output:
     * <pre>
     * Exception in thread "Thread-1" java.lang.NullPointerException
     * at com.waitingforcode.test.concurrency.HoledShoesRunner.run(BarrierTest.java:215)
     * at java.lang.Thread.run(Unknown Source)
     * Runner#1 reached the barrier
     * 
     * </pre>
     */
    assertFalse("Referee shouldn't be notified about the run end but he was", referee.isRunEnd());
    assertTrue("3 threads should participate in the barrier reaching, but only "+ meta.getParties() + " were", meta.getParties() == 3);
    assertTrue("1 thread should be waiting, but "+meta.getNumberWaiting()+" are instead", meta.getNumberWaiting() == 1);
  }
}

class Referee implements Runnable {
  // volatile: written by the last runner thread, read by the test thread
  private volatile boolean runEnd = false;

  public boolean isRunEnd() {
    return this.runEnd;
  }

  @Override
  public void run() {
    this.runEnd = true;
  }
}

abstract class AbstractRunner {
  protected CyclicBarrier barrier;
  protected String name;
  protected int sleeping = 2000;

  public AbstractRunner(CyclicBarrier barrier, String name) {
    this.barrier = barrier;
    this.name = name;
  }

  public void setSleeping(int sleeping) {
    this.sleeping = sleeping;
  }
}

class HoledShoesRunner extends AbstractRunner implements Runnable {
  private Integer number;

  public HoledShoesRunner(CyclicBarrier barrier, String name) {
    super(barrier, name);
  }

  @Override
  public void run() {
    // number is never initialized, so this line throws NullPointerException on purpose
    String stringified = number.toString();
    try {
      this.barrier.await();
    } catch (Exception e) {
      System.out.println("[HoledShoesRunner] Error for "+this.name);
      e.printStackTrace();
    }
  }
}

class SprintRunner extends AbstractRunner implements Runnable {
  public SprintRunner(CyclicBarrier barrier, String name) {
    super(barrier, name);
  }

  @Override
  public void run() {
    try {
      Thread.sleep(this.sleeping);
      System.out.println("[SprintRunner] "+this.name + " reached the barrier");
      // wait for the other runners for at most 1 second
      this.barrier.await(1, TimeUnit.SECONDS);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

class FourLegsRunner extends AbstractRunner implements Runnable {
  public FourLegsRunner(CyclicBarrier barrier, String name) {
    super(barrier, name);
  }

  @Override
  public void run() {
    try {
      Thread.sleep(this.sleeping);
      System.out.println("[FourLegsRunner] "+this.name + " reached the barrier");
      // each call blocks until all runners have made the same call
      this.barrier.await();
      this.barrier.await();
      this.barrier.await();
      this.barrier.await();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

class Runner extends AbstractRunner implements Runnable {
  public Runner(CyclicBarrier barrier, String name) {
    super(barrier, name);
  }

  @Override
  public void run() {
    try {
      Thread.sleep(this.sleeping);
      System.out.println("[Runner] "+this.name + " reached the barrier");
      this.barrier.await();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

And this is sample output from running the test cases:

[Runner] Runner#2 reached the barrier
[Runner] Runner#3 reached the barrier
[Runner] Runner#1 reached the barrier
[FourLegsRunner] Runner#1 reached the barrier
[FourLegsRunner] Runner#2 reached the barrier
[FourLegsRunner] Runner#3 reached the barrier
[Runner] Runner#2 reached the barrier
[Runner] Runner#1 reached the barrier
[Runner] Runner#3 reached the barrier
[Runner] Runner#2 reached the barrier
[Runner] Runner#3 reached the barrier
[Runner] Runner#1 reached the barrier
[SprintRunner] Runner#2 reached the barrier
[SprintRunner] Runner#1 reached the barrier
java.util.concurrent.TimeoutException
        at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
        at java.util.concurrent.CyclicBarrier.await(Unknown Source)
        at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:247)
        at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
        at java.util.concurrent.CyclicBarrier.await(Unknown Source)
        at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:247)
        at java.lang.Thread.run(Unknown Source)
Exception in thread "Thread-16" java.lang.NullPointerException
        at com.waitingforcode.test.concurrency.HoledShoesRunner.run(BarrierTest.java:227)
        at java.lang.Thread.run(Unknown Source)
Exception in thread "Thread-17" java.lang.NullPointerException
        at com.waitingforcode.test.concurrency.HoledShoesRunner.run(BarrierTest.java:227)
        at java.lang.Thread.run(Unknown Source)
[SprintRunner] Runner#3 reached the barrier
java.util.concurrent.BrokenBarrierException
        at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
        at java.util.concurrent.CyclicBarrier.await(Unknown Source)
        at com.waitingforcode.test.concurrency.SprintRunner.run(BarrierTest.java:247)
        at java.lang.Thread.run(Unknown Source)
[Runner] Runner#1 reached the barrier

This article described the concept of barriers in concurrent programs. A barrier makes a group of threads wait until all of them have reached it. Alongside synchronized blocks and CountDownLatch, it's another way to synchronize the execution of several threads. In Java, the barrier is represented by the CyclicBarrier class. Because await() blocks the calling thread until the barrier trips, a thread counts as exactly one arrival per round, however many await() calls its code contains.
