Java 1.5 introduced the concept of executors, which is very useful in concurrent environments. Thanks to them we can easily create thread pools and even manipulate the results generated by the threads. This article covers this subject.
As usual, the first part of the article presents the theoretical concepts of the Executors framework. It explains how executors work and lists all the classes and interfaces that we should know before writing code with them. The next parts present some use cases for executors.
Executors in Java
The main concept that goes with executors is the thread pool. A thread pool is a kind of working queue containing a fixed or dynamic number of threads executed in parallel. An executor helps to manage them. Thanks to it we can, for example, interrupt thread execution or create threads with a fixed lifetime. In addition, the code is less verbose than if we did the same thing with native Thread objects.
Three main interfaces are hidden behind the concept of executors (all are placed inside the java.util.concurrent package):
- Executor: the basic interface, which supports only the launching of new tasks. It contains only the execute method, which executes the Runnable implementation passed as parameter.
- ExecutorService: an interface that extends Executor and adds some supplementary features, such as management of the executor's lifecycle. This interface should be used when we want to handle the results of tasks through Future objects. The submit method of ExecutorService is like the simple execute method but, in addition, it accepts Callable instances and returns a Future instance that gives access to the result once the task has ended. ExecutorService can also shut down the execution of tasks.
- ScheduledExecutorService: a subinterface of ExecutorService that allows working with scheduled and periodic tasks. Its supplementary methods, scheduleAtFixedRate, scheduleWithFixedDelay and schedule, can be used to execute periodic tasks or to create tasks that must be executed after a specific delay.
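The three interfaces can be compared side by side in a minimal sketch. The class and method names (ExecutorInterfacesDemo, executeRunnable, submitCallable, scheduleCallable, runDemo) are made up for this illustration, the pool comes from the Executors factory class, and lambdas (Java 8+) are used only to keep the example short:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's test cases.
final class ExecutorInterfacesDemo {

    // Executor: only launches a Runnable; nothing is returned to the caller.
    static boolean executeRunnable(Executor executor) {
        CountDownLatch executed = new CountDownLatch(1);
        executor.execute(executed::countDown);
        try {
            return executed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // ExecutorService: submit() accepts a Callable and hands back a Future.
    static int submitCallable(ExecutorService service) {
        Callable<Integer> task = () -> 6 * 7;
        Future<Integer> future = service.submit(task);
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // ScheduledExecutorService: schedule() runs the task once, after a delay.
    static String scheduleCallable(ScheduledExecutorService scheduler, long delayMillis) {
        Callable<String> task = () -> "delayed";
        ScheduledFuture<String> future =
                scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // One scheduled pool implements all three interfaces.
    static String runDemo() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        try {
            return executeRunnable(pool) + " " + submitCallable(pool)
                    + " " + scheduleCallable(pool, 100);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "true 42 delayed"
    }
}
```

The same pool object is passed to all three helpers because ScheduledExecutorService extends ExecutorService, which itself extends Executor.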
There are 4 classes natively implementing these interfaces:
- AbstractExecutorService: an abstract class extended by ForkJoinPool and ThreadPoolExecutor. It defines a set of supplementary methods to handle the invocation of multiple tasks (invokeAll) and to add a new task to execute (submit).
- ForkJoinPool: used to run java.util.concurrent.ForkJoinTask instances. It doesn't limit the number of submitted tasks, but that's not the only difference. By definition, fork/join is a representation of the divide-and-conquer algorithm, where big tasks are divided into smaller ones (if necessary). They're executed in separate threads and their results are joined at the end.
  The ForkJoinPool constructor is based on the concept of parallelism, i.e. its parameter indicates how many processors should be used in parallel to handle the provided tasks. By default, all processors available to the JVM are used. Their number can be read with the availableProcessors method of the java.lang.Runtime class.
- ThreadPoolExecutor: executes tasks using one of its internally pooled threads. Two important parameters are used at construction time: core pool size and maximum pool size. To understand both well, we need to know that all ThreadPoolExecutor constructors take another mandatory parameter, an instance of java.util.concurrent.BlockingQueue.
  Imagine a configuration with a core pool size of 10, a maximum pool size of 20 and a BlockingQueue that accepts only 50 elements. Depending on the scenario, ThreadPoolExecutor will behave differently:
  - you submit 10 tasks to execute. Each of them gets its own thread, up to the core pool size. Now you want to add 75 more tasks. The first 50 are appended to the queue, so 25 tasks remain. Because the maximum pool size is greater than the core pool size, there are 10 more threads available for them. Once the maximum pool size is reached, the last 15 tasks can't be handled. They're all rejected and RejectedExecutionException is thrown for each of them.
  - if your core pool size and maximum pool size are equal, you create a fixed-size pool. The same exception occurs when both the queue and the pool are full.
- ScheduledThreadPoolExecutor: a kind of "smart" thread pool executor that can launch tasks at a given time or even execute them periodically (e.g. every 10 minutes). Unlike ThreadPoolExecutor, it accepts only the core pool size parameter. If a task can't be scheduled for execution (for example when the executor has already been shut down), RejectedExecutionException is thrown.
  One conceptual difference is important. Tasks aren't submitted to ScheduledThreadPoolExecutor, but scheduled. So, tasks are added as disabled. They become enabled after the delay specified in the parameter of the schedule method. Notice that we can also execute tasks immediately, through the standard execute method. These tasks are in fact executed in the same way as with the schedule method and a delay equal to 0.
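The ThreadPoolExecutor scenario described above (core pool size 10, maximum pool size 20, queue of 50 elements, 85 tasks) can be reproduced with a short sketch. PoolSizingDemo and its saturate method are hypothetical names, and the tasks simply block on a latch so that no thread becomes free while the tasks are being submitted:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating core size, max size and queue capacity.
final class PoolSizingDemo {

    // Submits taskCount blocking tasks and returns {pool size, queued tasks, rejected tasks}.
    static int[] saturate(int corePoolSize, int maxPoolSize, int queueCapacity, int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
        // Every task waits on this latch, so running tasks never finish during submission.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    executor.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException ree) {
                    rejected++;
                }
            }
            return new int[] {executor.getPoolSize(), executor.getQueue().size(), rejected};
        } finally {
            // Let the accepted tasks finish, then stop the pool.
            release.countDown();
            executor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = saturate(10, 20, 50, 85);
        // prints "pool=20 queued=50 rejected=15"
        System.out.println("pool=" + result[0] + " queued=" + result[1] + " rejected=" + result[2]);
    }
}
```

Note the order in which capacity is consumed: core threads first, then the queue, and only when the queue is full are the extra threads (up to the maximum pool size) created.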
After this introduction we can finally start to illustrate how ThreadPoolExecutor, ForkJoinPool and ScheduledThreadPoolExecutor work through code samples.
ThreadPoolExecutor
To test ThreadPoolExecutor features, we'll write a small test case. It'll be a dummy URL analyzer. But instead of reading the URLs, parsing the response and handling the errors, we'll only define a template of behavior to simulate different features of ThreadPoolExecutor. Through this test case, we'll see:
- max pool size and queue dependency
- task rejection
- blocking of the main thread with the awaitTermination method
- shutdown
import static org.junit.Assert.assertTrue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class ThreadPoolExecutorTest {

    private static final String BASE_URL = "http://www.waitingforcode.com";

    @Test
    public void test() {
        BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<Runnable>(2);
        String[] urls = {
            BASE_URL+"/java", BASE_URL+"/tips",
            BASE_URL+"/java-concurrency/countdownlatch-in-java/read",
            BASE_URL+"/hibernate/locking-in-hibernate-jpa/read",
            BASE_URL+"/programming", BASE_URL+"/programming/character-encoding/read",
            BASE_URL+"/java-memory-model/string-memory-management-in-java/read",
            BASE_URL+"/hibernate",
            BASE_URL+"/hibernate/difference-between-sessionfactory-and-entitymanagerfactory/read",
            BASE_URL+"/play-framework", BASE_URL+"/mysql"
        };
        long start = System.currentTimeMillis();
        // executor with core pool size = 2, maximum pool size = 5, idle timeout of 5 seconds
        // and a queue with a capacity of 2 elements
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 5, TimeUnit.SECONDS, workingQueue);

        // test for RejectedExecutionException and tasks added into the queue
        // Our pool can accept 5 tasks [core pool size = 2, max pool size = 5] and the queue can contain 2 tasks.
        // Only one task, the first from the urls array, will be executed correctly. The others will sleep
        // 17 seconds. In consequence, they won't complete because we wait only 5 seconds for all pooled threads.
        // To sum up, 8 tasks will be executed or submitted to execution:
        // - 5 from the pool size (+1 because when the first task completes, another one can be submitted)
        // - 2 from the queue capacity
        // The 3 remaining URLs of the 11 are rejected.
        boolean reeOccured = false;
        int notAdded = 0;
        for (int i = 0; i < urls.length; i++) {
            try {
                Runnable analyzer = new PageAnalyzer(urls[i], (i > 0));
                executor.execute(analyzer);
            } catch (RejectedExecutionException ree) {
                System.out.println("/!\\ Adding "+urls[i]+" (array position from 1 "+(i+1)
                    +") to analyze failed (RejectedExecutionException) : "+ree.getMessage());
                notAdded++;
                reeOccured = true;
            }
        }
        assertTrue("RejectedExecutionException should occur but it didn't", reeOccured);
        assertTrue("3 tasks shouldn't be added into thread pool but "+notAdded
            +" tasks weren't added instead", notAdded == 3);

        // awaitTermination tells the main thread to wait 5 seconds before it continues to execute.
        // If this time is too small, the main thread can terminate before the executor and some of the
        // tasks submitted to the pool may not be executed.
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        long timeDiff = (end - start)/1000;
        assertTrue("Executor should be executed only during 5 seconds, but it was launched during "
            +timeDiff+" seconds", timeDiff == 5);
        assertTrue("1 task should complete but they are "+executor.getCompletedTaskCount()
            +" completed tasks", executor.getCompletedTaskCount() == 1);

        // If you want to see pending tasks terminate, uncomment these lines. It will produce an
        // InterruptedException:
        // "java.lang.InterruptedException: sleep interrupted
        //   at java.lang.Thread.sleep(Native Method)
        //   at com.mysite.test.concurrency.PageAnalyzer.run(ThreadPoolExecutorTest.java:93)
        //   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        //   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        //   at java.lang.Thread.run(Unknown Source)"
        // and the print of the ">> OK for ..." message.
        // Afterwards we check which tasks weren't terminated. We can see that none of the tasks waiting in the
        // queue was executed, as the shutdownNow() method should do (according to Javadoc):
        // "Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns
        // a list of the tasks that were awaiting execution."
        // java.util.List<Runnable> notExecutedTasks = executor.shutdownNow();
        // assertTrue("Executor should be in 'shutdown' state now", executor.isShutdown());
        // assertTrue("All tasks submitted to the queue shouldn't be executed but they were ("
        //     +notExecutedTasks.size()+" weren't executed instead of 2", notExecutedTasks.size() == 2);
        // for (Runnable analyzer : notExecutedTasks) {
        //     System.out.println("There are some non-executed tasks: "+analyzer);
        // }
        System.out.println("Difference is: "+timeDiff);
        System.out.println("OK, all URLs were analyzed. We can now generate a response.");
    }
}

class PageAnalyzer implements Runnable {

    private String urlToAnalyze;
    private boolean mustSleep;

    public PageAnalyzer(String urlToAnalyze, boolean mustSleep) {
        this.urlToAnalyze = urlToAnalyze;
        this.mustSleep = mustSleep;
        System.out.println("> Submitting "+this.urlToAnalyze+" as URL to analyze");
    }

    @Override
    public void run() {
        if (this.mustSleep) {
            try {
                Thread.sleep(17000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println(">> OK for "+this.urlToAnalyze);
    }

    @Override
    public String toString() {
        return "PageAnalyzer {url: "+urlToAnalyze+", must sleep: "+mustSleep+"}";
    }
}
Comments placed inside the code show what happens.
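The shutdown-related calls from the test can also be isolated in a smaller sketch. It assumes a pool with a single thread and an unbounded queue (a different configuration from the test above), and ShutdownDemo and runDemo are hypothetical names. One task keeps the only thread busy, three others wait in the queue, and shutdownNow() should hand the waiting ones back:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for shutdownNow() and awaitTermination().
final class ShutdownDemo {

    static String runDemo() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch neverReleased = new CountDownLatch(1);

        // This task occupies the only thread until it is interrupted.
        executor.execute(() -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // These tasks stay in the queue because the single thread is busy.
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> System.out.println("not expected to run"));
        }

        // Interrupts the running task and returns the tasks still waiting in the queue.
        List<Runnable> pending = executor.shutdownNow();

        boolean terminated;
        try {
            // After a shutdown request, awaitTermination returns as soon as the pool is terminated.
            terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return "pending=" + pending.size() + " terminated=" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints "pending=3 terminated=true"
    }
}
```

Without a prior shutdown() or shutdownNow() call, awaitTermination simply waits for the whole timeout and returns false, which is how the 5 second pause in the test above works.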
ForkJoinPool
The illustration of this executor is based on an order management process. In this case, we have 3 orders to handle, each composed of different products. One order is represented by an OrderMaker instance and it must return the price of all bought products. A product's price can be retrieved only through a ProductSearcher. We must optimize the search process by executing as many threads simultaneously as possible, so the idea of divide-and-conquer appears to be a good solution for the problem.
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

import org.junit.Test;

public class ForkJoinPoolTest {

    @Test
    public void test() {
        ForkJoinPool pool = new ForkJoinPool();
        // 1st order to treat
        Products[] order1List = {Products.milk, Products.chocolate, Products.apple, Products.water, Products.pizza};
        List<Products> order1 = Arrays.asList(order1List);
        OrderMaker maker1 = new OrderMaker(order1, "Order#1");
        // 2nd order to treat
        Products[] order2List = {Products.milk, Products.coffee, Products.rice, Products.tuna};
        List<Products> order2 = Arrays.asList(order2List);
        OrderMaker maker2 = new OrderMaker(order2, "Order#2");
        // 3rd order to treat
        Products[] order3List = {Products.milk};
        List<Products> order3 = Arrays.asList(order3List);
        OrderMaker maker3 = new OrderMaker(order3, "Order#3");

        /*
         * Here we can see how many processes will be executed in parallel and what the size of the thread pool is.
         * Here, the size of the pool will be 0, even if getParallelism returns 2. In fact, we haven't submitted any
         * task to execute yet, so it's normal that ForkJoinPool's size is 0.
         */
        System.out.println("Pool parallelised with "+pool.getParallelism());
        System.out.println("Pool's size is: "+pool.getPoolSize());
        assertTrue("ForkJoinPool size should be 0 but was "+pool.getPoolSize(), pool.getPoolSize() == 0);

        /*
         * Invoke the first order. Thanks to invoke() we can block the main thread until the invoked task ends.
         */
        Double price1 = pool.invoke(maker1);
        System.out.println("Price for "+maker1+" is "+price1);
        /*
         * We've already submitted one task to execute. Because we have 2 available processors,
         * getPoolSize() will return 2.
         */
        System.out.println("Pool's size is :"+pool.getPoolSize());
        assertTrue("One task was submitted and pool's size should be greater than 0 but it wasn't",
            pool.getPoolSize() > 0);

        Double price2 = pool.invoke(maker2);
        System.out.println("Price for "+maker2+" is "+price2);
        System.out.println("Pool's size is :"+pool.getPoolSize());

        Double price3 = pool.invoke(maker3);
        System.out.println("Price for "+maker3+" is "+price3);
        System.out.println("Pool's size is :"+pool.getPoolSize());

        /*
         * You'll see this print only when all invoked tasks terminate. We can deduce that the invoke() method acts
         * like a barrier for the main thread. The main thread waits for the end of each invocation before continuing.
         */
        System.out.println("Print after all pooled thread execution");

        /*
         * You could also submit the tasks through the submit() method. However, the main thread won't wait
         * for the termination of these tasks, i.e. we should see "Making order: Order#1" on the screen and the
         * unit test terminates.
         * Replacing submit() by execute() will have the same effect. The main thread will end before all
         * submitted tasks end.
         */
        // pool.submit(maker1);
        // pool.submit(maker2);
        // pool.submit(maker3);
        // pool.execute(maker1);
        // pool.execute(maker2);
        // pool.execute(maker3);
    }
}

/*
 * RecursiveTask is an abstract subclass of java.util.concurrent.ForkJoinTask. Unlike RecursiveAction, used in
 * ProductSearcher, RecursiveTask's compute method returns a value. RecursiveAction doesn't return one.
 */
class OrderMaker extends RecursiveTask<Double> {

    private static final long serialVersionUID = -405799312207251216L;
    private List<Products> productsToSearch;
    private String orderId;
    private double price;

    public OrderMaker(List<Products> productsToSearch, String orderId) {
        this.productsToSearch = productsToSearch;
        this.orderId = orderId;
    }

    public synchronized void sumPrice(double price) {
        this.price += price;
    }

    @Override
    protected Double compute() {
        System.out.println("Making order: "+this.orderId);
        List<RecursiveAction> searchTasks = new ArrayList<RecursiveAction>();
        for (Products product : productsToSearch) {
            RecursiveAction searcher = new ProductSearcher(product, this);
            searchTasks.add(searcher);
        }
        // It will invoke all tasks added to the searchTasks list. In my case, 2 tasks will be invoked
        // in parallel (2 processors available for the JVM). You can observe it by watching what is printed on the
        // screen thanks to Thread.sleep(3000) inside ProductSearcher's compute method.
        // You can also use the ForkJoinPool.getParallelism() method to check how many tasks can be executed in parallel.
        invokeAll(searchTasks);
        return this.price;
    }

    @Override
    public String toString() {
        return this.orderId;
    }
}

/*
 * As mentioned earlier, RecursiveAction doesn't return any result (unlike RecursiveTask).
 */
class ProductSearcher extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private Products product;
    private OrderMaker order;

    public ProductSearcher(Products product, OrderMaker order) {
        this.product = product;
        this.order = order;
    }

    @Override
    protected void compute() {
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Computing search task "+this);
        order.sumPrice(this.product.getPrice());
    }

    @Override
    public String toString() {
        return "ProductSearcher {product: "+this.product+"}";
    }
}

enum Products {
    milk(1.0d), apple(1d), water(0.5d), pizza(3d), coffee(3.5d), rice(1.5d), tuna(1.5d), chocolate(2.3d);

    private double price;

    private Products(double price) {
        this.price = price;
    }

    public double getPrice() {
        return this.price;
    }
}
ForkJoinPool also has methods to handle pool shutdown manually, such as shutdown() or isShutdown().
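The order example delegates the whole split to invokeAll. The divide-and-conquer idea itself, where a task splits recursively until the pieces are small enough, may be easier to see in a minimal sketch that sums an array. RangeSum, its THRESHOLD value and the sum helper are assumptions made for this illustration only:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums values[from, to) by splitting the range in halves.
final class RangeSum extends RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;
    // Below this size the range is summed directly (arbitrary value chosen for the sketch).
    private static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from;
    private final int to;

    RangeSum(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: conquer without further division.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int middle = from + (to - from) / 2;
        RangeSum left = new RangeSum(values, from, middle);
        RangeSum right = new RangeSum(values, middle, to);
        left.fork();                       // run the left half asynchronously
        long rightSum = right.compute();   // compute the right half in the current thread
        return left.join() + rightSum;     // join both partial results
    }

    static long sum(long[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // prints 50005000
    }
}
```

Forking one half and computing the other in the current thread keeps the calling worker busy instead of leaving it idle while it waits for both children.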
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor will be illustrated with the case of a car checking system. Imagine that while you're driving, the car's system checks if all components work well. In our case, we'll check if the GPS and the car engine work correctly. But we can make these checks only a limited number of times (2) and only when the car is driving.
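Before the full test case, a smaller sketch may help to see how the Future of a periodic task ends. It assumes two throwaway tasks and a 50 ms period (PeriodicTasksDemo and runDemo are hypothetical names): the first task is cancelled explicitly, the second throws an exception on its second run, which suppresses its later executions without marking its Future as cancelled:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class comparing a cancelled and a failed periodic task.
final class PeriodicTasksDemo {

    static String runDemo() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // Task 1: runs every 50 ms until we cancel it after at least 3 executions.
            CountDownLatch threeRuns = new CountDownLatch(3);
            ScheduledFuture<?> cancelled =
                    scheduler.scheduleAtFixedRate(threeRuns::countDown, 0, 50, TimeUnit.MILLISECONDS);
            threeRuns.await(5, TimeUnit.SECONDS);
            cancelled.cancel(false);

            // Task 2: throws on its second run, so no third run is ever scheduled.
            AtomicInteger runs = new AtomicInteger();
            ScheduledFuture<?> failed = scheduler.scheduleWithFixedDelay(() -> {
                if (runs.incrementAndGet() == 2) {
                    throw new IllegalStateException("failure inside the periodic task");
                }
            }, 0, 50, TimeUnit.MILLISECONDS);
            try {
                failed.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // get() reports the exception thrown by the task.
            }

            return "cancelled=" + cancelled.isCancelled()
                    + " failedCancelled=" + failed.isCancelled()
                    + " failedDone=" + failed.isDone()
                    + " runs=" + runs.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "cancelled=true failedCancelled=false failedDone=true runs=2"
        System.out.println(runDemo());
    }
}
```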
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class ScheduledThreadPoolExecutorTest {

    @Test
    public void test() {
        /*
         * Notice that even if you specify low values for pool size, like 1, it won't disable the execution of
         * supplementary tasks. If you look at the OpenJDK ScheduledThreadPoolExecutor implementation (used in this test),
         * http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7-b147/java/util/concurrent/ScheduledThreadPoolExecutor.java#ScheduledThreadPoolExecutor.delayedExecute%28java.util.concurrent.RunnableScheduledFuture%29,
         * you'll see that the only case of task rejection is pool shutdown:
         *
         *     if (isShutdown())
         *         reject(task);
         *     else {
         *         super.getQueue().add(task);
         *         // ...
         *
         * So, even if you set the maximum pool size to 1 and you try to add more than 1 task, it won't throw
         * RejectedExecutionException as the ThreadPoolExecutor class does.
         *
         * From the Javadoc we can learn that this behavior is normal, because ScheduledThreadPoolExecutor "acts as a
         * fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have
         * no useful effect."
         */
        ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1);
        scheduledPool.setMaximumPoolSize(1);
        scheduledPool.setRemoveOnCancelPolicy(true);
        scheduledPool.setKeepAliveTime(1, TimeUnit.SECONDS);
        scheduledPool.allowCoreThreadTimeOut(true);
        System.out.println("Max pool size is :"+scheduledPool.getMaximumPoolSize());
        System.out.println("Queue size is: "+scheduledPool.getQueue().size());
        scheduledPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Running anonymous class to see if it's considered as \"completed\"");
            }
        });
        /*
         * CarStarter makes some preliminary checks to be sure that the car works well before driving.
         * The other checkers (GpsChecker and CarEngineChecker) work only when the car is driving.
         */
        Future<Boolean> carStarted = scheduledPool.submit(new CarStarter());
        // Future<Boolean> carStarted = scheduledPool.schedule(new CarStarter(), 0, TimeUnit.SECONDS);
        try {
            System.out.println("Active count is :"+scheduledPool.getActiveCount());
            System.out.println("Check if car was started through "+carStarted);
            if (carStarted.get().booleanValue()) {
                carStarted.cancel(true);
                System.out.println("is cancelled ?"+carStarted.isCancelled());
                System.out.println("is done ? "+carStarted.isDone());
                System.out.println("Active count is :"+scheduledPool.getActiveCount());
                /*
                 * What does it mean when a task is completed? There are 3 situations:
                 * - one-shot tasks returning a Future must be canceled (carStarted.cancel(true)) to be considered as
                 *   "completed". You can comment out carStarted.cancel() to see that it's true.
                 * - one-shot tasks which don't return Future instances (for example through the execute() method) are
                 *   marked as "completed" automatically, without any supplementary call. But sometimes they can be
                 *   considered as "completed" with some delay (it causes this test to fail sometimes; so it's better
                 *   to use schedule() to be sure that one task is executed before some other ones)
                 * - tasks scheduled at fixed rate or with fixed delay are treated in a different way. Each task
                 *   invocation is considered as a "completed" task. So one scheduled task (for example:
                 *   scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS)) whose run()
                 *   method is called 10 times will be considered by ScheduledThreadPoolExecutor as 10 "completed" tasks.
                 */
                assertTrue("There should be 2 completed tasks, but "+scheduledPool.getCompletedTaskCount()
                    +" were completed", scheduledPool.getCompletedTaskCount() == 2);
                /*
                 * We need to add a wrapper to check if all checkers were executed 2 times. We can't use
                 * ThreadPoolExecutor's methods to verify the number of active threads (getActiveCount) or of
                 * completed tasks (getCompletedTaskCount). The first one always returns 1 while the second returns
                 * 6 or 7. So we can deduce that it considers every invocation of a scheduled task's run() method
                 * as a new completed task.
                 */
                CarChecker carChecker = new CarChecker(scheduledPool);
                System.out.println(">>> Starting scheduled tasks at "+new Date());
                carChecker.startCheckingSystem();
                /*
                 * It makes the main thread spin as long as the scheduled pool isn't terminated. This loop ends
                 * after the CarChecker.shutdownPool() call.
                 */
                while (!scheduledPool.isTerminated()) {
                }
                if (carChecker.isInCheckError()) {
                    fail("Hypothesis about completedTaskCount meaning was wrong !");
                }
                System.out.println("All tasks end at "+new Date());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class CarChecker {

    private List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();
    private int terminatedTasks = 0;
    private boolean inCheckError = false;
    private int completedExpected = 0;
    private int otherCompleted;
    private ScheduledThreadPoolExecutor scheduledPool;

    public CarChecker(ScheduledThreadPoolExecutor scheduledPool) {
        this.scheduledPool = scheduledPool;
        this.otherCompleted = (int) this.scheduledPool.getCompletedTaskCount();
        System.out.println("otherCompleted ARE /"+this.otherCompleted);
    }

    public synchronized void addTerminated() {
        terminatedTasks++;
    }

    public synchronized void markAsCompleted() {
        int completedInPool = (int) scheduledPool.getCompletedTaskCount()-this.otherCompleted;
        if (completedExpected != completedInPool) {
            System.out.println("Expected completed tasks and completedTaskCount aren't the same: expected was "
                +completedExpected+" and got "+completedInPool);
            inCheckError = true;
        }
        /*
         * Tasks are considered as "completed" at the end of the run() method. So we increment the expected number of
         * completed tasks after the comparison with getCompletedTaskCount() and not before (at this moment the task
         * calling markAsCompleted is not yet considered as "completed").
         */
        completedExpected++;
    }

    public boolean isInCheckError() {
        return inCheckError;
    }

    public void startCheckingSystem() {
        /*
         * scheduleAtFixedRate, schedule and scheduleWithFixedDelay all return a ScheduledFuture<?> instance. This
         * instance can be used to cancel the scheduled task through its cancel() method. We can also check if it was
         * executed (isDone()) or already canceled (isCancelled()).
         *
         * So even if a scheduled Runnable instance is stopped in some other way (for example by throwing an
         * exception inside), its ScheduledFuture still remains.
         *
         * What is the difference between scheduleAtFixedRate and scheduleWithFixedDelay? The second always waits the
         * given delay between the termination of one execution and the commencement of the next. For
         * scheduleAtFixedRate the tasks are executed according to the submitted period. If the period is 3 seconds,
         * the second execution will start 3 seconds after the first, the third 6 seconds after the first and
         * so on. If one execution takes more time than the period, the next one may be postponed.
         */
        futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS));
        futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 2, 3, TimeUnit.SECONDS));
    }

    public boolean canShutdown() {
        return terminatedTasks == futures.size();
    }

    public void shutdownPool() {
        /*
         * You can also cancel futures one by one, when you need to do it. In this case we don't need to cancel the
         * ScheduledFutures because they'll be cancelled automatically after the call of shutdownNow(). But you can
         * uncomment this fragment to see that.
         *
         * Additionally, you can throw some IllegalThreadStateException inside running tasks to see if the futures
         * remain alive after this event. Thanks to this exception you can observe another thing. The Future of the
         * task throwing this exception is never considered as cancelled. To see that, play with the commented
         * IllegalThreadStateException inside the Runnable implementations below.
         * To make it work, you must also switch the order of scheduled tasks in the startCheckingSystem method (start
         * the GPS check before the car engine check and another time do the opposite). Depending on the scheduling
         * scenario, you should see printed for
         *
         *     futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 2, 3, TimeUnit.SECONDS));
         *     futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 0, 3, TimeUnit.SECONDS));
         *
         * :
         *
         *     Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@aa467d4
         *     Is cancelled ? true
         *     Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2f9c94c1
         *     Is cancelled ? false
         *
         * or for
         *
         *     futures.add(scheduledPool.scheduleAtFixedRate(new CarEngineChecker(this), 0, 3, TimeUnit.SECONDS));
         *     futures.add(scheduledPool.scheduleWithFixedDelay(new GpsChecker(this), 2, 3, TimeUnit.SECONDS));
         *
         * :
         *
         *     Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@27bf7527
         *     Is cancelled ? false
         *     Future is null ? java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@aa467d4
         *     Is cancelled ? true
         */
        // for (ScheduledFuture<?> future : futures) {
        //     System.out.println("Future is null ? "+future);
        //     if (!future.isCancelled()) {
        //         future.cancel(true);
        //     }
        //     System.out.println("Is cancelled ? "+future.isCancelled());
        // }
        System.out.println("> "+scheduledPool.getActiveCount());
        scheduledPool.shutdownNow();
    }
}

class CarStarter implements Callable<Boolean> {

    @Override
    public Boolean call() throws Exception {
        System.out.println("After some checks, we can start to drive this car");
        return true;
    }
}

class CarEngineChecker implements Runnable {

    private int checks = 0;
    private CarChecker checker;

    public CarEngineChecker(CarChecker checker) {
        this.checker = checker;
    }

    @Override
    public void run() {
        checker.markAsCompleted();
        System.out.println("Checking CarEngineChecker...");
        checks++;
        if (checks > 2) {
            checker.addTerminated();
            throw new IllegalThreadStateException("IllegalThreadState to check if the Future remains alive");
            // if (checker.canShutdown()) {
            //     checker.shutdownPool();
            // }
        } else {
            System.out.println("Checking if engine is working at "+new Date());
            System.out.println("Car engine checked "+(checks)+" times");
        }
    }
}

class GpsChecker implements Runnable {

    private CarChecker checker;
    private int checks = 0;

    public GpsChecker(CarChecker checker) {
        this.checker = checker;
    }

    @Override
    public void run() {
        checker.markAsCompleted();
        System.out.println("Checking GpsChecker...");
        checks++;
        if (checks > 2) {
            checker.addTerminated();
            // throw new IllegalThreadStateException("IllegalThreadState to check if the Future remains alive");
            if (checker.canShutdown()) {
                checker.shutdownPool();
            }
        } else {
            System.out.println("Checking if GPS is working correctly at "+new Date());
            System.out.println("GPS checked "+(checks)+" times");
        }
    }
}
This article shows how to deal with thread pools through Java's executors. Thanks to them we can easily manage the size of the pool (ThreadPoolExecutor) and make better use of the available processors by dividing big tasks into many small ones (ForkJoinPool). In addition, we can also work with periodic tasks, for example tasks submitted once and executed with a given period (ScheduledThreadPoolExecutor). But the features of the different executors aren't always the same and you can, for example, get into trouble when trying to apply a maximum pool size to ScheduledThreadPoolExecutor.