Java 8 introduced a class that complements the classical Future objects. This class not only helps to work with promises but also makes it possible to consume future results in a non-blocking way.
In this article we'll explore this new feature, called CompletableFuture. In the first part, we'll focus on what CompletableFuture is made of. We won't list every available method because there are too many of them. Instead, we'll list the main characteristics and briefly describe each one. The second part contains some test cases that should help to understand how to use CompletableFuture in real code.
What is CompletableFuture?
java.util.concurrent.CompletableFuture was introduced in Java 8 as one of the responses to the growing demand for asynchronous, non-blocking programs. Unlike with a plain Future, whose result can only be retrieved by blocking on get() or by polling isDone(), the main thread doesn't need to wait for the submitted jobs before continuing. It can carry on with its own work and handle the promised result only once it's available. As a use case, we could imagine a program that prepares one group of data in the main thread (for example, by reading it from a database) and the rest in separate threads (for example, by making asynchronous calls to web services that don't influence the rendered page). The following list describes the characteristics of CompletableFuture:
- non-blocking: as described previously, the main thread that creates a CompletableFuture can continue to work even if the result of the CompletableFuture is not computed yet. This holds as long as the main thread calls neither get() nor join(), which both block until the result is available.
- thread independent: the result of a CompletableFuture can be defined inside the executed task or outside of it. We can do so by calling the complete(T) method from any thread, for example from the thread executing the task whose result the CompletableFuture exposes.
- event-driven approach: CompletableFuture gives the possibility to react to an event (for example, an error that occurred while executing the asynchronous task). Thanks to the listener defined with the whenComplete(BiConsumer) method, a CompletableFuture can notify one or several other objects about a task's completion or failure.
- fluent interfaces: 45 of the CompletableFuture methods return another CompletableFuture object. Some of them are used in method chaining, where each call creates a new object directly from the previous one(s). As examples, we could quote:
  - methods to combine several tasks (thenCombine)
  - methods to deal with the result of the previous task (thenAccept)
  - methods applying some behavior to tasks (thenApply*, applyToEither*)
  - or finally methods running some code after the task (runAfter*)
- common thread pool: to work with CompletableFuture we don't need to manage a thread pool manually. Some of the CompletableFuture async methods take an explicit Executor as a parameter. However, there are also overloads without this parameter, which use the common pool instance (ForkJoinPool.commonPool()).
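Some of the characteristics listed above can be sketched in a few lines. The example below is only an illustration, not code from the tests of this article: the class name, method names and values are made up. It shows a future completed from another thread, a thenCombine/thenApply chain running on the common pool, and a task submitted to an explicit Executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, used only to illustrate the characteristics listed above.
final class CompletableFutureCharacteristics {

    // Thread independence: the future is created here but completed by another thread.
    static CompletableFuture<String> completedFromAnotherThread() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("done elsewhere")).start();
        return future;
    }

    // Fluent interface: each call returns a new CompletableFuture built from the previous one(s).
    static CompletableFuture<String> chained() {
        // No Executor is given, so both suppliers run on ForkJoinPool.commonPool()
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
        return first.thenCombine(second, Integer::sum)       // combine two tasks
                    .thenApply(total -> "total=" + total);   // transform the combined result
    }

    // Explicit Executor: the task runs on our own pool instead of the common pool.
    static String threadNameOnOwnExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completedFromAnotherThread().join());
        System.out.println(chained().join());
        System.out.println(threadNameOnOwnExecutor());
    }
}
```

The join() calls in main are there only to print the results. In a real program the main thread would usually register further stages instead of blocking.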
As these points show, CompletableFuture fits well with the trend of reactive programming. Thanks to its event-driven approach and non-blocking way of working, CompletableFuture can be used with other asynchronous tools to develop reactive applications.
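To make the event-driven side more concrete, here is a minimal sketch of a completion listener, under the assumption that the listener only records what happened. The class name, the withListener helper and the events list are hypothetical. Only whenComplete, complete and completeExceptionally come from the CompletableFuture API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical class illustrating a whenComplete listener.
final class CompletionListenerSketch {

    // Registers a listener that records either the result or the failure of the task.
    static CompletableFuture<String> withListener(CompletableFuture<String> task, List<String> events) {
        return task.whenComplete((result, error) -> {
            if (error != null) {
                events.add("failed: " + error.getMessage());
            } else {
                events.add("completed: " + result);
            }
        });
    }

    public static void main(String[] args) {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<String> successful = new CompletableFuture<>();
        CompletableFuture<String> failing = new CompletableFuture<>();
        withListener(successful, events);
        withListener(failing, events);

        // Nothing blocks here: the listeners are called only once the futures complete.
        successful.complete("page.html");
        failing.completeExceptionally(new IllegalStateException("timeout"));

        System.out.println(events);
    }
}
```

Because both futures are completed from the main thread, the listeners run there as well, and the program prints [completed: page.html, failed: timeout].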
CompletableFuture example
But before concluding, let's play a little with CompletableFuture. As usual, we'll write some unit tests to show some of the principal aspects of this new kind of future: non-blocking, event-driven, thread independent and chaining:
// Assumption: assertThat comes from an AssertJ-style assertion library
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

public class CompletableFutureTest {

  @Test
  public void should_correctly_download_images_in_chaining() throws InterruptedException, ExecutionException {
    long startTime = System.currentTimeMillis();
    FileDownloader downloader4 = new FileDownloader(4);
    FileDownloader downloader8 = new FileDownloader(8);
    final Boolean[] runStat = new Boolean[] {false};
    CompletableFuture<Boolean> pageDownloaded = CompletableFuture.supplyAsync(downloader4::download)
      .thenCombine(CompletableFuture.supplyAsync(downloader8::download),
        (result1, result2) -> result1 && result2);
    CompletableFuture<Void> afterDownload = pageDownloaded.thenRun(() -> {
      // Here we can run a single task after completing two combined CompletableFutures
      runStat[0] = true;
    });
    assertThat(pageDownloaded.get()).isTrue();
    // Because of the get() call, this operation is blocking and should take 4 seconds:
    // both downloads run in parallel and the longer one needs 8/2 = 4 chunks (4*1000 = 4000 ms)
    long endTime = System.currentTimeMillis();
    assertThat((endTime - startTime)/1000).isEqualTo(4L);
    // get() only guarantees that pageDownloaded is complete, so we also wait for the dependent action
    afterDownload.join();
    assertThat(runStat[0]).isTrue();
  }

  @Test
  public void should_detect_join_method_as_blocking() {
    long startTime = System.currentTimeMillis();
    FileDownloader downloader6 = new FileDownloader(6);
    CompletableFuture<Boolean> pageDownloaded = CompletableFuture.supplyAsync(downloader6::download);
    // join(), like get(), is a blocking operation
    pageDownloaded.join();
    long endTime = System.currentTimeMillis();
    assertThat((endTime - startTime)/1000).isEqualTo(3L);
  }

  @Test
  public void should_allow_parent_thread_to_continue_with_event_driven_approach() throws InterruptedException {
    List<RunningStats> stats = new ArrayList<>();
    getAsyncFileDownloader(2000L, stats);
    getAsyncFileDownloader(2000L, stats);
    // 1 ms of safety margin to avoid equality between the tasks' start time and the time after their creation
    Thread.sleep(1);
    final long timeAfterCreatingTasks = System.currentTimeMillis();
    /**
     * This operation is not blocking. It means that without any other synchronization mechanism,
     * the main thread would finish and we wouldn't be able to see the results of the CompletableFutures.
     * That's why we use a primitive 3-second sleep.
     */
    Thread.sleep(3000);
    stats.forEach((stat) -> {
      /**
       * These assertions prove that a CompletableFuture used with complete() instead of a get()
       * call is non-blocking for the main thread. The main thread can continue
       * to execute its job without waiting for the CompletableFutures to finish their tasks.
       * To sum up, the tasks must begin before the timer and they must finish
       * 2 seconds after they started, so after the timer.
       */
      assertThat(timeAfterCreatingTasks).isGreaterThan(stat.getStartTime());
      assertThat(timeAfterCreatingTasks).isLessThan(stat.getEndTime());
      assertThat(stat.isListenerCalled()).isTrue();
    });
  }

  private CompletableFuture<Boolean> getAsyncFileDownloader(long sleepTime, List<RunningStats> statsList) {
    final RunningStats stats = new RunningStats();
    statsList.add(stats);
    final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
    Runnable task = () -> {
      stats.setStartTime(System.currentTimeMillis());
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException ie) {
        completableFuture.completeExceptionally(ie);
      }
      stats.setEndTime(System.currentTimeMillis());
      // No-op if the future was already completed exceptionally above
      completableFuture.complete(true);
    };
    new Thread(task).start();
    completableFuture.whenComplete((result, exception) -> {
      stats.setListenerCalled(true);
    });
    return completableFuture;
  }

  private static final class FileDownloader {
    private static final int MB_PER_SEC = 2;
    private final int sizeToDownload;

    public FileDownloader(int sizeToDownload) {
      this.sizeToDownload = sizeToDownload;
    }

    public Boolean download() {
      // Simulates the download: one chunk of MB_PER_SEC megabytes per second
      int chunks = sizeToDownload/MB_PER_SEC;
      for (int chunk = 0; chunk < chunks; chunk++) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      return sizeToDownload%2 == 0;
    }
  }

  private static final class RunningStats {
    // volatile: written by the worker thread, read by the test thread
    private volatile long startTime;
    private volatile long endTime;
    private volatile boolean listenerCalled;
    // getters and setters omitted
  }
}
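The first test above declares ExecutionException because it calls get(), while the test calling join() declares nothing. The sketch below illustrates this difference for a future that failed. The class and helper names are hypothetical, and the failure is simulated with completeExceptionally rather than with a real download.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical class comparing how get() and join() report a failed task.
final class BlockingCallsSketch {

    // get() reports the failure with the checked ExecutionException.
    static String describeGet(CompletableFuture<?> future) {
        try {
            future.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() reports the same failure with the unchecked CompletionException.
    static String describeJoin(CompletableFuture<?> future) {
        try {
            future.join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> failed = new CompletableFuture<>();
        // Simulated failure; a real task would fail inside its own code
        failed.completeExceptionally(new IllegalStateException("download failed"));
        System.out.println(describeGet(failed));
        System.out.println(describeJoin(failed));
    }
}
```

Both calls block until the future is complete. The choice between them is mostly a matter of whether the caller prefers checked or unchecked exceptions.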
Even though Java 8 was released a year ago, understanding CompletableFuture may be a little problematic because the Javadoc defines a lot of methods and gives few examples. In this article we first saw the points that characterize CompletableFuture. We learned that it is an object that helps to write non-blocking, event-driven programs, with a lot of possibilities to chain function calls. The second part was more pragmatic because it illustrated the main characteristics of CompletableFuture with several test cases.