Watches in Apache ZooKeeper

Many programming tools implement an event-driven approach. Apache ZooKeeper is no exception to this rule, with its system of watchers.

This article shows some ways to listen to what happens inside ZooKeeper. The first part describes watchers in theory. The second part shows the real use of ZooKeeper listeners through the Java API.

Apache ZooKeeper watchers

Listeners in Apache ZooKeeper are called watchers. A watcher is a one-time trigger fired when the watched zNode changes or is removed. One-time means that the watcher is called only once: after delivering a notification it is removed, and the client must register it again to receive further events. Events are delivered asynchronously.
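
The one-time behavior can be illustrated with a small in-memory model. This is only a sketch: OneShotWatches and OneShotWatchesDemo are hypothetical names invented for the illustration and they are not part of the ZooKeeper client API. The model assumes that a change first removes the registered watchers and then notifies them, so a watcher that wants more events has to register itself again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of one-time watches; NOT the ZooKeeper client API.
final class OneShotWatches {
  // path -> watchers waiting for the next change of that path
  private final Map<String, List<Consumer<String>>> pending = new HashMap<>();

  // Registers a watcher that will receive at most one notification.
  void watch(String path, Consumer<String> watcher) {
    pending.computeIfAbsent(path, key -> new ArrayList<>()).add(watcher);
  }

  // Simulates a change: watchers are removed first, then notified.
  void change(String path) {
    List<Consumer<String>> toNotify = pending.remove(path);
    if (toNotify == null) {
      return;
    }
    for (Consumer<String> watcher : toNotify) {
      watcher.accept(path);
    }
  }
}

final class OneShotWatchesDemo {
  // Returns how many notifications each strategy receives for three changes.
  static int[] run() {
    OneShotWatches watches = new OneShotWatches();
    int[] counts = new int[] {0, 0};

    // Strategy 1: register once and never again.
    watches.watch("/once", path -> counts[0]++);

    // Strategy 2: register again from inside the callback.
    watches.watch("/again", new Consumer<String>() {
      @Override
      public void accept(String path) {
        counts[1]++;
        watches.watch(path, this);
      }
    });

    for (int i = 0; i < 3; i++) {
      watches.change("/once");
      watches.change("/again");
    }
    return counts;
  }

  public static void main(String[] args) {
    int[] counts = run();
    System.out.println("once=" + counts[0] + ", again=" + counts[1]);
  }
}
```

Running the demo prints once=1, again=3. With a real ZooKeeper server the re-registration is not atomic: a change that happens between the notification and the new registration is not reported, so the client usually reads the current state again when it re-registers.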

We can distinguish different types of watchers. The first type applies to the data held by a given zNode. It can be registered through data-related methods such as getData(String, Watcher, Stat) or exists(String, Watcher). On the other side, there are watchers related to the children of a zNode. This kind of watcher is registered when the getChildren(String, Watcher) method is called. There is also another watcher we can use, the connection watcher. It's registered when the connection is initialized, by passing it to the ZooKeeper constructor.
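
The relation between the registration method and the events it can receive can be summarized as a lookup table. The sketch below follows the rules described in the ZooKeeper programmer's guide. EventKind and WatchRegistration are hypothetical local enums written for this illustration; the real event constants live in Watcher.Event.EventType.

```java
import java.util.EnumSet;
import java.util.Set;

// Local stand-ins for illustration; the real constants are in Watcher.Event.EventType.
enum EventKind { NODE_CREATED, NODE_DELETED, NODE_DATA_CHANGED, NODE_CHILDREN_CHANGED }

// Which events a watch can receive, depending on the call that registered it.
enum WatchRegistration {
  // exists() works on missing zNodes too, so it is the only one notified of a creation.
  EXISTS(EnumSet.of(EventKind.NODE_CREATED, EventKind.NODE_DELETED, EventKind.NODE_DATA_CHANGED)),
  // getData() needs an existing zNode: it sees data changes and the deletion.
  GET_DATA(EnumSet.of(EventKind.NODE_DELETED, EventKind.NODE_DATA_CHANGED)),
  // getChildren() sees changes of the children list and the deletion of the zNode itself.
  GET_CHILDREN(EnumSet.of(EventKind.NODE_DELETED, EventKind.NODE_CHILDREN_CHANGED));

  private final Set<EventKind> triggers;

  WatchRegistration(Set<EventKind> triggers) {
    this.triggers = triggers;
  }

  boolean firesOn(EventKind kind) {
    return triggers.contains(kind);
  }
}

final class WatchTriggersDemo {
  public static void main(String[] args) {
    // Prints one line per registration method with the events it can receive.
    for (WatchRegistration registration : WatchRegistration.values()) {
      StringBuilder line = new StringBuilder(registration.name()).append(" ->");
      for (EventKind kind : EventKind.values()) {
        if (registration.firesOn(kind)) {
          line.append(' ').append(kind);
        }
      }
      System.out.println(line);
    }
  }
}
```

For instance, WatchRegistration.EXISTS.firesOn(EventKind.NODE_CHILDREN_CHANGED) returns false, which is why the tests below use getChildren() to observe the creation of a child zNode.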

In the Java API, watchers are represented by the org.apache.zookeeper.Watcher interface, which has only one method to implement: process(WatchedEvent). It receives an event composed of three fields: the state of the ZooKeeper connection, the type of the event (creation, deletion, etc.) and the path of the zNode concerned by the event.

Example of watchers in Apache ZooKeeper Java API

After discovering watchers, it's time to use them in JUnit tests:

@Test
public void should_prove_that_watches_are_one_time_events() throws KeeperException, InterruptedException {
  int[] calledEvents = new int[] {0};
  Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
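  // The latch expects 2 events on purpose: only the first create() is notified,
  // so await() below times out and leaves time for a second event that never comes.
  CountDownLatch latch = new CountDownLatch(2);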
  zooKeeper.exists(NODE_1, 
    new CountingWatcher(calledEvents, types, latch));

  zooKeeper.create(NODE_1, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  zooKeeper.delete(NODE_1, ALL_VERSIONS);
  zooKeeper.create(NODE_1, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  latch.await(3, TimeUnit.SECONDS);

  assertThat(calledEvents[0]).isEqualTo(1);
  assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeCreated);
}

@Test
public void should_trigger_watch_on_node_delete() throws KeeperException, InterruptedException {
  int[] calledEvents = new int[] {0};
  Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
  zooKeeper.create(NODE_1, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  CountDownLatch latch = new CountDownLatch(1);
  /*
   * exists() registers the most general watcher. It is triggered by the creation,
   * the removal or the data update of the zNode. The only operations not covered
   * are the ones on children: to watch the manipulation of child nodes, we must
   * register the watcher through getChildren().
   */
  zooKeeper.exists(NODE_1, 
    new CountingWatcher(calledEvents, types, latch));

  zooKeeper.delete(NODE_1, ALL_VERSIONS);
  latch.await(3, TimeUnit.SECONDS);

  assertThat(calledEvents[0]).isEqualTo(1);
  assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

@Test
public void should_trigger_watch_on_children() throws KeeperException, InterruptedException {
  int[] calledEvents = new int[] {0};
  CountDownLatch latch = new CountDownLatch(1);
  Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
  zooKeeper.create(PARENT_NODE, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  zooKeeper.getChildren(PARENT_NODE, 
    new CountingWatcher(calledEvents, types, latch));
  zooKeeper.create(CHILD_NODE, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  latch.await(3, TimeUnit.SECONDS);

  assertThat(calledEvents[0]).isEqualTo(1);
  assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeChildrenChanged);
}

@Test
public void should_trigger_watch_on_different_session() throws KeeperException, InterruptedException {
  int[] calledEvents = new int[] {0, 0};
  Watcher.Event.EventType[] types = new Watcher.Event.EventType[2];
  zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  CountDownLatch latch = new CountDownLatch(1);
  new Thread(() -> {
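    ZooKeeper newZooKeeper = null;
    try {
      // The connection watcher releases the latch once the session is established,
      // which avoids spinning on getState() in an empty loop.
      CountDownLatch connected = new CountDownLatch(1);
      newZooKeeper = new ZooKeeper("127.0.0.1:2181", CONNECTION_TIMEOUT,
        (event) -> {
          System.out.println("Processing event " + event);
          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            connected.countDown();
          }
        });
      connected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }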
    try {
      newZooKeeper.exists(DIFFERENT_THREAD_NODE, 
        new CountingWatcher(calledEvents, types, latch));
    } catch (KeeperException | InterruptedException e) {
      e.printStackTrace();
    }
  }).start();

  Thread.sleep(2000);
  zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
  latch.await(5, TimeUnit.SECONDS);

  assertThat(calledEvents[0]).isEqualTo(1);
  assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

@Test
public void should_trigger_two_watches() throws KeeperException, InterruptedException {
  int[] firstCalledEvents = new int[] {0};
  Watcher.Event.EventType[] firstTypes = new Watcher.Event.EventType[1];
  int[] secondCalledEvents = new int[] {0};
  Watcher.Event.EventType[] secondTypes = new Watcher.Event.EventType[1];
  zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  // Both watchers must be notified before the assertions run, hence a latch of 2.
  CountDownLatch latch = new CountDownLatch(2);
  zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(firstCalledEvents, firstTypes, latch));
  zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(secondCalledEvents, secondTypes, latch));

  Thread.sleep(2000);
  zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
  latch.await(5, TimeUnit.SECONDS);

  assertThat(firstCalledEvents[0]).isEqualTo(1);
  assertThat(firstTypes[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
  assertThat(secondCalledEvents[0]).isEqualTo(1);
  assertThat(secondTypes[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

private static final class CountingWatcher implements Watcher {

  private int[] calledEvents;
  private Event.EventType[] types;
  private CountDownLatch latch;

  private CountingWatcher(int[] calledEvents, Event.EventType[] types, CountDownLatch latch) {
    this.calledEvents = calledEvents;
    this.types = types;
    this.latch = latch;
  }

  @Override
  public void process(WatchedEvent event) {
    System.out.println("Processing event "+event.getPath()+" of type "+event.getType());
    calledEvents[0] = calledEvents[0]+1;
    types[0] = event.getType();
    latch.countDown();
  }
}

This article showed some basic uses of Apache ZooKeeper watchers. As the first part explained, watchers are quite simple, one-time listeners triggered by operations on a zNode. The second part showed several scenarios of their possible use.