Google Guava as a set of libraries used in Google projects couldn't not contain some worries about concurrency.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I'm currently writing
one on that topic and the first chapters are already available in π
Early Release on the O'Reilly platform
I also help solve your data engineering problems π contact@waitingforcode.com π©
In this article we'll explore different parts of concurrency part of Google Guava library. In the first part we'll see how, through immutable objects, Google's library helps to improve code thread-safety. In the second and third parts we'll see what contains concurrency package of this library, ie. Future implementation and locking.
Immutable objects in Google Guava
Immutable objects are by definition thread safe. It means that we can't change them after initialization. So they're no risk that two concurrent threads will lead these objects into inconsistent state. Google Guava provides some of immutable objects, mainly placed in com.google.common.collect package. How the immutability is achieved for these collections ? If you remember some collections from Java's util.concurrent package, you surely know that you can add new value into ArrayList or HashMap at any moment of the object's life cycle.
To understand the immutability in Google Guava's collections, take a look on ImmutableMap abstract class. It implements Java's native java.util.Map interface. By doing so, it must define some of already known methods (put, remove or clear). These methods doesn't contain an real implementation in the executive code. Instead of that, they adopt fail-fast approach by throwing an UnsupportedOperationException. In additionally, they're all annotated as @Deprecated:
@Deprecated @Override public final V put(K k, V v) { throw new UnsupportedOperationException(); } @Deprecated @Override public final V remove(Object o) { throw new UnsupportedOperationException(); } @Deprecated @Override public final void clear() { throw new UnsupportedOperationException(); }
If you want to construct an ImmutableMap instance, you must use ImmutableMap's builder. If you don't know this concept, please read about design patterns in Spring. To simplify, builder is a internal class that we use to construct destination object. The destination object has usually a constructor inaccessible publicly. It's the case of ImmutableSortedMap those constructors have package-private visibility:
ImmutableSortedMap() { } ImmutableSortedMap(ImmutableSortedMapdescendingMap) { this.descendingMap = descendingMap; }
Let's check ImmutableMultimap (adopts the same immutable techniques as ImmutableSortedMap) and verify if it really is immutable:
@Test public void immutableObjects() { final ImmutableMultimappupils = ImmutableMultimap. builder().put("Andrew", "Brake").put("Mathew", "Son").build(); // define expected values int expectedSize = 2; String[][] expectedEntries = new String[][] {{"Andrew", "Brake"}, {"Mathew", "Son"}}; final String[][] phantomEntry = new String[][] {{"Strange", "Pupil"}, {"Other stranger", ""}}; // run two threads and check if expectedEntries are present in pupilsMap final CountDownLatch latch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { try { pupils.put(phantomEntry[0][0], phantomEntry[0][1]); } catch (Exception e) { e.printStackTrace(); } latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { try { pupils.put(phantomEntry[1][0], phantomEntry[1][1]); } catch (Exception e) { e.printStackTrace(); } latch.countDown(); } }).start(); try { latch.await(); } catch (Exception e) { e.printStackTrace(); fail("An error occurred on waiting for latch count down: "+e.getMessage()); } // check if pupils map changed after two previously run threads assertTrue("Map size after two concurrent puts should be "+expectedSize+" but is "+pupils.size(), pupils.size() == expectedSize); for (String[] expected: expectedEntries) { ImmutableCollection pupil = pupils.get(expected[0]); assertTrue("Pupils should contain '"+expected[1]+"' but it doesn't", pupil.contains(expected[1])); } }
Google Guava's Future implementation
The second interesting point of concurrency handling in Google Guava is the definition of com.google.common.util.concurrent.ListenableFuture interface which extends Java's java.util.concurrent.Future interface. The only one supplementary method is addListener(Runnable task, Executor executor). It allows to define a task to run by given executor when the Future result is complete. The example of its implementation is ListenableFutureTask. In this implementation, the listeners are holded in private ExecutionList object which is executed through execute method afther the completion of the task. This method delegates listener invocation to ExecutionList's executeListener method:
/** * Submits the given runnable to the given {@link Executor} catching and logging all * {@linkplain RuntimeException runtime exceptions} thrown by the executor. */ private static void executeListener(Runnable runnable, Executor executor) { try { executor.execute(runnable); } catch (RuntimeException e) { // Log it and keep going, bad runnable and/or executor. Don't // punish the other runnables if we're given a bad one. We only // catch RuntimeException because we want Errors to propagate up. log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); } }
Another way to register listeners is the invocation of Futures addCallback(ListenableFuture
To execute ListenableFuture objects, we need also to "decorate" Java's executors. The decoration is made thanks to static ListeningExecutorService listeningDecorator(ExecutorService delegate) method of MoreExecutors class. Decorators are inner classes as ListeningDecorator which code looks like:
private static class ListeningDecorator extends AbstractListeningExecutorService { private final ExecutorService delegate; ListeningDecorator(ExecutorService delegate) { this.delegate = checkNotNull(delegate); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return delegate.awaitTermination(timeout, unit); } @Override public boolean isShutdown() { return delegate.isShutdown(); } @Override public boolean isTerminated() { return delegate.isTerminated(); } @Override public void shutdown() { delegate.shutdown(); } @Override public ListshutdownNow() { return delegate.shutdownNow(); } @Override public void execute(Runnable command) { delegate.execute(command); } }
All decorators extend common abstract class, AbstractListeningExecutorService. As you can see, they're simple wrappers for ExecutorService instance, sometimes extended with some supplementary features as ListenableScheduledFuture initialization in ScheduledListeningDecorator. They're some tests against ListenableFuture implementations:
@Test public void listenableFutures() { final MapstatsMap = new HashMap (); /** * Execute two task in different threads coming from Executors.newFixedThreadPool(5). This executor is only wrapped to ListeningExecutorService instance. After * we define two ListenableFuture tasks: one for get the name another for get the age of somebody. Both should be executed by previously wrapped ListeningExecutorService. */ ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); ListenableFuture nameGetter = pool.submit(new NameCallable(statsMap)); ListenableFuture ageGetter = pool.submit(new AgeCallable(false, statsMap)); Futures.addCallback(nameGetter, new FutureCallback () { @Override public void onFailure(Throwable exc) { statsMap.put("nameCallable-result", "failure"); } @Override public void onSuccess(String result) { statsMap.put("nameCallable-result", "success_"+result); } }); Futures.addCallback(ageGetter, new FutureCallback () { @Override public void onFailure(Throwable exc) { statsMap.put("ageCallable-result", "failure"); } @Override public void onSuccess(Integer result) { statsMap.put("ageCallable-result", "success_"+result); } }); try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } String expectedName = "success_XYZ"; String expectedAge = "success_30"; int expectedExeTimeAge = 0; int expectedExeTimeName = 3; // test returned values // thread pool first String splittedPoolValue = "-t"; String[] ageThread = ((String) statsMap.get("ageCallable-thread")).split(splittedPoolValue); String[] nameThread = ((String) statsMap.get("nameCallable-thread")).split(splittedPoolValue); assertTrue("Pool from ageCallable ("+(String) statsMap.get("ageCallable-thread")+") isn't the same as for nameCallable ("+(String) statsMap.get("nameCallable-thread")+")", ageThread[0].equals(nameThread[0])); // execution time int nameCallableTime = ((Integer) statsMap.get("nameCallable-executionTime")).intValue(); assertTrue("Time taken to execute nameCallable should be 3 seconds but was "+nameCallableTime+ " seconds", nameCallableTime == expectedExeTimeName); int ageCallableTime = ((Integer) statsMap.get("ageCallable-executionTime")).intValue(); assertTrue("Time taken to execute ageCallable should be 0 seconds but was "+ageCallableTime+ " seconds", ageCallableTime == expectedExeTimeAge); // called callbacks String callbackNameValue = (String) statsMap.get("nameCallable-result"); String callbackAgeValue = (String) statsMap.get("ageCallable-result"); assertTrue("Value from name callback should be '"+expectedName+"' but is '"+callbackNameValue+"'", callbackNameValue.equals(expectedName)); assertTrue("Value from age callback should be '"+expectedAge+"' but is '"+callbackAgeValue+"'", callbackAgeValue.equals(expectedAge)); } @Test public void listenableFutureTheSameThread() { final Map statsMap = new HashMap (); /** * In this test case, all ListenableFuture tasks should be execute in the calling thread. It can be achieved with sameThreadExecutor() method which returns MoreExecutors * inner class SameThreadExecutorService. This class adds lock handling on shutdown methods. The locks are acquired before shutting down a task and released after, as here: * * @Override * public void execute(Runnable command) { * startTask(); * try { * command.run(); * } finally { * endTask(); * } * } * * @Override * public boolean isShutdown() { * lock.lock(); * try { * return shutdown; * } finally { * lock.unlock(); * } * } **/ ListeningExecutorService pool = MoreExecutors.sameThreadExecutor(); ListenableFuturenameGetter = pool.submit(new NameCallable(statsMap)); ListenableFuture ageGetter = pool.submit(new AgeCallable(true, statsMap)); Futures.addCallback(nameGetter, new FutureCallback () { @Override public void onFailure(Throwable exc) { statsMap.put("nameCallable-result", "failure"); } @Override public void onSuccess(String result) { statsMap.put("nameCallable-result", "success_"+result); } }); Futures.addCallback(ageGetter, new FutureCallback () { @Override public void onFailure(Throwable exc) { statsMap.put("ageCallable-result", "failure"); } @Override public void onSuccess(Integer result) { statsMap.put("ageCallable-result", "success_"+result); } }); String expectedName = "success_XYZ"; String expectedAge = "failure"; int expectedExeTimeAge = 0; int expectedExeTimeName = 3; // test returned values // thread pool first String ageThread = (String) statsMap.get("ageCallable-thread"); String nameThread = (String) statsMap.get("nameCallable-thread"); String currentThread = Thread.currentThread().getName(); assertTrue("Thread from ageCallable ("+ageThread+") isn't the same as for nameCallable ("+nameThread+")", ageThread.equals(nameThread)); assertTrue("Tasks should be executed in the main thread but they weren't", nameThread.equals(currentThread)); // execution time int nameCallableTime = ((Integer) statsMap.get("nameCallable-executionTime")).intValue(); assertTrue("Time taken to execute nameCallable should be 3 seconds but was "+nameCallableTime+ " seconds", nameCallableTime == expectedExeTimeName); int ageCallableTime = ((Integer) statsMap.get("ageCallable-executionTime")).intValue(); assertTrue("Time taken to execute ageCallable should be 0 seconds but was "+ageCallableTime+ " seconds", ageCallableTime == expectedExeTimeAge); // called callbacks String callbackNameValue = (String) statsMap.get("nameCallable-result"); String callbackAgeValue = (String) statsMap.get("ageCallable-result"); assertTrue("Value from name callback should be '"+expectedName+"' but is '"+callbackNameValue+"'", callbackNameValue.equals(expectedName)); assertTrue("Value from age callback should be '"+expectedAge+"' but is '"+callbackAgeValue+"'", callbackAgeValue.equals(expectedAge)); } } class NameCallable implements Callable { private Map statsMap; public NameCallable(Map statsMap) { this.statsMap = statsMap; } @Override public String call() throws Exception { long start = System.currentTimeMillis(); try { Thread.sleep(3000); } catch (Exception e) { throw e; } long end = System.currentTimeMillis(); statsMap.put("nameCallable-thread", Thread.currentThread().getName()); statsMap.put("nameCallable-executionTime", Math.round((end - start)/1000));; return "XYZ"; } } class AgeCallable implements Callable { private boolean mustFail; private Map statsMap; public AgeCallable(boolean mustFail, Map statsMap) { this.mustFail = mustFail; this.statsMap = statsMap; } @Override public Integer call() throws Exception { long start = System.currentTimeMillis(); statsMap.put("ageCallable-thread", Thread.currentThread().getName()); if (this.mustFail) { long end = System.currentTimeMillis(); statsMap.put("ageCallable-executionTime", Math.round((end - start)/1000)); throw new Exception("This test code is configure to fail and it fails right now"); } long end = System.currentTimeMillis(); statsMap.put("ageCallable-executionTime", Math.round((end - start)/1000)); return 30; } }
Locking with Google Guava's Monitor
Another concurrency feature of Google Guava is Monitor object. It acts almost as Lock and Condition objects from java.util.concurrent.locks. Although, the Monitor's Javadoc indicates that this object is intended to be a replacement for ReentrantLock which is an implementation of Lock interface. Both classes have some similarities:
- reentrancy: it means that one thread can enter and leave given Monitor several times.
- occupancy: only one thread can "occupy" the lock at given moment as well for ReentrantLock as for Monitor.
- trying timeout: both allow to acquire a lock within specified timeout. In the case of ReentrantLock, the method which does it is tryLock. For Monitor, we can use enterInterruptibly method with specified time in signature. If the Monitor can't acquire the lock immediately, it'll retry to do it after with specified time:
public boolean enterInterruptibly(long time, TimeUnit unit) throws InterruptedException { return lock.tryLock(time, unit); }
- conditions: both locks are conditions-based. Java's has its own objects from Condition class. Google Guava proposes for this Monitor.Guard object which isSatisfied() method returns true if a condition is satisfied.
If you thinkg that Monitor wrapps Java's Lock and Condition implementations, you're right. More preciselly, it wrapps ReentrantLock and uses all of its methods internally to managing locks. For conditions, it creates new Condition every time when Monitor.Guard instance is constructed:
protected Guard(Monitor monitor) { this.monitor = checkNotNull(monitor, "monitor"); this.condition = monitor.lock.newCondition(); }
Which is the interest of using Monitor instead of Java's lock mechanism ? The main advantage could be the simplicity for several programmers. For example, if you want to use Lock's conditions, you'll probably use while loop to make thread waiting for a lock alive. With Monitor.Guard the logic is oriented to if statements, easiest to control. And who tells "easiest", tells also less error-prone code. To understand that, we'll write the same test case for the code using Lock mechanism and Monitor ones. Both examples will illustrate some producer-consumer service. First, let's test this service based on Java's lock objects:
@Test public void producerConsumerLock() { FactoryLock factory = new FactoryLock(); ConsumerLock consumer = new ConsumerLock(factory); ProducerLock producer = new ProducerLock(factory); Thread consumerThread = new Thread(consumer); Thread producerThread = new Thread(producer); consumerThread.start(); producerThread.start(); int sleepingTime = 17; try { Thread.sleep(sleepingTime*1000); } catch (Exception e) { e.printStackTrace(); fail("Sleeping caused the test failed: "+e.getMessage()); } int reallyProduced = factory.getProduced(); int reallyConsumed = factory.getConsumed(); int expectedProduced = 8; int expectedConsumed = 8; assertTrue("Test took "+ sleepingTime+ " seconds, so "+expectedProduced+" products should be produced but only "+reallyProduced+ " were", reallyProduced == expectedProduced); assertTrue("Test took "+ sleepingTime+ " seconds, so "+expectedConsumed+" products should be consumed but only "+reallyConsumed+ " were", reallyConsumed == expectedConsumed); } // And objects used for this test class FactoryLock { private Lock lock; private Condition productorCondition; private Condition consumerCondition; private boolean productAvailable = false; private int produced = 0; private int consumed = 0; public FactoryLock() { this.lock = new ReentrantLock(); this.productorCondition = lock.newCondition(); this.consumerCondition = lock.newCondition(); } public synchronized boolean isProductAvailable() { return this.productAvailable; } public Condition getProductorCondition() { return this.productorCondition; } public Condition getConsumerCondition() { return this.consumerCondition; } public Lock getLock() { return this.lock; } public void produce() { produced++; this.productAvailable = true; } public void consume() { consumed++; this.productAvailable = false; } public int getProduced() { return this.produced; } public int getConsumed() { return this.consumed; } } class ConsumerLock implements Runnable { private FactoryLock factory; public ConsumerLock(FactoryLock factory) { this.factory = factory; } @Override public void run() { while (true) { boolean locked = false; try { locked = factory.getLock().tryLock(2, TimeUnit.SECONDS); /** * Product isn't available. Consumer must to wait (await()) until getProductorCondition() sends a signal (signal()) in another thread. */ while (!factory.isProductAvailable()) { factory.getProductorCondition().await(); } Thread.sleep(1000); factory.consume(); System.out.println("Consumed, send a signal!"); factory.getConsumerCondition().signal(); } catch (Exception e) { e.printStackTrace(); } finally { if (locked) { factory.getLock().unlock(); } } } } } class ProducerLock implements Runnable { private FactoryLock factory; public ProducerLock(FactoryLock factory) { this.factory = factory; } @Override public void run() { while (true) { boolean locked = false; try { locked = factory.getLock().tryLock(2, TimeUnit.SECONDS); /** * Product is available. Producer must to wait (await()) until getConsumerCondition() sends a signal (signal()) in another thread. */ while (factory.isProductAvailable()) { factory.getConsumerCondition().await(); } // Produce the product Thread.sleep(1000); factory.produce(); System.out.println("Produced, send a signal !"); factory.getProductorCondition().signal(); } catch (Exception e) { e.printStackTrace(); } finally { if (locked) { factory.getLock().unlock(); } } } } }
And now the same thing but with Google Guava's Monitor objects:
@Test public void producerConsumerMonitor() { Monitor monitor = new Monitor(); Factory factory = new Factory(monitor); Consumer consumer = new Consumer(factory); Producer producer = new Producer(factory); Thread consumerThread = new Thread(consumer); Thread producerThread = new Thread(producer); consumerThread.start(); producerThread.start(); int sleepingTime = 17; try { Thread.sleep(sleepingTime*1000); } catch (Exception e) { e.printStackTrace(); fail("Sleeping caused the test failed: "+e.getMessage()); } int reallyProduced = factory.getProduced(); int reallyConsumed = factory.getConsumed(); int expectedProduced = 8; int expectedConsumed = 8; assertTrue("Test took "+ sleepingTime+ " seconds, so "+expectedProduced+" products should be produced but only "+reallyProduced+ " were", reallyProduced == expectedProduced); assertTrue("Test took "+ sleepingTime+ " seconds, so "+expectedConsumed+" products should be consumed but only "+reallyConsumed+ " were", reallyConsumed == expectedConsumed); } // And objects used in test method class Factory { private Monitor monitor; private boolean productAvailable = false; private Monitor.Guard productorGuard; private Monitor.Guard consumerGuard; private int produced = 0; private int consumed = 0; public Factory(Monitor monitor) { this.monitor = monitor; this.productorGuard = new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return productAvailable == false; } }; this.consumerGuard = new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return productAvailable == true; } }; } public Monitor.Guard getProductorGuard() { return this.productorGuard; } public Monitor.Guard getConsumerGuard() { return this.consumerGuard; } public Monitor getMonitor() { return this.monitor; } public void produce() { produced++; this.productAvailable = true; } public void consume() { consumed++; this.productAvailable = false; } public int getProduced() { return this.produced; } public int getConsumed() { return this.consumed; } } class Consumer implements Runnable { private Factory factory; public Consumer(Factory factory) { this.factory = factory; } @Override public void run() { while (true) { try { factory.getMonitor().enterWhen(this.factory.getConsumerGuard(), 2, TimeUnit.SECONDS); Thread.sleep(1000); System.out.println("Consumed, set productAvailable to false !"); factory.consume(); } catch (Exception e) { e.printStackTrace(); } finally { if (factory.getMonitor().isOccupiedByCurrentThread()) { factory.getMonitor().leave(); } } } } } class Producer implements Runnable { private Factory factory; public Producer(Factory factory) { this.factory = factory; } @Override public void run() { while (true) { try { factory.getMonitor().enterWhen(this.factory.getProductorGuard(), 2, TimeUnit.SECONDS); Thread.sleep(1000); System.out.println("Produced, set productAvailable to true !"); factory.produce(); } catch (Exception e) { e.printStackTrace(); } finally { if (factory.getMonitor().isOccupiedByCurrentThread()) { factory.getMonitor().leave(); } } } } }
As you can see it previous code, producer-consumer service is more readable with Google Guava. And more preciselly, it's easiest to write synchronization part with monitor guards. The same part in Java is composed by one while loop, two calls of conditions (await and signal). Google Guava use Monitor's enterWhen method to acquire a lock when concerned Monitor.Guard returns true for its isSatisfied method.
This sample code shows that Google Guava is a good complementary library to concurrency programs. It provides immutable collections which don't support writing (put, remove, putAll, clear) methods. In the second part we discovered the possibility to make some callbacks when the result of future computation is available. At the end we saw that Google Guava also defines new way to handling locks with Monitor and Monitor.Guard objects. This new way is more readable than the method based on Java's Lock and Condition objects.