Producer-consumer services can be implemented quickly with the classes that ship with Java. One of them is CompletionService, part of the java.util.concurrent package since Java 5.
In this article we'll discover CompletionService. First, we'll cover the theory behind this object. In the second part, we'll show how it works by writing a sample JUnit test case.
CompletionService in Java
CompletionService is an interface from the java.util.concurrent package that decouples the production of new tasks from the consumption of completed ones. It means that you can submit new tasks in one place and process the tasks that have already produced a result in another. A real-world example of CompletionService could be teenagers spending the money earned by their parents. The catch is that the children can spend the money only after the parents have earned it, and that the parents can work several jobs at the same time. For example, both parents earn 30€ per hour. The money is automatically sent to the teenagers, who spend it on shopping. The operation repeats over and over again. As you can deduce, the parents are the producers here and the children are the consumers. The earned money is placed into a queue and dispatched to the children as soon as it is available.
The parents-children dependency illustrates the main idea behind CompletionService. Tasks are submitted for execution, and each result is put into an internal queue as soon as its task completes. The results are therefore queued one by one, in completion order rather than submission order, and retrieved from the queue by the client. The method used to add a new task to CompletionService is submit. It has two signatures:
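- Runnable task, V result: the task is an instance of Runnable, which returns nothing itself, so the Future yields the given result of type V once the task completes.
- Callable<V> task: the task is an instance of Callable, and the Future yields the value of type V returned by its call() method.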
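The two signatures can be compared side by side in a minimal sketch. The class name SubmitOverloadsSketch, the method sumOfBothSubmissions and the amounts are made up for this illustration; only the CompletionService, ExecutorCompletionService and Executors calls come from the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, written only to illustrate both submit signatures.
class SubmitOverloadsSketch {

    // Submits one task through each signature and returns the sum of both results.
    static double sumOfBothSubmissions() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<Double> service = new ExecutorCompletionService<>(executor);

            // Runnable signature: run() returns nothing, so the Future yields
            // the value passed as the second argument once the task is done.
            Runnable sideEffectOnly = () -> System.out.println("[PARENT] Working, no return value");
            service.submit(sideEffectOnly, 20.0d);

            // Callable signature: the Future yields whatever call() returns.
            Callable<Double> salaryJob = () -> 30.0d;
            service.submit(salaryJob);

            double total = 0d;
            for (int i = 0; i < 2; i++) {
                // take() hands back completed tasks, in completion order.
                total += service.take().get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A submitted task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfBothSubmissions());
    }
}
```

The Runnable variant is mostly useful when the task works through side effects and the caller only needs a completion marker or a value known in advance.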
To retrieve the result of a task, we call the take() method. It blocks until the next completed Future is available in the CompletionService's queue. A similar method is poll(). Unlike take(), it doesn't wait: it returns null immediately when no completed task is available. An overloaded version, poll(timeout, unit), waits up to the given time before returning null.
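The difference between the three retrieval methods can be sketched as follows. The class TakeVersusPollSketch and its observe method are hypothetical names used only here, and a CountDownLatch keeps the task unfinished during the first poll() so that the outcome does not depend on timing.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical class, written only to contrast poll(), take() and timed poll().
class TakeVersusPollSketch {

    static String observe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(executor);
            CountDownLatch release = new CountDownLatch(1);

            // The task cannot finish before the latch is released.
            service.submit(() -> {
                release.await();
                return "done";
            });

            // Nothing has completed yet, so poll() returns null without waiting.
            Future<String> early = service.poll();

            // Let the task finish, then block in take() until its Future is queued.
            release.countDown();
            Future<String> completed = service.take();

            // The queue is empty again: the timed poll waits 100 ms, then returns null.
            Future<String> late = service.poll(100, TimeUnit.MILLISECONDS);

            return "poll=" + (early == null ? "null" : "future")
                    + ", take=" + completed.get()
                    + ", timedPoll=" + (late == null ? "null" : "future");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("The submitted task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Because poll() can return null, a consumer built on it has to check the returned Future before calling get(), whereas a Future returned by take() always belongs to a completed task.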
An example of a CompletionService implementation is java.util.concurrent.ExecutorCompletionService, which delegates the execution of tasks to a java.util.concurrent.Executor. If you're curious, you can take a look at its implementation in OpenJDK. As you can see there, ExecutorCompletionService acts as a wrapper around an Executor and a BlockingQueue.
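To make the wrapper idea more concrete, here is a deliberately simplified sketch of the same pattern. It is not the OpenJDK source: the classes MiniCompletionService and MiniCompletionServiceDemo are invented for this illustration, only the Callable variant of submit is covered, and the sketch relies on the done() hook of FutureTask to enqueue each task when it finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified wrapper around an Executor and a BlockingQueue.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {
                // Called by FutureTask once the task has finished:
                // this is what puts results in completion order.
                completed.add(this);
            }
        };
        executor.execute(future);
        return future;
    }

    Future<V> take() throws InterruptedException {
        return completed.take(); // blocks until a finished task is queued
    }

    Future<V> poll() {
        return completed.poll(); // null when nothing has finished yet
    }
}

// Hypothetical demo: the task submitted first finishes last.
class MiniCompletionServiceDemo {

    static List<String> completionOrder() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            MiniCompletionService<String> service = new MiniCompletionService<>(executor);
            CountDownLatch release = new CountDownLatch(1);

            service.submit(() -> {
                release.await(); // held back until the fast result is consumed
                return "slow";
            });
            service.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            order.add(service.take().get());
            release.countDown();
            order.add(service.take().get());
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A submitted task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

The demo submits the slow task first, yet the consumer receives the fast result first, which is the behavior that distinguishes a completion service from iterating over a list of Future objects in submission order.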
Example of CompletionService in Java
To better understand how to use CompletionService, let's write a simple case with ExecutorCompletionService and our story of teenagers spending their parents' money.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class CompletionServiceTest {

    @Test
    public void launch() {
        // First, we initialize the tasks executed by CompletionService.
        List<Callable<Double>> jobs = new ArrayList<Callable<Double>>();
        jobs.add(new Parent(10.5d, 3000));
        jobs.add(new Parent(5d, 1000));
        jobs.add(new Parent(36.75d, 1000));
        jobs.add(new Parent(11d, 2000));
        jobs.add(new Parent(19d, 2000));
        Teenager teenager = new Teenager();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<Double> completionService =
                new ExecutorCompletionService<Double>(executorService);
        double expectedTotal = 10.5d + 5d + 36.75d + 11d + 19d;
        try {
            // We submit all jobs with the submit method.
            for (Callable<Double> job : jobs) {
                completionService.submit(job);
            }
            // For every job we take the result and pass it to the teenager (consumer).
            for (int i = 0; i < jobs.size(); i++) {
                Future<Double> result = completionService.take();
                teenager.consume(result.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
            fail("An error occurred on testing CompletionService: " + e.getMessage());
        } finally {
            // Release the worker threads whether the test passed or failed.
            executorService.shutdown();
        }
        // Doubles are compared with a tolerance instead of ==.
        assertEquals("Money consumed by teenager should be the same as expected",
                expectedTotal, teenager.getTotalConsumed(), 0.001d);
    }

    @Test
    public void launchWithTimeouts() {
        System.out.println("------------------------- launchWithTimeouts -----------------------------");
        // This test takes exactly the same scenario as the first one. However, some
        // results will be missed because of timeouts in poll(timeout, TimeUnit).
        List<Callable<Double>> jobs = new ArrayList<Callable<Double>>();
        jobs.add(new Parent(10.5d, 7000));
        jobs.add(new Parent(5d, 1000));
        jobs.add(new Parent(36.75d, 1000));
        jobs.add(new Parent(11d, 7000));
        jobs.add(new Parent(19d, 7000));
        Teenager teenager = new Teenager();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<Double> completionService =
                new ExecutorCompletionService<Double>(executorService);
        double expectedTotal = 5d + 36.75d;
        try {
            // We submit all jobs with the submit method.
            for (Callable<Double> job : jobs) {
                completionService.submit(job);
            }
            // For every job we try to get the result and pass it to the teenager (consumer).
            for (int i = 0; i < jobs.size(); i++) {
                try {
                    // Each poll waits at most 1 second, so the 5 polls give up after
                    // about 5 seconds at most. Only the two tasks sleeping 1 second
                    // finish in time; the results of the three tasks sleeping
                    // 7 seconds are not retrieved here.
                    Future<Double> result = completionService.poll(1, TimeUnit.SECONDS);
                    if (result != null && result.get() != null) {
                        teenager.consume(result.get());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            fail("An error occurred on testing CompletionService: " + e.getMessage());
        } finally {
            executorService.shutdown();
        }
        assertEquals("Money consumed by teenager should be the same as expected",
                expectedTotal, teenager.getTotalConsumed(), 0.001d);
    }
}

class Parent implements Callable<Double> {
    private final double salary;
    private final int sleep;

    public Parent(double salary, int sleep) {
        this.salary = salary;
        this.sleep = sleep;
    }

    @Override
    public Double call() {
        try {
            System.out.println("[PARENT] I'm starting to work");
            Thread.sleep(this.sleep);
        } catch (InterruptedException e) {
            // Keep the interrupt flag set so the executor can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("[PARENT] I ended my work and I earned " + this.salary);
        return this.salary;
    }
}

class Teenager {
    private double totalConsumed;

    public void consume(double money) {
        System.out.println("[TEENAGER] Great! I've " + money + " to spend");
        totalConsumed += money;
    }

    public double getTotalConsumed() {
        return this.totalConsumed;
    }
}
In this article we explored CompletionService. First, we looked at the CompletionService interface and its methods. Then, in the second part, we wrote a simple test case to show how CompletionService behaves.