Google Guava: event bus

In the two previous articles we covered the key concepts and the concurrency features of Google Guava, both of which simplify programming. Another useful part of the library is a message dispatching system called the event bus.

This article is devoted to that system. In the first part we'll explain the concept and its advantages over the classic Java event-listener system. In the second part we'll take a look under the hood of the EventBus class. The third part contains sample implementations of the Google Guava event bus, in synchronous and asynchronous mode.

What is Google Guava's event bus?

The event bus is an alternative to the classic way of programming publish-subscribe communication in Java with events and listeners. To picture it, imagine a school bus which takes pupils to school every morning. The central element of this picture is the bus, whose lifecycle is driven by events triggered by the pupils. An example of such an event is a pupil getting on the bus. Pupils are transported by the bus from home to school, so we can consider home and school as event receivers. For example, when a pupil gets on the bus, home knows that it has one occupant fewer.

From Google Guava's point of view, the event bus is a lightweight system for distributing events inside a program. Its main advantage is flexibility: components don't need to know each other to exchange events. Declaring listening methods is also simpler in Google Guava: you only need to annotate them with @Subscribe.

Changes were also made on the producer's side. With Google Guava, producers don't need code to manage listeners (the famous addListener or removeListener methods) or to dispatch events to each listener. The first responsibility is handled by the EventBus class. The second one is replaced by a simple method that posts the given event to the bus.

Here is a simplified description of how the event bus works. We need an EventBus instance which holds all listeners (added with the register method, removed with unregister). These listeners must contain public methods annotated with @Subscribe that take exactly one argument, whose type is the type of the listened event. @Subscribe methods are invoked when a matching event occurs, i.e. when the EventBus post method is called with an event of that type.

Before continuing, take a look at the following example, which shows a sample event-based system built with Java's classic event-listener approach:
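To make the decoupling idea concrete, here is a minimal hand-rolled sketch that uses only the JDK. It is not Guava's EventBus: TinyBus, PupilEntered and PupilLeft are hypothetical names, handlers are plain lambdas instead of @Subscribe methods, and events are matched by their exact class only. It merely illustrates that the code posting an event never references the handlers that receive it.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class TinyBusDemo {
  // Registers two independent handlers, posts three events, returns {entries, exits}.
  static int[] run() {
    TinyBus bus = new TinyBus();
    int[] counts = new int[2];
    bus.register(PupilEntered.class, event -> counts[0]++);
    bus.register(PupilLeft.class, event -> counts[1]++);
    bus.post(new PupilEntered());
    bus.post(new PupilEntered());
    bus.post(new PupilLeft());
    return counts;
  }

  public static void main(String[] args) {
    int[] counts = run();
    System.out.println("entries=" + counts[0] + ", exits=" + counts[1]);
  }
}

// Hypothetical event types; any plain class can serve as an event.
final class PupilEntered {}

final class PupilLeft {}

// Hypothetical, simplified bus: handlers are stored per exact event class.
final class TinyBus {
  private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

  // Stores the handler under the event type it wants to receive.
  <T> void register(Class<T> eventType, Consumer<? super T> handler) {
    handlers
        .computeIfAbsent(eventType, type -> new CopyOnWriteArrayList<>())
        .add(event -> handler.accept(eventType.cast(event)));
  }

  // Calls every handler registered for the event's exact class; returns how many were called.
  int post(Object event) {
    List<Consumer<Object>> matching =
        handlers.getOrDefault(event.getClass(), Collections.emptyList());
    for (Consumer<Object> handler : matching) {
      handler.accept(event);
    }
    return matching.size();
  }
}
```

The producer side only calls post, and the consumer side only calls register, which is the same split of responsibilities that the EventBus class offers.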

public class EventBusJavaEventTest {

  @Test
  public void javaEventHandling() {
    Bus bus = new Bus();
    
    BusEventListener homeListener = new HomeListener();
    BusEventListener schoolListener = new SchoolListener();
    bus.addListener(homeListener);
    bus.addListener(schoolListener);
    Pupil pupil1 = new Pupil();
    Pupil pupil2 = new Pupil();
    bus.pupilEnter(pupil1);
    bus.pupilEnter(pupil2);
    bus.pupilLeave(pupil1);
    bus.pupilLeave(pupil2);
    
    assertTrue("2 children should enter the bus but only " + homeListener.getChildrenEnteredNb() + " did", 
      homeListener.getChildrenEnteredNb() == 2);
    assertTrue("2 children should leave the bus but only " + homeListener.getChildrenLeavedNb() + " did", 
      homeListener.getChildrenLeavedNb() == 2);
    assertTrue("2 children should enter the bus but only " + schoolListener.getChildrenEnteredNb() + " did", 
      schoolListener.getChildrenEnteredNb() == 2);
    assertTrue("2 children should leave the bus but only " + schoolListener.getChildrenLeavedNb() + " did", 
      schoolListener.getChildrenLeavedNb() == 2);
  }
}

class BusEnterEvent extends EventObject {

  private static final long serialVersionUID = 2529924745666603277L;

  public BusEnterEvent(Pupil source) {
    super(source);
  }
}

class BusLeaveEvent extends EventObject {

  private static final long serialVersionUID = -2993442806611298083L;

  public BusLeaveEvent(Pupil source) {
    super(source);
  }
}

abstract class BusEventListener implements EventListener {
  protected int childrenLeavedNb;
  protected int childrenEnteredNb;
  
  protected void incrementChildrenLeavedNb() {
    this.childrenLeavedNb++;
  }
  
  protected void incrementChildrenEnteredNb() {
    this.childrenEnteredNb++;
  }
  
  public int getChildrenLeavedNb() {
    return this.childrenLeavedNb;
  }
  
  public int getChildrenEnteredNb() {
    return this.childrenEnteredNb;
  }
  
  public abstract void busEnter(BusEnterEvent event);
  public abstract void busLeave(BusLeaveEvent event);
}

class HomeListener extends BusEventListener {
        
  @Override
  public void busEnter(BusEnterEvent event) {
    System.out.println("HOME: My child goes to school by bus");
    this.incrementChildrenEnteredNb();
  }
  
  @Override
  public void busLeave(BusLeaveEvent event) {
    System.out.println("HOME: My child is already at school");
    this.incrementChildrenLeavedNb();
  }
}

class SchoolListener extends BusEventListener {
        
  @Override
  public void busEnter(BusEnterEvent event) {
    System.out.println("SCHOOL: OK, new pupil will arrive soon");
    this.incrementChildrenEnteredNb();
  }

  @Override
  public void busLeave(BusLeaveEvent event) {
    System.out.println("SCHOOL: New pupil has just arrived !");
    this.incrementChildrenLeavedNb();
  }
}

class Bus {
  /**
    * Main drawback: we can add only one type of listener. To support different types of listeners, we would need some kind of type detection in the dispatch methods, for example:
    * <pre>
    * if (listener.supports(BusEnterEvent.class)) {
    *   listener.busEnter(event);
    * }
    * </pre>
    */
  private List<BusEventListener> listeners = new ArrayList<BusEventListener>();

  public void addListener(BusEventListener listener) {
    this.listeners.add(listener);
  }

  public void pupilEnter(Pupil pupil) {
    BusEnterEvent event = new BusEnterEvent(pupil);
    for (BusEventListener listener : this.listeners) {
      listener.busEnter(event);
    }
  }

  public void pupilLeave(Pupil pupil) {
    BusLeaveEvent event = new BusLeaveEvent(pupil);
    for (BusEventListener listener : this.listeners) {
      listener.busLeave(event);
    }
  } 
}

class Pupil {     
}

Under the hood of Google Guava's EventBus

As mentioned earlier, EventBus already implements the management of listeners (add and remove operations). It does so through the register and unregister methods:

private final ReadWriteLock subscribersByTypeLock = new ReentrantReadWriteLock();

/**
 * Strategy for finding subscriber methods in registered objects.  Currently,
 * only the {@link AnnotatedSubscriberFinder} is supported, but this is
 * encapsulated for future expansion.
 */
private final SubscriberFindingStrategy finder = new AnnotatedSubscriberFinder();

/**
 * Registers all subscriber methods on {@code object} to receive events.
 * Subscriber methods are selected and classified using this EventBus's
 * {@link SubscriberFindingStrategy}; the default strategy is the
 * {@link AnnotatedSubscriberFinder}.
 *
 * @param object  object whose subscriber methods should be registered.
 */
public void register(Object object) {
  Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object);
  subscribersByTypeLock.writeLock().lock();
  try {
    subscribersByType.putAll(methodsInListener);
  } finally {
    subscribersByTypeLock.writeLock().unlock();
  }
}

/**
 * Unregisters all subscriber methods on a registered {@code object}.
 *
 * @param object  object whose subscriber methods should be unregistered.
 * @throws IllegalArgumentException if the object was not previously registered.
 */
public void unregister(Object object) {
  Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object);
  for (Entry<Class<?>, Collection<EventSubscriber>> entry : methodsInListener.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<EventSubscriber> eventMethodsInListener = entry.getValue();

    subscribersByTypeLock.writeLock().lock();
      try {
        Set<EventSubscriber> currentSubscribers = subscribersByType.get(eventType);
        if (!currentSubscribers.containsAll(eventMethodsInListener)) {
          throw new IllegalArgumentException(
            "missing event subscriber for an annotated method. Is " + object + " registered?");
        }
        currentSubscribers.removeAll(eventMethodsInListener);
      } finally {
        subscribersByTypeLock.writeLock().unlock();
      }
  }
}

The main role is played by an implementation of the SubscriberFindingStrategy interface. It defines a single method, findAllSubscribers(Object source), which returns a multimap of EventSubscriber instances keyed by event type. A single EventSubscriber instance wraps a method annotated with the @Subscribe annotation. In its handleEvent method, EventSubscriber invokes the annotated method through Java reflection. Note that a method annotated with @Subscribe must have exactly one argument in its signature, and this argument must be an instance of the dispatched event. That's because the handleEvent method passes exactly one argument to the invoked method:

public void handleEvent(Object event) throws InvocationTargetException {
  checkNotNull(event);
  try {
    method.invoke(target, new Object[] { event });
  } catch (IllegalArgumentException e) {
    throw new Error("Method rejected target/argument: " + event, e);
  } catch (IllegalAccessException e) {
    throw new Error("Method became inaccessible: " + event, e);
  } catch (InvocationTargetException e) {
    if (e.getCause() instanceof Error) {
      throw (Error) e.getCause();
    }
    throw e;
  }
}
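The discovery and invocation steps described above can be sketched with plain JDK reflection. This is an illustration under assumptions, not Guava's code: the @Listen annotation, ReflectiveDispatch and CountingListener are hypothetical names invented for the example, and the sketch searches the listener on every dispatch instead of caching the result in a multimap.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class ReflectiveDispatchDemo {
  public static void main(String[] args) {
    CountingListener listener = new CountingListener();
    int invoked = ReflectiveDispatch.dispatch(listener, "hello");
    System.out.println("invoked=" + invoked + ", strings=" + listener.strings);
  }
}

// Hypothetical marker annotation standing in for Guava's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {}

final class ReflectiveDispatch {
  // Collects the public methods annotated with @Listen; each must take exactly one argument.
  static List<Method> findHandlers(Object listener) {
    List<Method> found = new ArrayList<>();
    for (Method method : listener.getClass().getMethods()) {
      if (!method.isAnnotationPresent(Listen.class)) {
        continue;
      }
      if (method.getParameterTypes().length != 1) {
        throw new IllegalArgumentException(method + " must take exactly one argument");
      }
      found.add(method);
    }
    return found;
  }

  // Invokes every handler whose single parameter type accepts the event; returns the call count.
  static int dispatch(Object listener, Object event) {
    int invoked = 0;
    for (Method method : findHandlers(listener)) {
      if (!method.getParameterTypes()[0].isInstance(event)) {
        continue;
      }
      try {
        method.setAccessible(true); // the listener class itself may not be public
        method.invoke(listener, event);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Could not invoke " + method, e);
      }
      invoked++;
    }
    return invoked;
  }
}

// Hypothetical listener: two annotated handlers and one method that must be ignored.
final class CountingListener {
  int strings;
  int integers;

  @Listen
  public void onString(String value) {
    strings++;
  }

  @Listen
  public void onInteger(Integer value) {
    integers++;
  }

  public void ignored(String value) {
    strings += 100;
  }
}
```

Because the reflective call receives a single event object, a handler with zero or several parameters could never be invoked correctly, which is why the sketch rejects such methods during discovery.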

In the register and unregister methods you can also notice a ReadWriteLock (field subscribersByTypeLock). Both methods acquire its write lock. A write lock is exclusive: while one thread holds it, no other thread can acquire the write or the read lock of subscribersByTypeLock. The post method shows why the lock is useful:

public void post(Object event) {
  Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

  boolean dispatched = false;
  for (Class<?> eventType : dispatchTypes) {
    subscribersByTypeLock.readLock().lock();
    try {
      Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

      if (!wrappers.isEmpty()) {
        dispatched = true;
        for (EventSubscriber wrapper : wrappers) {
          enqueueEvent(event, wrapper);
        }
      }
    } finally {
      subscribersByTypeLock.readLock().unlock();
    }
  }

  if (!dispatched && !(event instanceof DeadEvent)) {
    post(new DeadEvent(this, event));
  }

  dispatchQueuedEvents();
}

As you can see, this method acquires the read lock of subscribersByTypeLock. If the lock isn't immediately available, it means that another thread is registering a new listener or removing an existing one, and post waits until that operation finishes. Thanks to this mechanism, the dispatching process always sees a consistent set of subscribers. We avoid the situations where the register and post methods modify and read the subscribers at almost the same time. Without the lock, this could produce inconsistent results, such as a registered subscriber not receiving a dispatched event. If you want to know more about this mechanism, please consult the article about locks in Java.
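The read/write lock behaviour can be observed in isolation with a small JDK-only sketch. GuardedRegistry and GuardedRegistryDemo are hypothetical names and subscribers are reduced to strings. The registry follows the same pattern as the code above (write lock for modifications, read lock for lookups), and the probe shows what a second thread sees while a reader is active.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class GuardedRegistryDemo {
  // While this thread holds the read lock, another thread probes both locks.
  // Returns {readLockAvailable, writeLockAvailable} as seen by the other thread.
  static boolean[] probeWhileReading() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    boolean[] available = new boolean[2];
    lock.readLock().lock();
    try {
      Thread other = new Thread(() -> {
        available[0] = lock.readLock().tryLock();
        if (available[0]) {
          lock.readLock().unlock();
        }
        available[1] = lock.writeLock().tryLock();
        if (available[1]) {
          lock.writeLock().unlock();
        }
      });
      other.start();
      other.join(); // join makes the other thread's writes visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      lock.readLock().unlock();
    }
    return available;
  }

  public static void main(String[] args) {
    GuardedRegistry registry = new GuardedRegistry();
    registry.register("home");
    registry.register("school");
    System.out.println("subscribers=" + registry.snapshot());
    boolean[] available = probeWhileReading();
    System.out.println("read available=" + available[0] + ", write available=" + available[1]);
  }
}

// Hypothetical registry: writers are exclusive, readers may overlap with each other.
final class GuardedRegistry {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<String> subscribers = new ArrayList<>();

  void register(String name) {
    lock.writeLock().lock();
    try {
      subscribers.add(name);
    } finally {
      lock.writeLock().unlock();
    }
  }

  boolean unregister(String name) {
    lock.writeLock().lock();
    try {
      return subscribers.remove(name);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Copies the list under the read lock so callers never see a half-applied change.
  List<String> snapshot() {
    lock.readLock().lock();
    try {
      return new ArrayList<>(subscribers);
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Several posting threads can therefore read the subscribers in parallel, while a registration has to wait until no reader is active.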

Example of synchronous event bus

In the first part of the article we saw a sample implementation of Java's classic event-listener system. Now we'll see how to represent the same situation with Google Guava's event bus:

public class EventBusTest {
        
  @Test
  public void eventBus() {
    EventBus eventBus = new EventBus();

    /**
     * register() adds a listener to the bus; unregister() would remove it. 
     */
    EventBusListener homeListener = new EventBusHomeListener();
    EventBusListener schoolListener = new EventBusSchoolListener();
    eventBus.register(homeListener);
    eventBus.register(schoolListener);

    PupilBus pupil1 = new PupilBus(eventBus);
    PupilBus pupil2 = new PupilBus(eventBus);
    pupil1.enter();
    pupil2.enter();
    pupil2.goOut();
    pupil1.goOut();
    
    /**
     * We check the number of entries and exits to be sure that @Subscribe methods 
     * are invoked only for the event type declared in their signature (so each onEnter method 
     * is called only when eventBus.post(new PupilEnterBusEvent()) is executed, i.e. 2 times, 
     * because there are 2 PupilBus objects).
     */
    assertTrue("Home should see 2 entries but it sees only "+homeListener.getEntries(), 
      homeListener.getEntries() == 2);
    assertTrue("Home should see 2 outs but it sees only "+homeListener.getOuts(), 
      homeListener.getOuts() == 2);
    assertTrue("School should see 2 entries but it sees only "+schoolListener.getEntries(), 
      schoolListener.getEntries() == 2);
    assertTrue("School should see 2 outs but it sees only "+schoolListener.getOuts(), 
      schoolListener.getOuts() == 2);
  } 
}

abstract class EventBusListener {
        
  protected int entries;
  protected int outs;
  
  protected void incrementEntries() {
    this.entries++;
  }
  
  protected void incrementOuts() {
    this.outs++;
  }
  
  public int getEntries() {
    return this.entries;
  }
  
  public int getOuts() {
    return this.outs;
  }
  
  public abstract void onEnter(PupilEnterBusEvent event);
  public abstract void onLeave(PupilLeaveBusEvent event);
}

class EventBusHomeListener extends EventBusListener {

  /**
   * This method must be public and take only one argument: the listened event.
   * 
   * @param event Event sent by the event bus.
   */
  @Subscribe
  @Override
  public void onEnter(PupilEnterBusEvent event) {
    System.out.println("Pupil enters to the bus !"+event);
    this.incrementEntries();
  }

  @Subscribe
  @Override
  public void onLeave(PupilLeaveBusEvent event) {
    System.out.println("Pupil goes out from bus !");
    this.incrementOuts();
  } 
}

class EventBusSchoolListener extends EventBusListener {
  @Subscribe
  @Override
  public void onEnter(PupilEnterBusEvent event) {
    System.out.println("Pupil enters to the bus !"+event);
    this.incrementEntries();
  }

  @Subscribe
  @Override
  public void onLeave(PupilLeaveBusEvent event) {
    System.out.println("Pupil goes out from bus !");
    this.incrementOuts();
  } 
}

class PupilBus {
        
  private EventBus eventBus;
  
  public PupilBus(EventBus eventBus) {
    this.eventBus = eventBus;
  }

  /**
   * We simply need to "post" an event. The event bus does the rest, i.e. it dispatches the 
   * submitted event to the appropriate listeners. Note that the pupil doesn't know which objects 
   * are listening for the posted event (here PupilEnterBusEvent). It only knows that some object 
   * (the EventBus in this case) will do something with the posted event.
   * 
   * So we no longer need to implement a container object (like Bus in the previous sample) which 
   * holds all available listeners and dispatches a given event to the corresponding listeners.
   */
  public void enter() {
    this.eventBus.post(new PupilEnterBusEvent());
  }
  
  public void goOut() {
    this.eventBus.post(new PupilLeaveBusEvent());
  } 
}

class PupilEnterBusEvent { 
}

class PupilLeaveBusEvent { 
}

As you can see, we can focus only on the event and listener definitions. The rest of the job is done transparently by the EventBus instance. We no longer need to write the listener management (add, remove) or the event dispatching ourselves.

Example of asynchronous event bus

However, note that EventBus dispatches events serially. That's why event handling methods (such as onEnter or onLeave in our example) should be lightweight and shouldn't perform long operations. For heavier processing, you should use the asynchronous version of the event bus, represented by AsyncEventBus. It offers the same features as EventBus. The only difference is that AsyncEventBus dispatches events as new tasks submitted to a java.util.concurrent.Executor implementation:

private final Executor executor;

@Override
void dispatch(final Object event, final EventSubscriber subscriber) {
  checkNotNull(event);
  checkNotNull(subscriber);
  executor.execute(
    new Runnable() {
      @Override
      public void run() {
        AsyncEventBus.super.dispatch(event, subscriber);
      }
    });
}

We'll analyze that with the EventBus example from the previous part. We only replace the EventBus implementation and add a new field to check in which threads the dispatched events were handled:

@Test
public void eventBus() {
  /**
    * For illustration purposes only, we use a cached thread pool, which creates threads when needed and reuses idle ones.
    */
  EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
  // ... the code remains the same until pupil1.goOut(); after which we add the following lines
  try {
    Thread.sleep(2000);
  } catch (Exception e) {
    e.printStackTrace();
  }
  assertTrue("Home should see 2 entries but it sees only "+homeListener.getEntries(), 
    homeListener.getEntries() == 2);
  assertTrue("Home should see 2 outs but it sees only "+homeListener.getOuts(), 
    homeListener.getOuts() == 2);
  assertTrue("School should see 2 entries but it sees only "+schoolListener.getEntries(), 
    schoolListener.getEntries() == 2);
  assertTrue("School should see 2 outs but it sees only "+schoolListener.getOuts(), 
    schoolListener.getOuts() == 2);
  
  String currThreadName = Thread.currentThread().getName();
  for (String name : homeListener.getThreadNames()) {
    assertFalse("An AsyncEventBus event shouldn't be handled in the main thread", 
      name.equals(currThreadName));
  }
  for (String name : schoolListener.getThreadNames()) {
    assertFalse("An AsyncEventBus event shouldn't be handled in the main thread", 
      name.equals(currThreadName));
  }
}  

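// additionally, we modify the listeners to be able to check thread names in the test case
abstract class EventBusListener {
  // handlers may run on several pool threads at the same time, so the list must be thread-safe
  protected List<String> threadNames = Collections.synchronizedList(new ArrayList<String>());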
  protected int entries;
  protected int outs;
  
  protected void incrementEntries() {
    this.entries++;
  }
  
  protected void incrementOuts() {
    this.outs++;
  }
  
  public int getEntries() {
    return this.entries;
  }
  
  public int getOuts() {
    return this.outs;
  }
  
  public List<String> getThreadNames() {
    return this.threadNames;
  }
  
  public abstract void onEnter(PupilEnterBusEvent event);
  public abstract void onLeave(PupilLeaveBusEvent event);
}

class EventBusHomeListener extends EventBusListener {

  /**
    * This method must be public and take only one argument: the listened event.
    * 
    * @param event Event sent by the event bus.
    */
  @Subscribe
  @Override
  public void onEnter(PupilEnterBusEvent event) {
    System.out.println("Pupil enters to the bus !");
    this.incrementEntries();
    this.threadNames.add(Thread.currentThread().getName());
  }

  @Subscribe
  @Override
  public void onLeave(PupilLeaveBusEvent event) {
    System.out.println("Pupil goes out from bus !");
    this.incrementOuts();
    this.threadNames.add(Thread.currentThread().getName());
  } 
}


class EventBusSchoolListener extends EventBusListener {
  @Subscribe
  @Override
  public void onEnter(PupilEnterBusEvent event) {
    System.out.println("Pupil enters to the bus !");
    this.incrementEntries();
    this.threadNames.add(Thread.currentThread().getName());
  }

  @Subscribe
  @Override
  public void onLeave(PupilLeaveBusEvent event) {
    System.out.println("Pupil goes out from bus !");
    this.incrementOuts();
    this.threadNames.add(Thread.currentThread().getName());
  } 
}

This test, exactly like the previous one, should pass. Events are handled in threads created by the cached thread pool passed to the AsyncEventBus constructor.
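The test above waits with Thread.sleep(2000), which is either too long or, on a slow machine, possibly too short. One alternative is to let the handlers signal their completion through a CountDownLatch. The following JDK-only sketch uses plain Runnable tasks as stand-ins for the handlers dispatched by AsyncEventBus; LatchWaitDemo and LatchResult are hypothetical names and the 5 second timeout is an arbitrary choice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchWaitDemo {
  // Submits one task per event and waits until all of them have signalled completion.
  static LatchResult run(int events) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CountDownLatch done = new CountDownLatch(events);
    AtomicInteger handled = new AtomicInteger();
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    try {
      for (int i = 0; i < events; i++) {
        executor.execute(() -> {
          handled.incrementAndGet(); // thread-safe counter instead of a plain int
          threadNames.add(Thread.currentThread().getName());
          done.countDown(); // tells the waiting thread that this task is finished
        });
      }
      // Returns as soon as every task has counted down, or false after the timeout.
      boolean finished = done.await(5, TimeUnit.SECONDS);
      boolean onCaller = threadNames.contains(Thread.currentThread().getName());
      return new LatchResult(finished, handled.get(), onCaller);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    LatchResult result = run(4);
    System.out.println("finished=" + result.finished + ", handled=" + result.handled
        + ", ranOnCallerThread=" + result.ranOnCallerThread);
  }
}

// Hypothetical value holder for the outcome of one run.
final class LatchResult {
  final boolean finished;
  final int handled;
  final boolean ranOnCallerThread;

  LatchResult(boolean finished, int handled, boolean ranOnCallerThread) {
    this.finished = finished;
    this.handled = handled;
    this.ranOnCallerThread = ranOnCallerThread;
  }
}
```

In a real test, the latch would be counted down inside the @Subscribe methods and awaited before the assertions.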

In this article we discovered the concept of the event bus, a publish-subscribe system which can be a real alternative to Java's classic event-listener scheme. It's a real alternative because we don't need to implement features such as listener management or event dispatching ourselves. Instead, we only need to initialize an EventBus instance (synchronous or asynchronous, depending on the work done by the listeners), annotate some methods with @Subscribe, register the listeners and trigger the dispatch through the EventBus post method. The examples showed that Google Guava's event bus is simpler to implement than Java's standard event-listener mechanism.

