Concurrent programming isn't not only about executing several tasks in parallel threads. It's only about analyzing results of these process. In Java, we can analyze them thanks to the idea of futures.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I'm currently writing
one on that topic and the first chapters are already available in π
Early Release on the O'Reilly platform
I also help solve your data engineering problems π contact@waitingforcode.com π©
In this article we'll discover first what is hidden behind this concept of futures. After that we'll implement them in the code example.
What are futures in Java ?
Futures are the proxy objects representing the results of asynchronous computations (in separated threads). Sometimes the idea of futures are used interchangeably with promises and delays but it's partially wrong. Future is a simple, read-only object containing the result of asynchronous computation. Promise also contains the result of asynchronous operation but, unlike future, it's writable.In Java, futures are located inside java.util.concurrent package. Future is an interface containing following methods:
- cancel: tries to cancel the execution of future's task.
- get: gets the result of asynchronous computation. Two get methods, with different signatures, are available. The first one, without any parameter, waits for the future result until the computation ends. The second method takes in parameter the values of timeout (value and time unit). If the result isn't available within given timeout, a TimeoutException is thrown. The thread invoking get method of Future instance is blocked until the response is ready to be returned. To avoid different concurrent problems as deadlocks, it's preferable to use the method with timeout signature.
- isCancelled: checks if future was cancelled or not.
- isDone: checks if future was completely done or not.
A subinterface of Future is ScheduledFuture from the same package. Inter alia, ScheduledFuture is the result of computation made by ScheduledExecutorService. By the way, you can see it in action in the article about executors in Java. This interface implements two supplementary interfaces to Future: java.util.concurrent.Delayed and java.lang.Comparable. From the first, it inherits getDelay method which returns the remaining delay of associated object. The second superinterface provides compareTo method. Thanks to it, ScheduledFuture instances can be compared.
An example of Future implementation can be java.util.concurrent.FutureTask. It implements also Runnable interface and thanks to it, FutureTask can be submitted to the implementations of basic java.util.concurrent.Executor interface. Unlike its subinterfaces, Executor contains only one method, execute. This method accepts only Runnable objects.
How to get Future object ? They can be get from the implementations of Callable interface. And Callable interface implementations can be submitted to execution for executors (ExecutorService, AbstractExecutorService, ScheduledThreadPoolExecutor).
Future with ExecutorService
In the first use case, we'll try to get Future instances through ExecutorService instance. We've already done it with some of classes implementing ExecutorService interface in the article about executors in Java. It's the reason why here, we'll only focus on executors thread pools, created with Executors factory methods (newFixedThreadPool, newCachedThreadPool and newSingleThreadExecutor). This time, we'll simply check the Future methods, as cancel, isCancelled or isDone. We'll also try to throw TimeoutException.
public class FutureTest { public static final String FAST_MSG = "Fast operation is already executed"; public static String SLOW_MSG = ""; @Test public void test() { ExecutorService executor = Executors.newFixedThreadPool(2); /* * We start by checking if the timeout works well. Normally, SlowOperation is executed in 5 seconds while specified * timeout it 2 seconds. TimeoutException is expected. */ FuturelongTimeFuture = executor.submit(new SlowOperation()); boolean wasTe = false; try { String successMessage = longTimeFuture.get(2, TimeUnit.SECONDS); } catch (TimeoutException te) { wasTe = true; } catch (Exception e) { e.printStackTrace(); } assertTrue("TimeoutException should occured but it wasn't", wasTe); Future fastFuture = executor.submit(new FastOperation()); String result = null; try { result = fastFuture.get(); } catch (Exception e) { e.printStackTrace(); fail("Fast executed Future shouldn't failed: "+e.getMessage()); } assertTrue("Expected result of Future is '"+FAST_MSG+"' but '"+result+"' was generated instead", FAST_MSG.equals(result)); /* * Both Futures aren't cancelled. They must be cancelled explicitelly, through cancel() call. */ assertFalse("longTimeFuture is cancelled but it shouldn't be", longTimeFuture.isCancelled()); assertFalse("fastFuture is cancelled but it shouldn't be", fastFuture.isCancelled()); /* * Only fastFuture is considered as 'done'. Its results was received by the invoking thread while it's not the case for * longTimeFuture task. */ assertTrue("fastFuture should be in 'done' state but it's not", fastFuture.isDone()); assertFalse("longTimeFuture shouldn't be in 'done' state but it is", longTimeFuture.isDone()); /* * We attempt to cancel task in 'done' state. According to the Javadoc, it should be impossible: * " This attempt will fail if the task has already completed" */ fastFuture.cancel(true); assertFalse("fastFuture is cancelled but it shouldn't be", fastFuture.isCancelled()); /* * We try to cancel task in 'not done' state. It should be successful here. We use false to tell that the task shouldn't be * interrupted. It means that FutureTask.SLOW_MSG will remain an empty String (set won't be reached in SlowOperation.call method, after * try-catch block). */ longTimeFuture.cancel(false); assertTrue("longTimeFuture isn't cancelled but it shoul be", longTimeFuture.isCancelled()); /* * Now, we make the same test as previously with longTimeFuture, but instead of using cancel(false), we'll use cancel(true). Next, we'll test if * SLOW_MSG's values changed (so the code after try-catch block was reached). */ Future newLongTimeFuture = executor.submit(new SlowOperation()); wasTe = false; try { String successMessage = newLongTimeFuture.get(1, TimeUnit.SECONDS); } catch (TimeoutException te) { wasTe = true; } catch (Exception e) { e.printStackTrace(); } assertTrue("TimeoutException should occured but it wasn't", wasTe); // cancel the task and check SLOW_MSG directly after that newLongTimeFuture.cancel(true); assertTrue("newLongTimeFuture isn't cancelled but it shoul be", newLongTimeFuture.isCancelled()); /* * If you remove this sleep, the test will fail. SLOW_MSG field is set by cancel method when the flag is true, but it's made some miliseconds after * JUnit's assertion method. */ try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } assertTrue("SLOW_MSG should be '"+SlowOperation.EXPECTED_MSG+"' and was '"+SLOW_MSG+"'", SLOW_MSG.equals(SlowOperation.EXPECTED_MSG)); System.out.println("Executed after future result"); } } class FastOperation implements Callable { @Override public String call() throws Exception { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } return FutureTest.FAST_MSG; } } class SlowOperation implements Callable { public static final String EXPECTED_MSG = "Long operation was successfully executed"; @Override public String call() throws Exception { try { // sleeping time must be greater than timeout value specified in get method in the main thread Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } FutureTest.SLOW_MSG = EXPECTED_MSG; return FutureTest.SLOW_MSG; } }
FutureTask with Executor
The test case with Future looks simple. The case with FutureTask will look a little bit different because of difference between Future and FutureTask constructions. As mentioned earlier, FutureTask implements RunnableFuture interface. RunnableFuture which inherits in its turn, from Runnable and Future. So, FutureTask will be a mix of two last interfaces implementation. In this example we'll also use thread pools derived from Executors factory method, but instead of ExecutorsService, we'll manipulate on Executor instance. The difference is that Executor accepts only Runnable objects while ExecutorService Runnable and Callable:
public class FutureTaskTest { @Test public void test() { String breakfastReadyMsg = "Breakfast is ready"; String lunchReadyMsg = "Lunch is ready"; FutureTaskbreakfast = new FutureTask (new BreakfastPreparator(), breakfastReadyMsg); FutureTask lunch = new FutureTask (new LunchPreparator(), lunchReadyMsg); /* * Case with tasks executed without executor. * Tasks are sleeping time from2 to 4 seconds. So this code should pass. However, it won't pass because * both tasks aren't started (run method not invoked). */ boolean wasTe = false; try { String message = breakfast.get(10, TimeUnit.SECONDS); System.out.println("Message :"+message); message = lunch.get(10, TimeUnit.SECONDS); System.out.println("Message :"+message); } catch (TimeoutException te) { wasTe = true; } catch (Exception e) { e.printStackTrace(); } assertTrue("TimeoutException should occur, but it wasn't", wasTe); assertFalse("breakfast shouldn't be in 'done' state but it was", breakfast.isDone()); assertFalse("lunch shouldn't be in 'done' state but it was", lunch.isDone()); /* * This time we invoke run() before checking the results from both of tasks. It should work with 10 seconds timeout. */ wasTe = false; try { breakfast.run(); lunch.run(); String message = breakfast.get(10, TimeUnit.SECONDS); assertTrue("Message should be '"+breakfastReadyMsg+"' but was '"+message+"'", breakfastReadyMsg.equals(message)); message = lunch.get(10, TimeUnit.SECONDS); assertTrue("Message should be '"+lunchReadyMsg+"' but was '"+message+"'", lunchReadyMsg.equals(message)); } catch (TimeoutException te) { wasTe = true; } catch (Exception e) { e.printStackTrace(); fail("An exception occured but it shouldn't: "+e.getMessage()); } assertFalse("TimeoutException shouldn't occur, but it was", wasTe); assertTrue("breakfast should be in 'done' state but it wasn't", breakfast.isDone()); assertTrue("lunch should be in 'done' state but it wasn't", lunch.isDone()); /* * Here we check if the tasks can be executed with executor. */ Executor executor = Executors.newFixedThreadPool(2); FutureTask breakfastExecutor = new FutureTask (new BreakfastPreparator(), breakfastReadyMsg); FutureTask lunchExecutor = new FutureTask (new LunchPreparator(), lunchReadyMsg); executor.execute(breakfastExecutor); executor.execute(lunchExecutor); try { String message = breakfastExecutor.get(10, TimeUnit.SECONDS); assertTrue("Message should be '"+breakfastReadyMsg+"' but was '"+message+"'", breakfastReadyMsg.equals(message)); message = lunchExecutor.get(10, TimeUnit.SECONDS); assertTrue("Message should be '"+lunchReadyMsg+"' but was '"+message+"'", lunchReadyMsg.equals(message)); } catch (Exception e) { e.printStackTrace(); } assertTrue("breakfastExecutor should be in 'done' state but it wasn't", breakfastExecutor.isDone()); assertTrue("lunchExecutor should be in 'done' state but it wasn't", lunchExecutor.isDone()); /* * Check method should have the same effect as cancel() method for objects implementing Future and getting through Executor's submit methods. */ assertFalse("breakfast shouldn't be cancelled but it was", breakfast.isCancelled()); assertFalse("lunch shouldn't be cancelled but it was", lunch.isCancelled()); assertFalse("breakfastExecutor shouldn't be cancelled but it was", breakfastExecutor.isCancelled()); assertFalse("lunchExecutor shouldn't be cancelled but it was", lunchExecutor.isCancelled()); /* * All cancelled tasks are in 'done' state, so they can't be cancelled according to the Javadoc: * "Attempts to cancel execution of this task. This attempt will fail if the task has already completed, " */ breakfast.cancel(true); lunch.cancel(true); breakfastExecutor.cancel(true); lunchExecutor.cancel(true); assertFalse("breakfast shouldn't be cancelled but it was", breakfast.isCancelled()); assertFalse("lunch shouldn't be cancelled but it was", lunch.isCancelled()); assertFalse("breakfastExecutor shouldn't be cancelled but it was", breakfastExecutor.isCancelled()); assertFalse("lunchExecutor shouldn't be cancelled but it was", lunchExecutor.isCancelled()); // Make new task and cancel it without passing to 'done' state. FutureTask cancellableLunch = new FutureTask (new LunchPreparator(), lunchReadyMsg); assertFalse("cancellableLunch shouldn't be in 'cancelled' state", cancellableLunch.isCancelled()); assertFalse("cancellableLunch shouldn't be in 'done' state", cancellableLunch.isDone()); cancellableLunch.cancel(true); assertTrue("cancellbableLuch should be in 'cancelled' state but it wasn't", cancellableLunch.isCancelled()); } } class BreakfastPreparator implements Runnable { @Override public void run() { System.out.println("Breakfast prepearation has begun"); try { Thread.sleep(2000); } catch(Exception e) { e.printStackTrace(); } System.out.println("Breakfast preperation ended"); } } class LunchPreparator implements Runnable { @Override public void run() { System.out.println("Lunch prepearation has begun"); try { Thread.sleep(4000); } catch(Exception e) { e.printStackTrace(); } System.out.println("Lunch preperation ended"); } }
To resume, Future is a placeholder object for something that you will get in the future. The future result can come from long computation process, as a treatment of big-sized file or download of many files through slow network. It can also block thread invoking Future object. In this way, Future can be used as a barrier for execute the tasks only if given condition (downloaded element or initialized object for example) is made.