Executor services in Java

Java 1.5 introduced the concept of executors, which is very useful in concurrent environments. Thanks to them we can easily create thread pools and even work with the results produced by pooled threads. This article covers this subject.

As usual, the first part of the article presents the theoretical concepts of the Executors framework. It explains how executors work and lists the classes and interfaces that we should know before writing code with them. The next parts present some use cases for executors.

Executors in Java

The main concept behind executors is the thread pool. It's a kind of work queue served by a fixed or dynamic number of threads running in parallel. An executor helps to manage them. Thanks to it we can, for example, interrupt task execution or create threads with a limited lifetime. In addition, the resulting code is less verbose than the same thing written with plain Thread objects.
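To make the idea more concrete, here is a minimal sketch of a pool of 2 threads whose results are read back through Future objects. The class FixedPoolSketch and its totalLength helper are hypothetical names used only for this illustration; Executors.newFixedThreadPool, submit and Future.get come from java.util.concurrent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FixedPoolSketch {

    // Hypothetical helper: computes the length of every word on a pool of 2 threads
    // and sums the partial results in the calling thread.
    static int totalLength(List<String> words) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            for (final String word : words) {
                results.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return word.length();
                    }
                }));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get(); // blocks until this particular task has finished
            }
            return total;
        } finally {
            pool.shutdown(); // stop accepting tasks and let the pooled threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        // 4 + 8 + 4, so this prints 16
        System.out.println(totalLength(Arrays.asList("java", "executor", "pool")));
    }
}
```

The same work written with plain Thread objects would need manual thread creation, joining and a shared place to store the results.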

Three main interfaces are hidden behind the concept of executors. All of them are placed inside the java.util.concurrent package: Executor, ExecutorService and ScheduledExecutorService.

There are 4 classes natively implementing these interfaces: AbstractExecutorService, ThreadPoolExecutor, ScheduledThreadPoolExecutor and ForkJoinPool.

After this introduction we can finally start to illustrate how ThreadPoolExecutor, ForkJoinPool and ScheduledThreadPoolExecutor work through code samples.
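Before looking at each class in detail, the following sketch shows that ThreadPoolExecutor, ScheduledThreadPoolExecutor and ForkJoinPool can all be used through the common ExecutorService interface. SameInterfaceSketch and answerFrom are hypothetical names introduced for this example only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SameInterfaceSketch {

    // Hypothetical helper: runs one task on any ExecutorService, then shuts the service down.
    static int answerFrom(ExecutorService service) throws Exception {
        try {
            return service.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return 21 * 2;
                }
            }).get();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The three implementations are interchangeable as long as only ExecutorService methods are used.
        System.out.println(answerFrom(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>())));
        System.out.println(answerFrom(new ScheduledThreadPoolExecutor(1)));
        System.out.println(answerFrom(new ForkJoinPool(1)));
    }
}
```

Features specific to one implementation, such as periodic scheduling or fork/join tasks, are only reachable through the concrete class or a more specific interface.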

ThreadPoolExecutor

To test ThreadPoolExecutor features, we'll write a small test case. It'll be a dummy URL analyzer. Instead of reading the URLs, parsing the responses and handling the errors, we'll only define a template of this behavior to simulate different features of ThreadPoolExecutor. Through this test case, we'll see:
- the relation between max pool size and queue capacity
- task rejection
- blocking of the main thread with the awaitTermination method
- shutdown

public class ThreadPoolExecutorTest {

	private static final String BASE_URL = "http://www.waitingforcode.com";
	
	@Test
	public void test() {
		 BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<Runnable>(2);
		
		 String[] urls = {BASE_URL+"/java", BASE_URL+"/tips", 
				 BASE_URL+"/java-concurrency/countdownlatch-in-java/read", BASE_URL+"/hibernate/locking-in-hibernate-jpa/read"
				 , BASE_URL+"/programming", BASE_URL+"/programming/character-encoding/read"
				 , BASE_URL+"/java-memory-model/string-memory-management-in-java/read", BASE_URL+"/hibernate"
				 , BASE_URL+"/hibernate/difference-between-sessionfactory-and-entitymanagerfactory/read", BASE_URL+"/play-framework"
				 , BASE_URL+"/mysql"
		 };
		 long start = System.currentTimeMillis();
		 
		// executor with core pool size = 2, maximum pool size = 5, idle timeout of 5 seconds and a queue with a capacity of 2 elements
		ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS, workingQueue);
		// test for RejectedExecutionException and tasks added into the queue
		// Our pool can run 5 tasks at once [core pool size = 2, max pool size = 5] and the queue can hold 2 tasks.
		// Only one task, the first from the urls array, will complete. The others sleep for 17 seconds, so they won't
		// finish because the main thread waits only 5 seconds for the pooled threads.
		// To sum up, 8 tasks will be executed or queued for execution:
		// - 5 from the max pool size (+1 because once the first task completes, its thread picks up a queued task and frees a queue slot)
		// - 2 from the queue capacity
		boolean reeOccured = false;
		int notAdded = 0;
		for (int i = 0; i < urls.length; i++) {
			try {
				Runnable analyzer = new PageAnalyzer(urls[i], (i > 0));
				executor.execute(analyzer);
			} catch (RejectedExecutionException ree) {
				System.out.println("/!\\ Adding "+urls[i]+" (array position from 1 "+(i+1)+") to analyze failed (RejectedExecutionException) : "+ree.getMessage());
				notAdded++;
				reeOccured = true;
			}
		}
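		assertTrue("RejectedExecutionException should occur but it didn't", reeOccured);
		// 11 URLs are submitted and 8 of them are accepted, so 3 should be rejected (assuming the first task completes before the 8th is submitted)
		assertTrue("3 tasks shouldn't be added into thread pool but "+notAdded + " tasks weren't added instead", notAdded == 3);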
		
		// awaitTermination makes the main thread wait up to 5 seconds before it continues.
		// If this time is too small, the main thread can finish before the executor and some of the tasks submitted to the pool
		// may never be executed.
		try {
			executor.awaitTermination(5, TimeUnit.SECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		long timeDiff = (end - start)/1000;
		assertTrue("Executor should be executed only during 5 seconds, but it was launched during "+timeDiff+ "seconds", timeDiff == 5);
		assertTrue("1 task should complete but they are "+executor.getCompletedTaskCount()+" completed tasks", executor.getCompletedTaskCount() == 1);

		// If you want to see the pending tasks being stopped, uncomment these lines. They will produce an InterruptedException:
		// "java.lang.InterruptedException: sleep interrupted
		// at java.lang.Thread.sleep(Native Method)
		// at com.mysite.test.concurrency.PageAnalyzer.run(ThreadPoolExecutorTest.java:93)
		// at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
		// at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
		// at java.lang.Thread.run(Unknown Source)"
		// and the print of ">> OK for ..." message
		// Afterwards we check which tasks weren't executed. We can see that none of the tasks waiting in the queue was executed,
		// which is what the shutdownNow() method should do (according to the Javadoc):
		// "Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of 
		// the tasks that were awaiting execution."
		// List<Runnable> notExecutedTasks = executor.shutdownNow();
		// assertTrue("Executor should be in 'shutdown' state now", executor.isShutdown());
		// assertTrue("All tasks submitted to the queue shouldn't be executed but they were ("+notExecutedTasks.size() + " weren't executed instead of 2)",
		//  	notExecutedTasks.size() == 2);
		//  for (Runnable analyzer : notExecutedTasks) {
		// 	 System.out.println("This task wasn't executed: "+analyzer);
		//  }
		
		System.out.println("Difference is: "+timeDiff);
		System.out.println("OK, all URLs were analyzed. We can now generate a response.");
	}

}


class PageAnalyzer implements Runnable {
	
	private String urlToAnalyze;
	private boolean mustSleep;
	
	public PageAnalyzer(String urlToAnalyze, boolean mustSleep) {
		this.urlToAnalyze = urlToAnalyze;
		this.mustSleep = mustSleep;
		System.out.println("> Submitting "+this.urlToAnalyze + " as URL to analyze");
	}

	@Override
	public void run() {
		if (this.mustSleep) {
			try {
				Thread.sleep(17000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		System.out.println(">> OK for "+this.urlToAnalyze);
	}
	
	@Override
	public String toString() {
		return "PageAnalyzer {url: "+urlToAnalyze+", must sleep: "+mustSleep+"}";
	}
}

Comments placed inside the code show what happens.
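The test above depends on timing (the first task has to finish before the pool fills up). As a complement, here is a minimal deterministic sketch of the same configuration (core pool size 2, max pool size 5, queue capacity 2) in which every task blocks on a CountDownLatch, so no thread is freed while tasks are being submitted. RejectionSketch and its run method are hypothetical names; the latch replaces the 17-second sleep and is an assumption of this sketch, not part of the original test.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionSketch {

    // Returns {accepted, rejected, tasks returned by shutdownNow(), 1 if the pool terminated else 0}.
    static int[] run(int submitted) throws InterruptedException {
        final CountDownLatch release = new CountDownLatch(1); // never counted down: tasks end only by interruption
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2));
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < submitted; i++) {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            release.await(); // keeps the worker thread busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // restore the flag and finish
                        }
                    }
                });
                accepted++;
            } catch (RejectedExecutionException ree) {
                rejected++; // pool is at max size and the queue is full
            }
        }
        // shutdownNow() interrupts the running tasks and hands back the ones still waiting in the queue
        List<Runnable> neverStarted = executor.shutdownNow();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {accepted, rejected, neverStarted.size(), terminated ? 1 : 0};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(11);
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]
                + ", queued=" + result[2] + ", terminated=" + (result[3] == 1));
    }
}
```

With 11 submissions, 2 tasks go to core threads, 2 wait in the queue, 3 more get extra threads up to the maximum of 5, and the remaining 4 are rejected. Unlike in the test above, no early-finishing task frees a thread here, so the count of accepted tasks is 7 rather than 8.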

ForkJoinPool

This executor is illustrated with an order management process. In this case, we have 3 orders to process, each composed of different products. One order is represented by an OrderMaker instance and it must return the total price of all bought products. A product's price can be retrieved only through a ProductSearcher. We must optimize the search process by running as many threads simultaneously as possible, so the idea of divide-and-conquer appears to be a good solution for the problem.

public class ForkJoinPoolTest {

	@Test
	public void test() {
		ForkJoinPool pool = new ForkJoinPool();
		
		// 1st order to process
		Products[] order1List = {Products.milk, Products.chocolate, Products.apple, Products.water, Products.pizza};
		List<Products> order1 = Arrays.asList(order1List);
		OrderMaker maker1 = new OrderMaker(order1, "Order#1");
		
		// 2nd order to process
		Products[] order2List = {Products.milk, Products.coffee, Products.rice, Products.tuna};
		List<Products> order2 = Arrays.asList(order2List);
		OrderMaker maker2 = new OrderMaker(order2, "Order#2");

		// 3rd order to process
		Products[] order3List = {Products.milk};
		List<Products> order3 = Arrays.asList(order3List);
		OrderMaker maker3 = new OrderMaker(order3, "Order#3");

		/*
		 * Here we can see how many tasks can be executed in parallel and what the size of the thread pool is. Here, the size
		 * of the pool will be 0, even if getParallelism returns 2. In fact, we haven't submitted any task yet, so
		 * it's normal that ForkJoinPool's size is 0.
		 */
		System.out.println("Pool parallelism is "+pool.getParallelism());
		System.out.println("Pool's size is: "+pool.getPoolSize());
		assertTrue("ForkJoinPool size should be 0 but was "+pool.getPoolSize(), pool.getPoolSize() == 0);
		
		/*
		 * Invoke the first order. Thanks to invoke() the main thread is blocked until the invoked task ends.
		 */
		Double price1 = pool.invoke(maker1);
		System.out.println("Price for "+maker1+" is "+price1);
		/*
		 * We've already submitted one task. Because we have 2 processors available, getPoolSize() will return 2.
		 */
		System.out.println("Pool's size is :"+pool.getPoolSize());
		assertTrue("One task was submitted and pool's size should be greater than 0 but it wasn't", pool.getPoolSize() > 0);
		Double price2 = pool.invoke(maker2);
		System.out.println("Price for "+maker2+" is "+price2);
		System.out.println("Pool's size is :"+pool.getPoolSize());
		Double price3 = pool.invoke(maker3);
		System.out.println("Price for "+maker3+" is "+price3);
		System.out.println("Pool's size is :"+pool.getPoolSize());
		
		/*
		 * You'll see this print only when all invoked tasks terminate. We can deduce that the invoke() method acts like a barrier for the
		 * main thread. The main thread waits for each invocation to end before continuing.
		 */
		System.out.println("Print after all pooled thread execution");
		/*
		 * You could also submit the tasks through the submit() method. However, the main thread won't wait
		 * for these tasks to terminate, i.e. we should see "Making order: Order#1" on the screen and then the unit test
		 * terminates.
		 * Replacing submit() with execute() will have the same effect. The main thread will end before all submitted tasks
		 * end.
		 */
		// pool.submit(maker1);
		// pool.submit(maker2);
		// pool.submit(maker3);
		// pool.execute(maker1);
		// pool.execute(maker2);
		// pool.execute(maker3);
	}

}

/*
 * RecursiveTask is an abstract subclass of java.util.concurrent.ForkJoinTask. Unlike RecursiveAction, used in ProductSearcher,
 * RecursiveTask's compute method returns a value. RecursiveAction doesn't return anything.
 */
class OrderMaker extends RecursiveTask<Double> {

	private static final long serialVersionUID = -405799312207251216L;
	private List<Products> productsToSearch;
	private String orderId;
	private double price;
	
	public OrderMaker(List<Products> productsToSearch, String orderId) {
		this.productsToSearch = productsToSearch;
		this.orderId = orderId;
	}
	
	public synchronized void sumPrice(double price) {
		this.price += price;
	}
	
	@Override
	protected Double compute() {
		System.out.println("Making order: "+this.orderId);
		List<RecursiveAction> searchTasks = new ArrayList<RecursiveAction>();
		for (Products product : productsToSearch) {
			RecursiveAction searcher = new ProductSearcher(product, this);
			searchTasks.add(searcher);
		}
		// It will invoke all tasks added to the searchTasks list. In my case, 2 tasks will be invoked
		// in parallel (2 processors available for the JVM). You can observe it by watching what is printed on the screen thanks
		// to Thread.sleep(3000) inside ProductSearcher's compute method.
		// You can also use the ForkJoinPool.getParallelism() method to check how many tasks can be executed in parallel.
		invokeAll(searchTasks);
		return this.price;
	}
	
	@Override
	public String toString() {
		return this.orderId;
	}
	
}

/*
 * As mentioned earlier, RecursiveAction doesn't return any result (unlike RecursiveTask).
 */
class ProductSearcher extends RecursiveAction {

	private static final long serialVersionUID = 1L;
	private Products product;
	private OrderMaker order;
	
	public ProductSearcher(Products product, OrderMaker order) {
		this.product = product;
		this.order = order;
	}
	
	@Override
	protected void compute() {
		try {
			Thread.sleep(3000);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("Computing search task "+this);
		order.sumPrice(this.product.getPrice());
	}
	
	@Override
	public String toString() {
		return "ProductSearcher {product: "+this.product+"}";
	}
	
}


enum Products {
	
	milk(1.0d),
	apple(1d),
	water(0.5d),
	pizza(3d),
	coffee(3.5d),
	rice(1.5d),
	tuna(1.5d),
	chocolate(2.3d);
	
	private double price;
	
	private Products(double price) {
		this.price = price;
	}
	
	public double getPrice() {
		return this.price;
	}
	
}

ForkJoinPool also has methods to handle pool shutdown manually, such as shutdown() or isShutdown().
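The order example forks one level of subtasks. As a complementary, more classic illustration of divide-and-conquer, here is a minimal sketch in which a task splits itself recursively until the chunks are small enough and each subtask returns its partial result instead of adding it to a shared synchronized field. RangeSum, its THRESHOLD value and the sum helper are hypothetical and chosen only for this example. The sketch also shuts the pool down explicitly with shutdown().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1000; // assumed cut-off below which we sum sequentially

    private final int[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSum(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int middle = from + (to - from) / 2;
        RangeSum left = new RangeSum(values, from, middle);
        RangeSum right = new RangeSum(values, middle, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in the current thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    // Hypothetical entry point: sums the array on a fresh pool and shuts the pool down afterwards.
    static long sum(int[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] values = new int[10000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // 1 + 2 + ... + 10000 = 50005000
    }
}
```

Returning partial results through join() avoids the synchronized accumulator used by OrderMaker.sumPrice, at the price of a slightly more involved compute() method.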

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor will be illustrated with a car checking system. Imagine that while you're driving, the car's system checks whether all components work well. In our case, we'll check whether the GPS and the car engine work correctly. But we can make these checks only a limited number of times (2) and only while the car is being driven.

public class ScheduledThreadPoolExecutorTest {

	@Test
	public void test() {
		/*
		 * Notice that even if you specify a low value for the pool size, like 1, it won't prevent additional tasks from being accepted and executed.
		 * If you're looking at OpenJDK ScheduledThreadPool implementation (used in this test), http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7-b147/java/util/concurrent/ScheduledThreadPoolExecutor.java#ScheduledThreadPoolExecutor.delayedExecute%28java.util.concurrent.RunnableScheduledFuture%29,
		 * you'll see that the only case of task rejection can be pool shutdown:
		 * 
		 * if (isShutdown())
		 * reject(task);
		 * else {
		 * super.getQueue().add(task);
		 *  // ...
		 * 
		 * So, even if you set the maximum pool size to 1 and you try to add more than 1 task, it won't throw RejectedExecutionException
		 * as ThreadPoolExecutor would.
		 *
		 * From the Javadoc we can learn that this behavior is normal, because ScheduledThreadPoolExecutor "acts as a
		 * fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have
		 * no useful effect."
		 */
		ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1);
		scheduledPool.setMaximumPoolSize(1);
		scheduledPool.setRemoveOnCancelPolicy(true);
		scheduledPool.setKeepAliveTime(1, TimeUnit.SECONDS);
		scheduledPool.allowCoreThreadTimeOut(true);
		System.out.println("Max pool size is :"+scheduledPool.getMaximumPoolSize());
		System.out.println("Queue size is: "+scheduledPool.getQueue().size());
		scheduledPool.execute(new Runnable() {
			@Override
			public void run() {
				System.out.println("Running anonymous class to see if it's considered as \"completed\"");
			}
		});
		/*
		 * CarStarter makes some preliminary checks to be sure that the car works well before driving.
		 * The other checkers (GpsChecker and CarEngineChecker) work only when the car is driving.
		 */
		Future<Boolean> carStarted = scheduledPool.submit(new CarStarter());
		// Future<Boolean> carStarted = scheduledPool.schedule(new CarStarter(), 0, TimeUnit.SECONDS);
		try {
			System.out.println("Active count is :"+scheduledPool.getActiveCount());
			System.out.println("Check if car was started through "+carStarted);
			if (carStarted.get().booleanValue()) {
				carStarted.cancel(true);
				System.out.println("is cancelled ?"+carStarted.isCancelled());
				System.out.println("is done ? "+carStarted.isDone());
				System.out.println("Active count is :"+scheduledPool.getActiveCount());
				/*
				 * What does it mean when a task is completed ? There are 3 situations:
				 * - one-shot tasks returning Future must be canceled (carStarted.cancel(true)) to be considered as
				 * "completed". You can comment out carStarted.cancel() to see that it's true.
				 * - one-shot tasks which don't return Future instances (for example submitted through the execute() method) are marked as
				 * "completed" automatically, without any supplementary call. But sometimes they can be considered as "completed" with some
				 * delay (it causes this test to fail sometimes; so it's better to use schedule() to be sure that one task is executed
				 * before some other ones)
				 * - tasks scheduled at fixed rate or with fixed delay are treated in a different way. Each task invocation is
				 * considered as a "completed" task. So one scheduled task (for example:
				 * scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS)) whose run() method
				 * is called 10 times will be considered by ScheduledThreadPool as 10 "completed" tasks.
				 */
				assertTrue("There should be 2 completed tasks, but "+scheduledPool.getCompletedTaskCount()+" were completed", 
					scheduledPool.getCompletedTaskCount() == 2);
				/*
				 * We need to add a wrapper to check if all checkers were executed 2 times. We can't use ThreadPoolExecutor's
				 * methods to verify the number of active threads (getActiveCount) or of completed tasks (getCompletedTaskCount). The
				 * first one always returns 1 while the second returns 6 or 7. So we can deduce that it considers every invocation of
				 * a scheduled task's run() method as a new completed task.
				 */
				CarChecker carChecker = new CarChecker(scheduledPool);
				System.out.println(">>> Starting scheduled tasks at "+new Date());
				carChecker.startCheckingSystem();
				/*
				 * It keeps the main thread spinning until the scheduled pool is terminated. This loop ends after the
				 * CarChecker.shutdownPool() call.
				 */
				while (!scheduledPool.isTerminated()) {
				}
				if (carChecker.isInCheckError()) {
					fail("Hypothesis about completedTaskCount meaning was wrong !");
				}
				System.out.println("All tasks end at "+new Date());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

class CarChecker {

	private List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();
	private int terminatedTasks = 0;
	private boolean inCheckError = false;
	private int completedExpected = 0;
	private int otherCompleted;
	private ScheduledThreadPoolExecutor scheduledPool;

	public CarChecker(ScheduledThreadPoolExecutor scheduledPool) {
		this.scheduledPool = scheduledPool;
		this.otherCompleted = (int) this.scheduledPool.getCompletedTaskCount();
		System.out.println("otherCompleted ARE /"+this.otherCompleted);
	}

	public synchronized void addTerminated() {
		terminatedTasks++;
	}

	public synchronized void markAsCompleted() {
		int completedInPool = (int) scheduledPool.getCompletedTaskCount()-this.otherCompleted;
		if (completedExpected != completedInPool) {
			System.out.println("Expected completed tasks and completedTaskCount aren't the same: expected was "+
				completedExpected + " and got "+completedInPool);
			inCheckError = true;
		}
		/*
		 * Tasks are considered as "completed" at the end of the run() method. So we increment the expected completed tasks after the comparison
		 * with getCompletedTaskCount() and not before (at this point the task calling markAsCompleted is not yet considered as "completed").
		 */
		completedExpected++;
	}

	public boolean isInCheckError() {
		return inCheckError;
	}

	public void startCheckingSystem() {
		/*
		 * scheduleAtFixedRate, schedule and scheduleWithFixedDelay all return a ScheduledFuture instance. This instance
		 * can be used to cancel the scheduled task through its cancel() method. We can also check if it was executed (isDone()) or
		 * already canceled (isCancelled()).
		 *
		 * So even if you stop a scheduled Runnable instance in some other way (for example by throwing an exception inside), its
		 * ScheduledFuture will still remain.
		 *
		 * What is the difference between scheduleAtFixedRate and scheduleWithFixedDelay ? The second always waits the given delay between
		 * the termination of one execution and the commencement of the next. For scheduleAtFixedRate the executions are started at the submitted
		 * period. If the period is 3 seconds, the second execution will start 3 seconds after the first, the third 6 seconds after the first and
		 * so on. If one execution takes more time than the period, the next execution may be postponed.
		 */
		futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS));
		futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 2, 3, TimeUnit.SECONDS));
	}

	public boolean canShutdown() {
		return terminatedTasks == futures.size();
	}

	public void shutdownPool() {
		/*
		 * You can also cancel futures one by one, when you need to do it. In this case we don't need to cancel the ScheduledFuture because
		 * they'll be cancelled automatically after the call of shutdownNow(). But you can uncomment this fragment to see that.
		 *
		 * Additionally, you can add some throw new IllegalThreadStateException inside running tasks to see if the futures remain
		 * alive after this event. Thanks to this exception you can observe another thing. The Future of the task throwing this exception is
		 * never considered as cancelled. To see that, play with the commented IllegalThreadStateException inside the Runnable implementations below.
		 * To make it work, you must also switch the order of scheduled tasks in the startCheckingSystem method (start the GPS check before
		 * the car engine check and then do the opposite). Depending on the scheduling scenario, you should see printed for
		 *
		 * futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 2, 3, TimeUnit.SECONDS));
		 * futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 0, 3, TimeUnit.SECONDS));
		 *
		 * :
		 *
		 * Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@aa467d4
		 * Is cancelled ? true
		 * Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2f9c94c1
		 * Is cancelled ? false
		 *
		 * or for
		 *
		 * futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS));
		 * futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 2, 3, TimeUnit.SECONDS));
		 *
		 * :
		 *
		 * Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@27bf7527
		 * Is cancelled ? false
		 * Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@aa467d4
		 * Is cancelled ? true
		 */
		// for (ScheduledFuture<?> future : futures) {
		// 	System.out.println("Future is null ? "+future);
		// 	if (!future.isCancelled()) {
		// 		future.cancel(true);
		// 	}
		// 	System.out.println("Is cancelled ? "+future.isCancelled());
		// }
		System.out.println("> "+scheduledPool.getActiveCount());
		scheduledPool.shutdownNow();
	}

}

class CarStarter implements Callable<Boolean> {

	@Override
	public Boolean call() throws Exception {
		System.out.println("After some checks, we can start to drive this car");
		return true;
	}

}

class CarEngineChecker implements Runnable {

	private int checks = 0;
	private CarChecker checker;

	public CarEngineChecker(CarChecker checker) {
		this.checker = checker;
	}

	@Override
	public void run() {
		checker.markAsCompleted();
		System.out.println("Checking CarEngineChecker...");
		checks++;
		if (checks > 2) {
			checker.addTerminated();
			throw new IllegalThreadStateException("IllegalThreadState to check if the Future remains alive");
			// if (checker.canShutdown()) {
			// 	checker.shutdownPool();
			// }
		} else {
			System.out.println("Checking if engine is working at "+new Date());
			System.out.println("Car engine checked "+(checks)+" times");
		}
	}

}

class GpsChecker implements Runnable {

	private CarChecker checker;
	private int checks = 0;

	public GpsChecker(CarChecker checker) {
		this.checker = checker;
	}

	@Override
	public void run() {
		checker.markAsCompleted();
		System.out.println("Checking GpsChecker...");
		checks++;
		if (checks > 2) {
			checker.addTerminated();
			// throw new IllegalThreadStateException("IllegalThreadState to check if the Future remains alive");
			if (checker.canShutdown()) {
				checker.shutdownPool();
			}
		} else {
			System.out.println("Checking if GPS is working correctly at "+new Date());
			System.out.println("GPS checked "+(checks)+" times");
		}
	}

}
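The two ways a periodic task can stop, as discussed in the comments above, can be isolated in a much smaller sketch. PeriodicCheckSketch and its two methods are hypothetical names; the short periods (50 ms and 20 ms) are assumptions chosen only to keep the example fast. The first method cancels a fixed-rate task through its ScheduledFuture, the second lets a fixed-delay task throw an exception and shows that its future is then done but not cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicCheckSketch {

    // Cancels a fixed-rate task once it has run at least `times` times.
    // Returns {latch reached in time, future cancelled, pool terminated}.
    static boolean[] cancelAfter(int times) throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        final CountDownLatch done = new CountDownLatch(times);
        ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                done.countDown();
            }
        }, 0, 50, TimeUnit.MILLISECONDS);
        boolean reached = done.await(5, TimeUnit.SECONDS);
        future.cancel(false); // no further executions will be scheduled
        pool.shutdown();
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {reached, future.isCancelled(), terminated};
    }

    // Lets a fixed-delay task throw on its third run.
    // Returns {get() failed with ExecutionException, future done, future cancelled, task ran exactly 3 times}.
    static boolean[] failOnThirdRun() throws InterruptedException {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        final AtomicInteger checks = new AtomicInteger();
        ScheduledFuture<?> future = pool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (checks.incrementAndGet() > 2) {
                    throw new IllegalStateException("simulated check failure");
                }
            }
        }, 0, 20, TimeUnit.MILLISECONDS);
        boolean failed = false;
        try {
            // get() on a periodic future returns only when the task fails or is cancelled
            future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            failed = true; // the exception thrown by run() is wrapped here
        } catch (TimeoutException e) {
            // not expected in this sketch; failed stays false
        }
        boolean done = future.isDone();
        boolean cancelled = future.isCancelled();
        pool.shutdownNow();
        return new boolean[] {failed, done, cancelled, checks.get() == 3};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(cancelAfter(2)));
        System.out.println(java.util.Arrays.toString(failOnThirdRun()));
    }
}
```

An exception thrown by a periodic task suppresses its subsequent executions, which is why the counter in the second method stops at 3.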

This article shows how to deal with thread pools through Java's executors. Thanks to them we can easily manage the size of the pool (ThreadPoolExecutor) and improve resource use by dividing big tasks into many small ones (ForkJoinPool). In addition, we can also work with periodic tasks, for example tasks submitted once and executed with a given period (ScheduledThreadPoolExecutor). But the features of different executors aren't always the same and you can, for example, run into trouble when setting the max pool size in ScheduledThreadPoolExecutor.
