Tests in multi-threading environment with JUnit

Tests driven developpement (TDD) has a lot of advantages. We can simply rewrite a class and be sure that the application will still work after thanks to appropriated test cases. It's pretty simple to write them in single threading environment. But what to do when a test must be executed in multi-threading environment ?

A virtual conference at the intersection of Data and AI. This is not a conference for the hype. Its real users talking about real experiences.
- 40+ speakers with the likes of Hannes from Duck DB, Sol Rashidi, Joe Reis, Sadie St. Lawrence, Ryan Wolf from nvidia, Rebecca from lidl
- 12th September 2024
- Three simultaneous tracks
- Panels, Lighting Talks, Keynotes, Booth crawls, Roundtables and Entertainment.
- Topics include (ingestion, finops for data, data for inference (feature platforms), data for ML observability
- 100% virtual and 100% free

👉 Register here

Through this article we'll discover how to write JUnit tests and execute them in multi-threading environment. In the first part, we'll write a simple multi-threading test code and see its behaviour in normally written test case. After we'll explore two methods that we can implement in multi-threading tests.

Our test case concerns file's access synchronization. We want that only one thread could read and write a file at a time. To do that, we'll use java.nio package with FileChannel and FileLock classes. Let's take a look on this class:

show file locking source code

And there're a test case for previously defined code:

@Test
public void simpleLockTest() {
try {
  LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
  lockedFile.acquireLockAndFile();
  assertTrue("After the call of acquireLockAndFile, the file should be locked but it wasn't", lockedFile.isLocked());
  lockedFile.relaseLock();
  assertFalse("After the call of relaseLock, the file shouldn't be locked but it's still locked", lockedFile.isLocked());
} catch (Exception e) {
  e.printStackTrace();
  fail("An error occurred on manipulating locking file:" +e.getMessage());
}
}

Now imagine the situation when we want to check that the second thread isn't able to acquire a lock for already locked file (locked by the first thread). Normally, this situation could be presented like that:

@Test
public void multiThreadingTest() {
  try {
    new Thread(new Runnable() {
      @Override
      public void run() {
	try {
	  LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/estLocked.txt");
	  lockedFile.acquireLockAndFile();
	  assertTrue("After the call of acquireLockAndFile, the file should be locked but it wasn't", lockedFile.isLocked());
	  Thread.sleep(12000);
	  System.out.println("#1 Releasing a lock");
	  lockedFile.relaseLock();
	  assertFalse("After the call of relaseLock, the file shouldn't be locked but it's still locked", lockedFile.isLocked());
	} catch (Exception e) {
	  e.printStackTrace();
	}
      }
    }).start();
    new Thread(new Runnable() {
      @Override
      public void run() {
	LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
	try {
	  System.out.println("#2 Acquiring a lock");
	  lockedFile.acquireLockAndFile();
	  assertFalse("After the call of acquireLockAndFile, the file shouldn't be locked but it was", lockedFile.isLocked());
	} catch (Exception e) {
	  fail("Error on locking in 2e Thread:"+e.getMessage());
	  e.printStackTrace();
	}
      }
    }).start();
  } catch (Exception e) {
    e.printStackTrace();
    fail("An error occurred on manipulating locking file:" +e.getMessage());
  }
}

But as you can see, the code is executed upon 0.01 seconds and in the logs we can only find the entry from the second thread ("#2 Acquiring a lock"). So, how we could resolve this problematic situation ? We'll expose two potential solutions: one with CountDownLatch and another with ExecutorService. Both are parts of java.util.concurrent package.

JUnit multi-threading test with CountDownLatch

Let's start by CountDownLatch. You can deepen this subject by reading the article about CountDownLatch in Java. Here you only need to know that this class waits for the end of another thread's execution. The number of waiting threads is precised at construction time. Every thread calls CountDownLatch's method countDown() that decrements the count of latch. When all threads end theirs jobs, the main thread that initializes and calls CountDownLatch, continues to be executed. If you are a little bit confused, take a look on test case for our locking mechanism:

  try {
  LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
  lockedFile.acquireLockAndFile();
  assertTrue("After the call of acquireLockAndFile, the file should be locked but it wasn't", lockedFile.isLocked());
  final CountDownLatch latch = new CountDownLatch(1);
  final CallableResult result = new CallableResult();
  new Thread(new Runnable() {
    @Override
    public void run() {
      LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
      try {
	System.out.println("#2 Acquiring a lock");
	lockedFile.acquireLockAndFile();
	if (lockedFile.isLocked()) {
	  result.setMessage("After the call of acquireLockAndFile, the file shouldn't be locked but it was");
	  result.setInError(true);
	}
      } catch (Exception e) {
	result.setMessage("Error on locking in 2e Thread:"+e.getMessage());
	result.setInError(true);
	fail("Error on locking in 2e Thread:"+e.getMessage());
      }
      latch.countDown();
    }
  }).start();
  latch.await();
  assertFalse("Second thread shouldn't be in error, but it was:"+result.getMessage(), result.isFailure());
  lockedFile.relaseLock();
  assertFalse("After the call of relaseLock, the file shouldn't be locked but it's still locked", lockedFile.isLocked());
} catch (Exception e) {
  e.printStackTrace();
  fail("An error occurred on manipulating locking file:" +e.getMessage());
}

Here, the first operation consists on acquiring a lock. After, we define CountDownLatch and CallableResult instances. The second one will be explained in the last example of ExecutorService. Now, you can only know that it holds two fields (isError and message), thanks to it test case can detect a potential error in secondary thread. Otherwise, the main thread couldn't be able to see the assert* or fail methods called from the secondary thread. After we run the second thread and call, after the lock acquiring tries, a countDown method to inform that all threads finis theirs tasks. Now CountDownLatch from the main thread doesn't wait anymore and continue to execute the code after latch.await() method.

JUnit multi-threading test with ExecutorService

Another way to solve this problem in JUnit is the use of ExecutorService. In our case, it will generate a fixed-size thread pool (2 threads) and wait that all threads finish to execute. It's a very basic sample. To know more about it, please read an article about executor services in Java.

@Test
public void concurrentWithExecutorService() {
  ExecutorService executor = Executors.newFixedThreadPool(2);

  executor.execute(new Runnable() {
    @Override
    public void run() {
      try {
	LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
	lockedFile.acquireLockAndFile();
	assertTrue("After the call of acquireLockAndFile, the file should be locked but it wasn't", lockedFile.isLocked());
	Thread.sleep(15000);
	System.out.println("#1 Releasing a lock");
	lockedFile.relaseLock();
	assertFalse("After the call of relaseLock, the file shouldn't be locked but it's still locked", lockedFile.isLocked());
      } catch (Exception e) {
	e.printStackTrace();
      }
    }
  });
  executor.execute(new Runnable() {
    @Override
    public void run() {
      LockedFile lockedFile = new LockedFile("/home/bartosz/tmp/testLocked.txt");
      try {
	System.out.println("#2 Acquiring a lock");
	lockedFile.acquireLockAndFile();
	assertFalse("After the call of acquireLockAndFile, the file shouldn't be locked but it was", lockedFile.isLocked());
      } catch (Exception e) {
	fail("Error on locking in 2e Thread:"+e.getMessage());
	e.printStackTrace();
      }
    }
  });
  try {
    executor.awaitTermination(20, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

As you can see, there are two threads defined in the first part. Both are executed simultaneously by executor. The executor controls the execution time thanks to awaitTermination method. It's waiting 20 second to both threads finish. To see the test fails, comment Thread.sleep(15000); and execute it. Yes, the test fails but JUnit marks it as successful. In fact, the error wasn't transmitted to main JUnit thread and stayed inside the second thread of ExecutorService (exactly than for CountDownLatch sample without CallableResult instance). You can see in the logs:

Exception in thread "pool-1-thread-2" java.lang.AssertionError: After the call of acquireLockAndFile, the file shouldn't be locked but it was
	at org.junit.Assert.fail(Assert.java:93)
	at org.junit.Assert.assertTrue(Assert.java:43)
	at org.junit.Assert.assertFalse(Assert.java:68)
	at com.mysite.test.FileSyncTest$2.run(FileSyncTest.java:51)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

This result isn't useful when, for example, we work with continuous integration. It's why we should to improve the code and pass the error into JUnit's main thread. To do that, we'll implement the concept of Future which represents the result of asynchronous operation. To know more about, please read an article about Future in Java. Now, let's take a look at the improved code:

@Test
public void concurrentWithExecutorService() {
  ExecutorService executor = Executors.newFixedThreadPool(2);
  Callable fileLockThread = new Callable() {
    @Override
    public CallableResult call() {
    try {
      LockedFile lockedFile = new LockedFile("/home/bartosz/testLocked.txt");
      lockedFile.acquireLockAndFile();
      if (!lockedFile.isLocked()) {
	return new CallableResult("T1) After the call of acquireLockAndFile, the file should be locked but it wasn't", true);
      }
      Thread.sleep(15000);
      System.out.println("T1) #1 Releasing a lock");
      lockedFile.relaseLock();
      if (lockedFile.isLocked()) {
	return new CallableResult("T1) After the call of relaseLock, the file shouldn't be locked but it's still locked", true);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return new CallableResult("T1) Test#1 OK", false);
    }
  };

  Callable noLockFileThraed = new Callable() {
    @Override
    public CallableResult call() {
      LockedFile lockedFile = new LockedFile("/home/bartosz/testLocked.txt");
      try {
	System.out.println("T2) #2 Acquiring a lock");
	lockedFile.acquireLockAndFile();
	if (lockedFile.isLocked()) {
	  return new CallableResult("T2) After the call of acquireLockAndFile, the file shouldn't be locked but it was", true);
	}
      } catch (Exception e) {
	fail("T2) Error on locking in 2e Thread:"+e.getMessage());
	e.printStackTrace();
      }
      return new CallableResult("T2) Test#2 OK", false);
    }
  };

  List> jobs = new ArrayList>();
  jobs.add(fileLockThread);
  jobs.add(noLockFileThraed);

  try {
    List> results = executor.invokeAll(jobs);
    for (Future result : results) {
      try {
	CallableResult callResult = result.get();
	assertFalse("CallableResult is in error with following message: "+callResult.getMessage(), callResult.isFailure());
	} catch (ExecutionException e) {
	e.printStackTrace();
	fail("CallableResult can't be retreived from the Future: "+result);
      }
    }
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

// ...
}


class CallableResult {
  private String message;
  private boolean inError;

  public CallableResult() {
  }

  public CallableResult(String message, boolean inError) {
    this.message = message;
    this.inError = inError;
  }
  
  public void setInError(boolean inError) {
    this.inError = inError;
  }

  public void setMessage(String message) {
    this.message = message;
  }

  public boolean isFailure() {
    return this.inError;
  }
  
  public String getMessage() {
    return this.message;
  }
}

As you can see, Runnable instance were replaced by Callable ones. The difference is that Callable instances return one result while the Runnable ones do not. So, all assert* methods were also replaced by CallableResult objects that contain a test message and a flag to mark one test as failed or successful. Each thread were encapsulated into List with all tasks to execute by ExecutorService. Its method invokeAll returns a list of results, one for each thread. After we need only to iterate through them and detect if the test in the separated thread is correct or not. In additionally, we don't need to care about test timeouts. ExecutorService knows itself how many tests remain to execute and if they are nothing, it returns the results automatically.

To make a multi-threading tests in JUnit framework, we need to be a little bit creative and use Java's concurrency techniques. In this article, we discovered that ExecutorService and CountDownLatch are well adapted to unit tests for multi-threading environment. The first one is more flexible and allow us to cut properly the test on: test definition (Callable instance), test execution (invokeAll) and test analyze (for-each loop with Future results). CountDownLatch appears also as a simple solution when we are waiting to another threads finis theirs jobs by decrementing a latch.


If you liked it, you should read:

đź“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!