Misleading concurrent keyword

Versions: Java 8

Often we think that if something comes from the "concurrent" package, it's automatically thread-safe. By definition that's true, but definitions alone rarely make correct applications, and through this post we'll see why.

This post shows a trap we can fall into if we forget that thread-safety is usually limited to the scope of a single method. The first part shows the problem through a test case. The second part explains its cause, and the last one gives a potential solution.

Limited thread safety trap

A good use case illustrating this problem is accumulating some data and flushing it later in several batches. Logically, our first reaction is to use a concurrent collection, and one possible choice is java.util.concurrent.ConcurrentLinkedQueue. The accumulation code using it is presented below:

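import java.util
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

object Accumulator {

  // shared queue filled by producers and drained by several consumer threads
  val messages: util.Queue[String] = new ConcurrentLinkedQueue[String]()

  // collects up to batchSize messages as long as the queue doesn't look empty
  def getBatch(batchSize: Int): mutable.ListBuffer[String] = {
    val messagesToFlush: mutable.ListBuffer[String] = new mutable.ListBuffer()
    while (!messages.isEmpty && messagesToFlush.size < batchSize) {
      val message = messages.poll()
      messagesToFlush.append(s"length=${message.length}")
    }
    messagesToFlush
  }

}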

Perhaps you already see the concurrency problem? If not, you can run the following test to confirm that it really exists:

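import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// members of a ScalaTest FlatSpec-style test class
private val BatchSize = 7

"one or more NullPointerExceptions" should "be thrown when the messages are read concurrently in isEmpty-poll paradigm" in {
  // both worker threads may record exceptions, so the container must be thread-safe too
  val nullPointerExceptions = new ConcurrentLinkedQueue[NullPointerException]()
  val latch = new CountDownLatch(1)
  val running = new AtomicBoolean(true)
  for (thread <- 1 to 2) {
    new Thread(() => {
      while (running.get()) {
        populateAccumulator()
        // the first call is inside the try as well, otherwise an NPE there would kill the thread
        try {
          var batch = Accumulator.getBatch(BatchSize)
          while (batch.nonEmpty) {
            batch = Accumulator.getBatch(BatchSize)
          }
        } catch {
          case npe: NullPointerException =>
            nullPointerExceptions.add(npe)
            latch.countDown()
        }
      }
    }, s"Thread#${thread}").start()
  }
  latch.await(45, TimeUnit.SECONDS)
  // stop the workers so that they don't keep spinning after the test
  running.set(false)

  assert(!nullPointerExceptions.isEmpty)
}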

private def populateAccumulator(): Unit = {
  for (index <- 1 to 200) {
    Accumulator.messages.add(s"DataEntry#${index}")
  }
}

Atomic thread safety

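If the problem is still not obvious, consider the following interleaving with a single message left in the queue. Thread#1 calls messages.isEmpty and gets false, then Thread#2 calls messages.isEmpty and also gets false. Thread#2 polls first and takes the last message, so when Thread#1 calls poll(), the queue is already empty and poll() returns null. The next instruction, message.length, throws the NullPointerException.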

As you can deduce, thread-safety applies to each method call separately. Every single call, such as isEmpty() or poll(), behaves correctly even when several threads use the queue at the same time, but nothing guarantees that a sequence of two calls made by one thread executes atomically: another thread can modify the queue between them. It's pretty logical after all, but sometimes the magic word "concurrent" in the class name can lower our guard.
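To make this interleaving concrete without depending on thread timing, the sketch below replays it step by step on a single thread. It is only an illustration under a simplifying assumption: the second consumer is simulated by a direct call to poll(), and the names InterleavingReplay and replay are hypothetical, not part of the accumulator code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object InterleavingReplay {

  // Replays "Thread#1 checks isEmpty, Thread#2 polls, Thread#1 polls" on a single thread
  // and returns (Thread#1's check result, Thread#2's message, Thread#1's message)
  def replay(): (Boolean, String, String) = {
    val messages = new ConcurrentLinkedQueue[String]()
    messages.add("DataEntry#1") // only one message left in the queue

    // Thread#1: the check passes because the queue is not empty yet
    val thread1SeesMessages = !messages.isEmpty
    // Thread#2 (simulated): takes the last message before Thread#1 polls
    val thread2Message = messages.poll()
    // Thread#1: acts on its now stale check, so poll() returns null
    val thread1Message = if (thread1SeesMessages) messages.poll() else "never polled"

    (thread1SeesMessages, thread2Message, thread1Message)
  }

  def main(args: Array[String]): Unit = {
    val (sawMessages, taken, received) = replay()
    // getBatch would now evaluate received.length and throw a NullPointerException
    println(s"Thread#1 saw messages: $sawMessages, Thread#2 took: $taken, Thread#1 got: $received")
  }
}
```

Each call on the queue is correct on its own; only the gap between isEmpty and poll lets the stale check through.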

Solution for the trap

If you've already read the JDK source code, you have probably seen constructions like this one:

// fragment of ConcurrentHashMap.putVal (JDK 8)
for (Node<K,V>[] tab = table;;) {
  Node<K,V> f; int n, i, fh;
  if (tab == null || (n = tab.length) == 0)
    tab = initTable();
  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
          new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
  }
  else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);
  else {
    V oldVal = null;
    synchronized (f) {
        if (tabAt(tab, i) == f) {
// ...

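Notice anything unusual apart from the terse variable names? The values are assigned directly inside the conditions: each shared value is read once into a local variable, and that same snapshot is then both tested and used. Without this pattern, the check and the use could observe two different states, which is exactly our problem. The same idea was applied to the accumulator and gave a new polling method: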

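def getBatchThreadSafely(batchSize: Int): mutable.ListBuffer[String] = {
  val messagesToFlush: mutable.ListBuffer[String] = new mutable.ListBuffer()
  var message: String = null
  // In Scala an assignment evaluates to Unit, so `(message = messages.poll()) != null`
  // would always be true; the block below assigns first and then tests the polled value.
  // The size is checked before polling so that no message is removed and then dropped.
  while (messagesToFlush.size < batchSize && { message = messages.poll(); message != null }) {
    // poll() checks and removes in one atomic call, so message is never null here
    messagesToFlush.append(s"length=${message.length}")
  }
  messagesToFlush
}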

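As you can see, the code above relies on a single operation of the thread-safe class: poll() checks for an element and removes it in one atomic call, so there is no longer a window between the check and the action. The second test illustrates it: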

"no NullPointerException" should "be thrown when the messages are retrieved in while(poll) paradigm" in {
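  // both worker threads may record exceptions, so the container must be thread-safe too
  val nullPointerExceptions = new ConcurrentLinkedQueue[NullPointerException]()
  val latch = new CountDownLatch(1)
  val running = new AtomicBoolean(true)
  for (thread <- 1 to 2) {
    new Thread(() => {
      while (running.get()) {
        populateAccumulator()
        try {
          var batch = Accumulator.getBatchThreadSafely(BatchSize)
          while (batch.nonEmpty) {
            batch = Accumulator.getBatchThreadSafely(BatchSize)
          }
        } catch {
          case npe: NullPointerException =>
            nullPointerExceptions.add(npe)
            latch.countDown()
        }
      }
    }, s"Thread#${thread}").start()
  }
  // no exception is expected, so the latch normally times out after 45 seconds
  latch.await(45, TimeUnit.SECONDS)
  // stop the workers so that they don't keep running after the test
  running.set(false)

  assert(nullPointerExceptions.isEmpty)
}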
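The second test only checks that no NullPointerException is thrown. A complementary check, sketched below, verifies that every message ends up in exactly one batch when several consumers drain the queue at once. It is a standalone sketch under stated assumptions: DrainCheck, getBatch and drainConcurrently are hypothetical names, all messages are added before the consumers start, and the polling loop is a local copy that checks the batch size before calling poll().

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

object DrainCheck {

  // Hypothetical standalone copy of the poll-based batching loop
  def getBatch(messages: ConcurrentLinkedQueue[String], batchSize: Int): mutable.ListBuffer[String] = {
    val batch = new mutable.ListBuffer[String]()
    var message: String = null
    // size first, so that a polled message is never discarded
    while (batch.size < batchSize && { message = messages.poll(); message != null }) {
      batch.append(message)
    }
    batch
  }

  // Drains `total` pre-populated messages with `consumers` threads and returns everything received
  def drainConcurrently(total: Int, consumers: Int, batchSize: Int): List[String] = {
    val messages = new ConcurrentLinkedQueue[String]()
    (1 to total).foreach(index => messages.add(s"DataEntry#$index"))
    val received = new ConcurrentLinkedQueue[String]()

    val threads = (1 to consumers).map { id =>
      new Thread(() => {
        var batch = getBatch(messages, batchSize)
        while (batch.nonEmpty) {
          batch.foreach(message => received.add(message))
          batch = getBatch(messages, batchSize)
        }
      }, s"Consumer#$id")
    }
    threads.foreach(_.start())
    // join() makes the consumers' additions visible to the calling thread
    threads.foreach(_.join())

    received.toArray(new Array[String](0)).toList
  }

  def main(args: Array[String]): Unit = {
    val received = drainConcurrently(total = 10000, consumers = 4, batchSize = 7)
    println(s"received=${received.size}, distinct=${received.distinct.size}")
  }
}
```

If a message were polled twice or lost, the total or the distinct count would differ from the number of produced messages.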

This post showed that concurrency is full of traps and that we must stay careful, even when we work with thread-safe classes such as ConcurrentLinkedQueue. A very important point to remember is that only individual operations are atomic: combining two of them in the same execution scope doesn't make the program thread-safe, as the code snippet in the first section demonstrated. Fortunately, there are always solutions. The most brutal one, because it adds locks, is synchronization. However, when locking isn't necessary, we can simply rework the algorithm, as shown in the last part of the post.
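For comparison, the synchronization approach could look like the sketch below. It keeps the original isEmpty-then-poll check but makes the whole compound action atomic by running it under a lock shared by all consumers. This is a sketch under assumptions: it only works if every consumer goes through this method, and producers only call add, which can never make a non-empty queue empty. The names LockedAccumulator and consumerLock are hypothetical.

```scala
import java.util
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

object LockedAccumulator {

  val messages: util.Queue[String] = new ConcurrentLinkedQueue[String]()

  // Dedicated lock shared by every consumer (assumption: all consumers call getBatch)
  private val consumerLock = new Object

  def getBatch(batchSize: Int): mutable.ListBuffer[String] = consumerLock.synchronized {
    val messagesToFlush = new mutable.ListBuffer[String]()
    // No other consumer can poll between isEmpty and poll while the lock is held;
    // concurrent producers may still add, which only keeps the queue non-empty
    while (!messages.isEmpty && messagesToFlush.size < batchSize) {
      val message = messages.poll()
      messagesToFlush.append(s"length=${message.length}")
    }
    messagesToFlush
  }
}
```

The trade-off is that consumers now wait for each other on every batch, whereas the poll-based version lets them proceed without any lock.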
