Completion service in Java

Producer-consumer services can be implemented very quickly with native Java features. One of them, available since Java 5 in the java.util.concurrent package, is CompletionService.

In this article we'll discover the world of CompletionService. We'll begin with the theoretical aspects of this object. In the second part we'll show how it works by writing a sample JUnit test case.

CompletionService in Java

CompletionService is an interface from the java.util.concurrent package that decouples the submission of tasks from the consumption of their results. In other words, you can submit new tasks in one place and process the tasks that have already produced a result in another. A real-world analogy could be teenagers spending the money earned by their parents. The nuance is that the children can spend the money only after the parents have earned it, and that the parents can work several jobs at the same time. For example, both parents earn 30€ per hour. The money is automatically sent to the teenagers, who spend it on shopping. The operation repeats over and over again. As you can deduce, the parents are the producers and the children are the consumers. The earned money is placed into a queue and dispatched to the children as soon as it is available.

The parents-children dependency illustrates the main idea behind CompletionService. Tasks are submitted for execution and, as each one finishes, its result (wrapped in a Future) is placed in an internal queue. The results are thus queued one by one, in completion order rather than submission order, and retrieved by the client. The method that adds a new task to CompletionService is submit. It has two signatures:
- submit(Runnable task, V result): the task is an instance of Runnable. Since a Runnable cannot return anything, the given result of type V is what the returned Future yields once the task completes.
- submit(Callable<V> task): the task is an instance of Callable. The difference from the other method is that a Callable returns its result directly.
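The two signatures can be compared side by side with a minimal sketch. The class name SubmitOverloadsSketch and the method runBoth are made up for this illustration, and a single-threaded executor is assumed so that the tasks complete in submission order.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, written only to illustrate both submit overloads.
class SubmitOverloadsSketch {

  // Submits one task per overload and returns both results joined by a comma.
  static String runBoth() {
    // A single worker thread runs the tasks one after the other,
    // so they complete in submission order.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      CompletionService<String> service = new ExecutorCompletionService<>(executor);

      // submit(Runnable, V): the Runnable returns nothing, so the Future
      // yields the value passed as the second argument.
      service.submit(() -> { /* side effects only */ }, "fixed result");

      // submit(Callable<V>): the Future yields whatever call() returns.
      service.submit(() -> "computed " + (20 + 22));

      String first = service.take().get();
      String second = service.take().get();
      return first + "," + second;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runBoth());
  }
}
```

The Runnable variant is mostly useful when the task matters for its side effects and the result only serves as a marker of which task finished.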

To retrieve the result of a task, we call the take() method. It blocks until a completed task is available in CompletionService's queue and returns the Future representing it. A similar method is poll(). Unlike take(), it doesn't wait: it returns null immediately if no completed task is available. A third variant, poll(timeout, unit), waits up to the given time before returning null.
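The difference between these retrieval methods could be sketched as follows. The class name TakeVersusPollSketch and the CountDownLatch used to hold back the slow task are assumptions of this example, introduced only to make the completion order deterministic.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class, written only to compare take(), poll() and poll(timeout, unit).
class TakeVersusPollSketch {

  static String demo() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      CompletionService<String> service = new ExecutorCompletionService<>(executor);

      // Nothing has completed yet: poll() returns null without waiting.
      boolean emptyPollIsNull = service.poll() == null;

      // The "slow" task is submitted first but waits on the latch,
      // so the "fast" task is guaranteed to complete before it.
      CountDownLatch slowMayFinish = new CountDownLatch(1);
      service.submit(() -> { slowMayFinish.await(); return "slow"; });
      service.submit(() -> "fast");

      // take() blocks until a completed task is available.
      String first = service.take().get();
      slowMayFinish.countDown();
      String second = service.take().get();

      // Both results were consumed: the timed poll gives up and returns null.
      Future<String> none = service.poll(100, TimeUnit.MILLISECONDS);

      return emptyPollIsNull + "," + first + "," + second + "," + (none == null);
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Note that the results come back in completion order ("fast" before "slow"), even though the slow task was submitted first.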

An example of a CompletionService implementation is java.util.concurrent.ExecutorCompletionService, which delegates the execution of tasks to a java.util.concurrent.Executor. If you're curious about the details, you can take a look at the implementation in OpenJDK. As you can observe, ExecutorCompletionService behaves there as a wrapper around an Executor and a BlockingQueue.
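To make the wrapper idea more concrete, here is a deliberately simplified sketch of how an Executor and a BlockingQueue could be combined. MiniCompletionService is a hypothetical class written for this article, not the OpenJDK source. It assumes that overriding FutureTask's done() hook is an acceptable way to detect completion, and the demo uses a same-thread Executor (Runnable::run) purely to keep the example deterministic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified completion service: Executor + BlockingQueue.
class MiniCompletionService<V> {

  private final Executor executor;
  private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

  MiniCompletionService(Executor executor) {
    this.executor = executor;
  }

  // Wraps the task so that it enqueues itself once it is done.
  Future<V> submit(Callable<V> task) {
    FutureTask<V> future = new FutureTask<V>(task) {
      @Override
      protected void done() {
        // Called by FutureTask when the task completes (normally or not).
        completed.add(this);
      }
    };
    executor.execute(future);
    return future;
  }

  // Blocks until a completed task is available.
  Future<V> take() throws InterruptedException {
    return completed.take();
  }

  // Returns a completed task, or null if none is available yet.
  Future<V> poll() {
    return completed.poll();
  }

  static int demo() {
    // Runnable::run executes each task in the calling thread.
    MiniCompletionService<Integer> service = new MiniCompletionService<>(Runnable::run);
    service.submit(() -> 6 * 7);
    try {
      return service.take().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The point of the sketch is the division of labour: the Executor decides where and when tasks run, while the queue only ever receives tasks that have already finished.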

Example of CompletionService in Java

To better understand how to use CompletionService, let's write a simple test case with ExecutorCompletionService and our story of teenagers spending their parents' money.

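import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class CompletionServiceTest {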

  @Test
  public void launch() {
    /**
      * First, we initialize the tasks executed by CompletionService.
      */
    List<Callable<Double>> jobs = new ArrayList<Callable<Double>>();
    jobs.add(new Parent(10.5d, 3000));
    jobs.add(new Parent(5d, 1000));
    jobs.add(new Parent(36.75d, 1000));
    jobs.add(new Parent(11d, 2000));
    jobs.add(new Parent(19d, 2000));

    Teenager teenager = new Teenager();

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<Double> completionService = new ExecutorCompletionService<Double>(executorService);

    double expectedTotal = 10.5d+5d+36.75d+11d+19d;
    try {
      /**
        * We submit all jobs with submit method.
        */
      for (Callable<Double> job : jobs) {
          completionService.submit(job);
      }
      /**
        * For every job we want to get the result and treat it by teenager (consumer).
        */
      for (int i = 0; i < jobs.size(); i++) {
          Future<Double> result = completionService.take();
          teenager.consume(result.get());
      }
    } catch (Exception e) {
      e.printStackTrace();
      fail("An error occurred on testing CompletionService: "+e.getMessage());
    } finally {
      // Release the pool threads whether the test succeeded or not.
      executorService.shutdown();
    }
    assertTrue("Money consumed by teenager ("+teenager.getTotalConsumed()+") should be the same as expected ("+expectedTotal+")",
      expectedTotal == teenager.getTotalConsumed());
  }


  @Test
  public void launchWithTimeouts() {
    System.out.println("------------------------- launchWithTimeouts -----------------------------");
    /**
      * This test takes exactly the same scenario as the first one.
      * However, some results won't be retrieved because of the timeout passed to the
      * poll(timeout, TimeUnit) method.
      */
    List<Callable<Double>> jobs = new ArrayList<Callable<Double>>();
    jobs.add(new Parent(10.5d, 7000));
    jobs.add(new Parent(5d, 1000));
    jobs.add(new Parent(36.75d, 1000));
    jobs.add(new Parent(11d, 7000));
    jobs.add(new Parent(19d, 7000));

    Teenager teenager = new Teenager();

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<Double> completionService = new ExecutorCompletionService<Double>(executorService);

    double expectedTotal = 5d+36.75d;
    try {
      /**
        * We submit all jobs with submit method.
        */
      for (Callable<Double> job : jobs) {
          completionService.submit(job);
      }
      /**
        * For every job we want to get the result and treat it by teenager (consumer).
        */
      for (int i = 0; i < jobs.size(); i++) {
        try {
            /**
              * Each poll waits at most 1 second, so only two tasks will be consumed: the
              * two that sleep for only 1 second. The 3 remaining ones sleep for 7 seconds, so
              * their results won't be retrieved here.
              */
            Future<Double> result = completionService.poll(1, TimeUnit.SECONDS);
            if (result != null && result.get() != null) {
              teenager.consume(result.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      fail("An error occurred on testing CompletionService: "+e.getMessage());
    } finally {
      // Release the pool threads whether the test succeeded or not.
      executorService.shutdown();
    }
    assertTrue("Money consumed by teenager ("+teenager.getTotalConsumed()+") should be the same as expected ("+expectedTotal+")",
      expectedTotal == teenager.getTotalConsumed());
  }

}


class Parent implements Callable<Double> {

  private double salary;
  private int sleep;

  public Parent(double salary, int sleep) {
    this.salary = salary;
    this.sleep = sleep;
  }

  @Override
  public Double call() throws Exception {
    try {
      System.out.println("[PARENT] I'm starting to work");
      Thread.sleep(this.sleep);
    } catch (Exception e) {
      e.printStackTrace();
    }
    System.out.println("[PARENT] I ended my work and I earned "+this.salary);
    return this.salary;
  }
}

class Teenager {

  private double totalConsumed;

  public void consume(double money) {
    System.out.println("[TEENAGER] Great ! I've "+money+" to spend");
    totalConsumed += money;
  }

  public double getTotalConsumed() {
    return this.totalConsumed;
  }
}

In this article we discovered how to work with CompletionService. First, we saw how the CompletionService interface is composed. Then, in the second part, we wrote a simple test case to show the features of CompletionService.
