Completable future

With Java 8 comes a complementary class to classical Future objects. This class not only helps to work with promises but also makes possible to get future results in non-blocking way.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

Through this article we'll explore this new feature called CompletableFuture. In the first part of the article, we'll focus on CompletableFuture composition. However, we won't list each of available methods because there are numerous. Instead of it, we'll list the main characteristicts and describe each of them shortly. The second part of the article will contain some test cases which should help to understand how to use CompletableFuture in real code.

What is CompletableFuture ?

java.util.concurrent.CompletableFuture was introduced in Java 8 as one of responses to growing demand to construct asynchronous and non-blocking programs. Because unlike usual Future object, main thread doesn't need to wait future jobs before continue. It can as well make its work and handle promise result only when available. For use-case we could imagine the program preparing some group of data in main thread (for example: by getting it from database) and the rest in separate threads (for example: by making some asynchronous calls to web services not influencing rendered page). Let's see that in the list describing CompletableFuture characteristics:

As we can see through these different points, CompletableFuture matches well with the trend of reactive programming. Thanks to its event-driven approach, non-blocking way of work, CompletableFuture can be used with another asynchronous tools to develop reactive applications.

CompletableFuture example

But before concluding, let's play a little with CompletableFuture. As usual, this time also we'll write some unit tests to show some of principal aspects of this new future: non-blocking, event-driven, thread independent and chaining:

public class CompletableFutureTest {

  @Test
  public void should_correctly_download_images_in_chaining() throws InterruptedException, ExecutionException {
    long startTime = System.currentTimeMillis();
    FileDownloader downloader4 = new FileDownloader(4);
    FileDownloader downloader8 = new FileDownloader(8);

    final Boolean[] runStat = new Boolean[] {false};
    CompletableFuture<Boolean> pageDownloaded = CompletableFuture.supplyAsync(downloader4::download)
          .thenCombine(CompletableFuture.supplyAsync(downloader8::download), (result1, result2) -> result1 && result2);
    pageDownloaded.thenRun(() -> {
      // Here we can run a single task after completing two combined CompletableFutures
      runStat[0] = true;
    });

    assertThat(pageDownloaded.get()).isTrue();
    // Because of get() call, this operation will be blocker and should take 4 seconds (8/2 = 4; 4*1000=4000 ms)
    long endTime = System.currentTimeMillis();
    assertThat((endTime - startTime)/1000).isEqualTo(4L);
    assertThat(runStat[0]).isTrue();
  }

  @Test
  public void should_detect_join_method_as_blocking() {
    long startTime = System.currentTimeMillis();
    FileDownloader downloader6 = new FileDownloader(6);
    CompletableFuture<Boolean> pageDownloaded = CompletableFuture.supplyAsync(downloader6::download);

    // join(), as get(), is a blocking operation
    pageDownloaded.join();

    long endTime = System.currentTimeMillis();
    assertThat((endTime - startTime)/1000).isEqualTo(3L);
  }

  @Test
  public void should_allow_parent_thread_to_continue_with_event_driven_approach() throws InterruptedException {
    List<RunningStats> stats = new ArrayList<>();
    getAsyncFileDownloader(2000L, stats);
    getAsyncFileDownloader(2000L, stats);
    Thread.sleep(1); // 1ms for security to avoid equality between tasks start time and time after tasks creation
    final long timeAfterCreatingTasks = System.currentTimeMillis();

     /**
      * This operations is not blocker. It means that without any of other synchronization mechanism, 
      * main thread will and we won't able to see prints from CompletableFutures. 
      * It's why we use primitive 3 seconds sleep.
      */
    Thread.sleep(3000);

    stats.forEach((stat) -> {
       /**
        * These tests prove that CompletableFuture used with complete() instead of get() 
        * call is non-blocking for the main thread. The main thread can continue 
        * to execute its job without waiting for CompletableFutures to finish theirs tasks. 
        * So to resume, the tasks must begin before timer and they must finish 
        * 2 seconds after it.
        */
      assertThat(timeAfterCreatingTasks).isGreaterThan(stat.getStartTime());
      assertThat(timeAfterCreatingTasks).isLessThan(stat.getEndTime());
      assertThat(stat.isListenerCalled()).isTrue();
    });
  }

  private CompletableFuture<Boolean> getAsyncFileDownloader(long sleepTime, List<RunningStats> statsList) {
    final RunningStats stats = new RunningStats();
    statsList.add(stats);

    final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
    Runnable task = () -> {
      stats.setStartTime(System.currentTimeMillis());
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException ie) {
        completableFuture.completeExceptionally(ie);
      }
      stats.setEndTime(System.currentTimeMillis());
      completableFuture.complete(true);
    };
    new Thread(task).start();
    completableFuture.whenComplete((result, exception) -> {
      stats.setListenerCalled(true);
    });
    return completableFuture;
  }

  private static final class FileDownloader {
    private static final int MB_PER_SEC = 2;

    private final int sizeToDownload;

    public FileDownloader(int sizeToDownload) {
      this.sizeToDownload = sizeToDownload;
    }

    public Boolean download() {
      int chunks = sizeToDownload/MB_PER_SEC;
      for (int chunk = 0; chunk < chunks; chunk++) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      return sizeToDownload%2 == 0;
    }
  }

  private static final class RunningStats {
    private long startTime;
    private long endTime;
    private boolean listenerCalled;
    // getters and setters omitted
  }
}

Even if Java 8 has been released one year ago, understand CompletableFuture may be a little problematic because of a lot of methods defined in Javadoc and not so much examples. In this article we could read first which are the points characterizing CompletableFuture. We learned that it was an object helping to write non-blocking programs, event-driven oriented, with a lot of possibilities to chain function calls. The second part was more pragmatic because it presented the main characteristics of CompletableFuture with several test cases.


If you liked it, you should read:

πŸ“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!