Divide and conquer with fork/join framework

When performance matters (and when does it not?), processing one big task in a single piece can prove very costly. One solution to this kind of bottleneck is to use a divide and conquer algorithm. In Java, this approach is available through the fork/join framework.

This article describes the fork/join framework. The first part covers the general idea behind divide and conquer algorithms. The second part describes the main components of the fork/join framework in Java. The last part, as usual, presents a sample implementation based on fork/join.

Divide and conquer algorithms

The goal of a divide and conquer algorithm is to split a big problem into several smaller subproblems of the same type. The subproblems are then solved and their results are combined to get the final result for the main problem. These steps are very often described with the following words: divide (into smaller subproblems), conquer (the smaller subproblems) and obtain (the final solution from the solutions of the smaller subproblems).

As the previous paragraph shows, a divide and conquer algorithm follows a top-down approach: the solution to the top-level problem is built from the solutions to the lower-level, smaller problems that are parts of it.

In programming, divide and conquer is used when one big problem can be divided into independent parts, for example when sorting or searching elements. To understand the principle on a real example, let's take the mergesort algorithm. Imagine that we have an array of 10 numbers that we want to sort in ascending order. To do so in the divide and conquer way, we first divide the array into smaller arrays (divide step), then sort them (conquer step) and finally merge them into the final sorted array (obtain step).
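
The three steps of the mergesort example can be sketched as below. This is only a minimal sequential illustration, written without fork/join; the class name MergeSortSketch and its methods are made up for this sketch.

```java
import java.util.Arrays;

// Illustrative sketch; the class and method names are not from the article.
final class MergeSortSketch {

  // Returns a new sorted array; the input array is left untouched.
  static int[] sort(int[] numbers) {
    // Base case: an array of 0 or 1 elements is already sorted.
    if (numbers.length <= 1) {
      return numbers.clone();
    }
    // Divide: split the array into two halves.
    int middle = numbers.length / 2;
    int[] left = Arrays.copyOfRange(numbers, 0, middle);
    int[] right = Arrays.copyOfRange(numbers, middle, numbers.length);
    // Conquer: sort each half recursively.
    int[] sortedLeft = sort(left);
    int[] sortedRight = sort(right);
    // Obtain: merge the two sorted halves into the final result.
    return merge(sortedLeft, sortedRight);
  }

  private static int[] merge(int[] left, int[] right) {
    int[] merged = new int[left.length + right.length];
    int l = 0, r = 0, m = 0;
    // Always take the smaller head element of the two sorted halves.
    while (l < left.length && r < right.length) {
      merged[m++] = (left[l] <= right[r]) ? left[l++] : right[r++];
    }
    // Copy whatever remains in either half.
    while (l < left.length) merged[m++] = left[l++];
    while (r < right.length) merged[m++] = right[r++];
    return merged;
  }

  public static void main(String[] args) {
    int[] numbers = {9, 4, 7, 1, 10, 3, 8, 2, 6, 5};
    System.out.println(Arrays.toString(sort(numbers)));
  }
}
```

The two recursive calls are independent of each other, which is what makes this kind of algorithm a natural candidate for parallel execution.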

Fork/join framework

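Now that we know the basic principles of the divide and conquer methodology, we can dive into its Java implementation, the fork/join framework. The main class orchestrating the work of subtasks is java.util.concurrent.ForkJoinPool. This executor service has 3 methods used to launch the execution of tasks: execute() (arranges asynchronous execution and returns nothing), invoke() (runs the task and waits for its result) and submit() (arranges asynchronous execution and returns a ForkJoinTask that can be used as a Future).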
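
A minimal sketch of the three launch methods of ForkJoinPool (invoke, submit and execute) could look as follows. The Square task and the LaunchMethodsSketch class are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch; the class names are not from the article.
final class LaunchMethodsSketch {

  // Hypothetical trivial task, used only to show the launch methods.
  static final class Square extends RecursiveTask<Integer> {
    private final int value;

    Square(int value) {
      this.value = value;
    }

    @Override
    protected Integer compute() {
      return value * value;
    }
  }

  // invoke(): runs the task and blocks until its result is available.
  static int viaInvoke(ForkJoinPool pool, int value) {
    return pool.invoke(new Square(value));
  }

  // submit(): schedules the task and returns a handle usable as a Future.
  static int viaSubmit(ForkJoinPool pool, int value) {
    ForkJoinTask<Integer> handle = pool.submit(new Square(value));
    try {
      return handle.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  // execute(): schedules the task and returns nothing; the result is
  // read later from the task object itself.
  static int viaExecute(ForkJoinPool pool, int value) {
    Square task = new Square(value);
    pool.execute(task);
    return task.join();
  }
}
```

In this sketch all three variants end up waiting for the result, so the difference is only in what the caller gets back at launch time.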

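ForkJoinPool can be constructed with default settings, but some of them can be overridden, such as: parallelism (the number of threads allowed to work concurrently on the submitted tasks; by default, the number of available processors) and thread factory (creates the threads used to process tasks). The tasks that ForkJoinPool executes extend ForkJoinTask, most often through one of its abstract subclasses: RecursiveAction (a task that returns no result) and RecursiveTask (a task that returns a result).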
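
As a sketch of these settings, the snippet below builds one pool with default settings and one with an explicit parallelism and thread factory, and defines a small RecursiveAction, a ForkJoinTask subclass that returns no result. The names PoolSettingsSketch and DoubleValues are hypothetical, and the factory passed here is simply the default one.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative sketch; the class names are not from the article.
final class PoolSettingsSketch {

  // Default settings: parallelism equals the number of available processors.
  static ForkJoinPool defaultPool() {
    return new ForkJoinPool();
  }

  // Overrides the parallelism and passes the thread factory explicitly
  // (here the default one), with no exception handler and asyncMode off.
  static ForkJoinPool customPool(int parallelism) {
    return new ForkJoinPool(parallelism,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false);
  }

  // A task without a result: it doubles the values of an array in place.
  static final class DoubleValues extends RecursiveAction {
    private static final int THRESHOLD = 4;
    private final int[] values;
    private final int from;
    private final int to;

    DoubleValues(int[] values, int from, int to) {
      this.values = values;
      this.from = from;
      this.to = to;
    }

    @Override
    protected void compute() {
      // Small enough range: do the work directly.
      if (to - from <= THRESHOLD) {
        for (int i = from; i < to; i++) {
          values[i] *= 2;
        }
        return;
      }
      // Otherwise split the range in two and run both halves.
      int middle = from + (to - from) / 2;
      invokeAll(new DoubleValues(values, from, middle),
          new DoubleValues(values, middle, to));
    }
  }
}
```

A task that has to return a value would extend RecursiveTask instead and implement compute() with a return type.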

The two most important methods of ForkJoinTask implementations are fork() and join(). Together they synchronize the work of a parent task with its child tasks. The first one can be considered a declaration-oriented method because it only arranges the asynchronous execution of the given task. On the other hand, join() is a blocking operation because it needs the result of the forked subtask before continuing. According to the ForkJoinTask Javadoc, other blocking synchronization, such as synchronized methods or blocks, should ideally be avoided in the code executed by ForkJoinPool.
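
The interplay of fork() and join() can be illustrated with a task that sums an array. This is a minimal sketch: the SumTask class, its THRESHOLD value and the sum() helper are assumptions made for the illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch; the class name and threshold are not from the article.
final class SumTask extends RecursiveTask<Long> {

  // Below this size the range is summed directly (arbitrary value).
  private static final int THRESHOLD = 1_000;

  private final int[] values;
  private final int from;
  private final int to;

  SumTask(int[] values, int from, int to) {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += values[i];
      }
      return sum;
    }
    int middle = from + (to - from) / 2;
    SumTask left = new SumTask(values, from, middle);
    SumTask right = new SumTask(values, middle, to);
    // fork() only schedules the left half; it does not wait for it.
    left.fork();
    // The current thread computes the right half in the meantime.
    long rightSum = right.compute();
    // join() waits until the result of the forked half is available.
    long leftSum = left.join();
    return leftSum + rightSum;
  }

  // Convenience helper that runs the task in a dedicated pool.
  static long sum(int[] values) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      return pool.invoke(new SumTask(values, 0, values.length));
    } finally {
      pool.shutdown();
    }
  }
}
```

Forking one half and computing the other one directly keeps the current thread busy instead of leaving it idle until join() returns.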

Tasks executed by ForkJoinPool aren't threads. The pool maintains a set of worker threads available to execute the submitted tasks. When no thread is available to handle a newly submitted task, the task is put into the task queue of one of these threads, where it waits until a thread is free to run it.

Thanks to this task-oriented architecture, the fork/join framework can use a mechanism called work stealing. It occurs when one thread has nothing to do (its queue is empty) while another one is overloaded (it has a lot of tasks waiting in its queue). In this case, the idle thread takes tasks from the queue of the overloaded thread and executes them. It's one of the reasons why fork/join should perform well even when we have a lot of tasks to execute and can't create a new thread for each of them for performance reasons.
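
The difference between tasks and threads can be observed with the sketch below, which creates many tiny tasks in a pool of a few threads. The class names are hypothetical; the number of threads involved and the value returned by getStealCount() (documented as an estimate) vary from run to run, so the sketch prints them without promising any particular value.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch; the class names are not from the article.
final class WorkStealingSketch {

  // Names of the threads that executed at least one leaf task.
  static final Set<String> WORKERS = ConcurrentHashMap.newKeySet();

  // Splits the range [from, to) until each task covers a single element.
  static final class CountLeaves extends RecursiveTask<Integer> {
    private final int from;
    private final int to;

    CountLeaves(int from, int to) {
      this.from = from;
      this.to = to;
    }

    @Override
    protected Integer compute() {
      if (to - from == 1) {
        WORKERS.add(Thread.currentThread().getName());
        return 1;
      }
      int middle = from + (to - from) / 2;
      CountLeaves left = new CountLeaves(from, middle);
      CountLeaves right = new CountLeaves(middle, to);
      // The forked half goes to this thread's queue, from which an idle
      // thread may steal it.
      left.fork();
      return right.compute() + left.join();
    }
  }

  // Runs the given number of leaf tasks (at least 1) and returns how many ran.
  static int run(int leaves, int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      int counted = pool.invoke(new CountLeaves(0, leaves));
      System.out.println(counted + " leaf tasks ran on " + WORKERS.size()
          + " thread(s); estimated steals: " + pool.getStealCount());
      return counted;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    run(10_000, 4);
  }
}
```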

Fork/join example in Java

To see a fork/join example, let's imagine the following problem: we have a very long text saved in a .txt file and we want to count the number of times each word appears in it. For the tests, the text is: "x s s x x x e r t s x". We'll begin by defining the test cases:

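// Assumed test libraries (the article does not name them): JUnit 4 and AssertJ.
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ForkJoinPool;

import org.junit.Test;

public class ForkJoinTest {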

  @Test
  public void should_correctly_compute_words_occurrences_in_sample_text_with_only_one_subtasks_recursion() {
    int maxThreads = 5;
    ForkJoinPool forkJoinPool = new ForkJoinPool(maxThreads);
    Map<String, Integer> result = forkJoinPool.invoke(new Reader(22, "x s s x x x e r t s x"));

    checkCommonLetters(result);
  }

  @Test
  public void should_correctly_compute_words_occurrences_when_several_subtasks_are_created_recursively_for_odd_number_of_words() {
    int maxThreads = 5;
    ForkJoinPool forkJoinPool = new ForkJoinPool(maxThreads);
    Map<String, Integer> result = forkJoinPool.invoke(new Reader(2, "x s s x x x e r t s x"));

    checkCommonLetters(result);
  }

  @Test
  public void should_correctly_compute_words_occurrences_when_several_subtasks_are_created_recursively_for_even_number_of_words() {
    int maxThreads = 5;
    ForkJoinPool forkJoinPool = new ForkJoinPool(maxThreads);
    Map<String, Integer> result = forkJoinPool.invoke(new Reader(2, "x s s x x x e r t s x f"));

    checkCommonLetters(result);
    assertThat(result.get("f").intValue()).isEqualTo(1);
  }

  private void checkCommonLetters(Map<String, Integer> result) {
    assertThat(result.get("x").intValue()).isEqualTo(5);
    assertThat(result.get("s").intValue()).isEqualTo(3);
    assertThat(result.get("e").intValue()).isEqualTo(1);
    assertThat(result.get("r").intValue()).isEqualTo(1);
    assertThat(result.get("t").intValue()).isEqualTo(1);
  }

}

There is nothing mysterious in these test cases. The more interesting code is in the recursive task submitted to ForkJoinPool, called Reader. It shows how we divide the analyzed text into much smaller texts. We do so until there is nothing left to divide and we can tell that the given word is present only once in its piece of text. The results of these recursive calls are then combined to compute the total number of occurrences of each word in the analyzed text:

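import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

import com.google.common.collect.ImmutableMap; // Guava

// StringHelper and WordOccurrenceConsumer are helper classes that are not shown in this article.
public class Reader extends RecursiveTask<Map<String, Integer>> {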

  private final int maxWordsInText;
  private final String text;

  public Reader(int maxWordsInText, String text) {
    this.maxWordsInText = maxWordsInText;
    this.text = text;
  }

  @Override
  protected Map<String, Integer> compute() {
    List<Reader> subtasks = getSubtask();
    // Base case: the text can't be divided into more tasks, so it holds a single word
    if (subtasks.size() == 1) {
      return ImmutableMap.of(text, 1);
    }
    // Otherwise, schedule every subtask asynchronously and merge their results;
    // thanks to recursion, the smallest subtasks always return a map with a single element
    subtasks.forEach(Reader::fork);
    Map<String, Integer> words = new HashMap<>();
    for (Reader subtask : subtasks) {
      Map<String, Integer> subtaskWords = subtask.join();
      subtaskWords.forEach(new WordOccurrenceConsumer(words));
    }
    return words;
  }

  private List<Reader> getSubtask() {
    List<Reader> readers = new ArrayList<>();
    for (String words : StringHelper.getWords(text, maxWordsInText)) {
      readers.add(new Reader(maxWordsInText, words));
    }
    return readers;
  }

}
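
Since the helper classes used by Reader are not shown, here is a self-contained variant of the same idea that can be run as is. It is only a sketch under stated assumptions, not the article's implementation: the names WordCountTask and count() are made up, words are split on whitespace, a range is halved until it holds at most maxWordsPerTask words, and Map.merge replaces WordOccurrenceConsumer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch; it does not reproduce StringHelper or WordOccurrenceConsumer.
final class WordCountTask extends RecursiveTask<Map<String, Integer>> {

  private final String[] words;
  private final int from;
  private final int to;
  private final int maxWordsPerTask;

  WordCountTask(String[] words, int from, int to, int maxWordsPerTask) {
    this.words = words;
    this.from = from;
    this.to = to;
    // Guard against values below 1, which would make the recursion endless.
    this.maxWordsPerTask = Math.max(1, maxWordsPerTask);
  }

  @Override
  protected Map<String, Integer> compute() {
    // Small enough range: count the words directly.
    if (to - from <= maxWordsPerTask) {
      Map<String, Integer> counts = new HashMap<>();
      for (int i = from; i < to; i++) {
        counts.merge(words[i], 1, Integer::sum);
      }
      return counts;
    }
    // Divide the range in two, count both halves, then combine the maps.
    int middle = from + (to - from) / 2;
    WordCountTask left = new WordCountTask(words, from, middle, maxWordsPerTask);
    WordCountTask right = new WordCountTask(words, middle, to, maxWordsPerTask);
    left.fork();
    Map<String, Integer> counts = right.compute();
    left.join().forEach((word, occurrences) -> counts.merge(word, occurrences, Integer::sum));
    return counts;
  }

  // Counts the occurrences of each whitespace-separated word of the text.
  static Map<String, Integer> count(String text, int maxWordsPerTask, int parallelism) {
    String trimmed = text.trim();
    if (trimmed.isEmpty()) {
      return new HashMap<>();
    }
    String[] words = trimmed.split("\\s+");
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.invoke(new WordCountTask(words, 0, words.length, maxWordsPerTask));
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling WordCountTask.count("x s s x x x e r t s x", 2, 5) produces the same counts as the ones asserted in the test cases above (5 for "x", 3 for "s", 1 for each of "e", "r" and "t").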

The fork/join framework is a Java implementation of the divide and conquer approach, which is also used in algorithms such as binary search or mergesort. First, we discovered the basic principles of this approach. Thanks to them, we could better understand how the fork/join framework works, which was described in the second part. The last part showed, through simple test cases, how to use ForkJoinPool to count word occurrences in a given text.

