Many programming tools take an event-driven approach. Apache ZooKeeper is no exception, thanks to its system of watchers.

This article shows some ways to listen to what happens inside ZooKeeper. The first part covers the theory. The second part shows watchers in practice with the Java API.
Apache ZooKeeper watchers
Listeners in Apache ZooKeeper are called watchers. A watcher is a one-time trigger that fires when the watched zNode is created, changed or removed. One-time means that a watcher is called only once: after delivering a notification it is discarded, and it must be registered again to receive further events. Events are delivered asynchronously.
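To make the one-time behavior concrete, here is a minimal in-memory sketch. It is a toy model, not the ZooKeeper client API: the OneTimeWatchRegistry class, its watch and fire methods, and the "/node_1" path are all hypothetical names chosen for illustration. The only point it demonstrates is that a watch is consumed by the first event it sees.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of one-time watches. NOT the ZooKeeper client API.
final class OneTimeWatchRegistry {
    // Pending watchers per path; an entry disappears as soon as it fires.
    private final Map<String, List<Consumer<String>>> pending = new HashMap<>();

    // Registers a callback that will receive at most one event for the path.
    void watch(String path, Consumer<String> callback) {
        pending.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Simulates a change: removes the watchers first, then notifies them.
    // Returns how many callbacks were invoked.
    int fire(String path, String eventType) {
        List<Consumer<String>> toNotify = pending.remove(path);
        if (toNotify == null) {
            return 0;
        }
        for (Consumer<String> callback : toNotify) {
            callback.accept(eventType);
        }
        return toNotify.size();
    }
}

final class OneTimeWatchDemo {
    // Replays create -> delete -> create with a single registration.
    static List<String> run() {
        OneTimeWatchRegistry registry = new OneTimeWatchRegistry();
        List<String> received = new ArrayList<>();
        registry.watch("/node_1", received::add);
        registry.fire("/node_1", "NodeCreated"); // delivered
        registry.fire("/node_1", "NodeDeleted"); // ignored: the watch was already consumed
        registry.fire("/node_1", "NodeCreated"); // ignored as well
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model, a callback that wants to keep listening has to call watch again. With the real client the equivalent is to call the registering method (for instance exists or getData) once more with a watcher.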
We can distinguish several types of watchers. The first type applies to the data held by a given zNode. It is registered through data-related methods such as getData(String, Watcher, Stat) or exists(String, Watcher). The second type applies to the children of a zNode and is registered by calling getChildren(String, Watcher). There is also a third kind, the connection watcher, which is registered when the connection is initialized.
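The relation between the registering call and the events it can receive can be summarized as a small lookup table. The sketch below uses stand-in enums (SketchEventType, RegisteringCall) and a hypothetical WatchTriggers helper rather than the ZooKeeper classes themselves. The mapping follows the watch semantics described in the ZooKeeper programmer's guide, so treat it as a memo, not as library code.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Stand-in for org.apache.zookeeper.Watcher.Event.EventType (illustration only).
enum SketchEventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

// The three client calls that can register a watcher on a path.
enum RegisteringCall { EXISTS, GET_DATA, GET_CHILDREN }

final class WatchTriggers {
    private static final Map<RegisteringCall, Set<SketchEventType>> TRIGGERS =
            new EnumMap<>(RegisteringCall.class);

    static {
        // exists() can be called on a missing node, so it is the only one that sees creation.
        TRIGGERS.put(RegisteringCall.EXISTS, EnumSet.of(
                SketchEventType.NodeCreated, SketchEventType.NodeDeleted, SketchEventType.NodeDataChanged));
        // getData() watches the data of an existing node.
        TRIGGERS.put(RegisteringCall.GET_DATA, EnumSet.of(
                SketchEventType.NodeDeleted, SketchEventType.NodeDataChanged));
        // getChildren() watches the child list; deleting the parent also fires it.
        TRIGGERS.put(RegisteringCall.GET_CHILDREN, EnumSet.of(
                SketchEventType.NodeChildrenChanged, SketchEventType.NodeDeleted));
    }

    // True when a watcher registered through the given call can receive the event type.
    static boolean fires(RegisteringCall call, SketchEventType type) {
        return TRIGGERS.get(call).contains(type);
    }

    public static void main(String[] args) {
        for (RegisteringCall call : RegisteringCall.values()) {
            System.out.println(call + " -> " + TRIGGERS.get(call));
        }
    }
}
```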
In the Java API, watchers are represented by the org.apache.zookeeper.Watcher interface, which has only one method to implement: process(WatchedEvent). It receives an event composed of 3 fields: the state of the ZooKeeper connection, the type of the event (delete, create, etc.) and the path concerned by the event.
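A typical process implementation branches on these three fields. The sketch below mirrors that shape with stand-in types so that it runs without the ZooKeeper library: SketchEvent, SketchWatcher and LoggingSketchWatcher are hypothetical names, and plain strings replace the real enums. It assumes the real client's convention that connection events arrive with the event type None and no path.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for WatchedEvent: the same three pieces of information, as plain strings.
final class SketchEvent {
    final String state; // connection state, e.g. "SyncConnected"
    final String type;  // event type, e.g. "NodeDeleted", or "None" for connection events
    final String path;  // concerned zNode path, null for connection events

    SketchEvent(String state, String type, String path) {
        this.state = state;
        this.type = type;
        this.path = path;
    }
}

// Stand-in for the Watcher interface: a single method to implement.
interface SketchWatcher {
    void process(SketchEvent event);
}

// Records a readable line per event, separating connection events from node events.
final class LoggingSketchWatcher implements SketchWatcher {
    final List<String> log = new ArrayList<>();

    @Override
    public void process(SketchEvent event) {
        if ("None".equals(event.type)) {
            // No node is involved: only the connection state changed.
            log.add("connection state changed to " + event.state);
        } else {
            log.add(event.type + " on " + event.path);
        }
    }

    public static void main(String[] args) {
        LoggingSketchWatcher watcher = new LoggingSketchWatcher();
        watcher.process(new SketchEvent("SyncConnected", "None", null));
        watcher.process(new SketchEvent("SyncConnected", "NodeDeleted", "/node_1"));
        watcher.log.forEach(System.out::println);
    }
}
```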
Example of watchers in Apache ZooKeeper Java API
Now that we have covered watchers, it's time to use them in JUnit tests:
// zooKeeper, NODE_1, PARENT_NODE, CHILD_NODE, DIFFERENT_THREAD_NODE, ALL_VERSIONS and
// CONNECTION_TIMEOUT are referenced here but defined elsewhere in the test class (not shown).

@Test
public void should_prove_that_watches_are_one_time_events() throws KeeperException, InterruptedException {
    int[] calledEvents = new int[] {0};
    Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
    // The latch expects 2 events on purpose: only 1 arrives, so await() times out after 3 seconds
    CountDownLatch latch = new CountDownLatch(2);
    zooKeeper.exists(NODE_1, new CountingWatcher(calledEvents, types, latch));

    zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.delete(NODE_1, ALL_VERSIONS);
    zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    latch.await(3, TimeUnit.SECONDS);

    // Only the first operation (creation) was notified
    assertThat(calledEvents[0]).isEqualTo(1);
    assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeCreated);
}

@Test
public void should_trigger_watch_on_node_delete() throws KeeperException, InterruptedException {
    int[] calledEvents = new int[] {0};
    Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
    zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    CountDownLatch latch = new CountDownLatch(1);
    /**
     * The watcher registered by exists() is the most universal one. It applies to all
     * operations on the zNode itself: creation, removal or update. The only operations
     * it does not cover concern children: to watch child node changes, we must use
     * getChildren().
     */
    zooKeeper.exists(NODE_1, new CountingWatcher(calledEvents, types, latch));

    zooKeeper.delete(NODE_1, ALL_VERSIONS);
    latch.await(3, TimeUnit.SECONDS);

    assertThat(calledEvents[0]).isEqualTo(1);
    assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

@Test
public void should_trigger_watch_on_children() throws KeeperException, InterruptedException {
    int[] calledEvents = new int[] {0};
    CountDownLatch latch = new CountDownLatch(1);
    Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
    zooKeeper.create(PARENT_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.getChildren(PARENT_NODE, new CountingWatcher(calledEvents, types, latch));

    zooKeeper.create(CHILD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    latch.await(3, TimeUnit.SECONDS);

    assertThat(calledEvents[0]).isEqualTo(1);
    assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeChildrenChanged);
}

@Test
public void should_trigger_watch_on_different_session() throws KeeperException, InterruptedException {
    int[] calledEvents = new int[] {0};
    Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
    zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
        // A single try block: the watcher is registered only if the connection was created
        try {
            ZooKeeper newZooKeeper = new ZooKeeper("127.0.0.1:2181", CONNECTION_TIMEOUT,
                (event) -> System.out.println("Processing event " + event));
            // Busy-wait until the second session leaves the CONNECTING state
            while (newZooKeeper.getState() == ZooKeeper.States.CONNECTING) {
            }
            newZooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEvents, types, latch));
        } catch (IOException | KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    // Give the second session time to connect and register its watcher
    Thread.sleep(2000);

    zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
    latch.await(5, TimeUnit.SECONDS);

    assertThat(calledEvents[0]).isEqualTo(1);
    assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

@Test
public void should_trigger_two_watches() throws KeeperException, InterruptedException {
    int[] calledEvents = new int[] {0};
    Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
    int[] calledEventsSecond = new int[] {0};
    Watcher.Event.EventType[] typesSecond = new Watcher.Event.EventType[1];
    zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    // Two watchers share the latch, so wait for both notifications before asserting
    CountDownLatch latch = new CountDownLatch(2);
    zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEvents, types, latch));
    zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEventsSecond, typesSecond, latch));
    Thread.sleep(2000);

    zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
    latch.await(5, TimeUnit.SECONDS);

    assertThat(calledEvents[0]).isEqualTo(1);
    assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
    assertThat(calledEventsSecond[0]).isEqualTo(1);
    assertThat(typesSecond[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
}

// Counts the received events, remembers the last event type and releases the latch
private static final class CountingWatcher implements Watcher {

    private final int[] calledEvents;
    private final Event.EventType[] types;
    private final CountDownLatch latch;

    private CountingWatcher(int[] calledEvents, Event.EventType[] types, CountDownLatch latch) {
        this.calledEvents = calledEvents;
        this.types = types;
        this.latch = latch;
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Processing event " + event.getPath() + " of type " + event.getType());
        calledEvents[0] = calledEvents[0] + 1;
        types[0] = event.getType();
        latch.countDown();
    }
}
This article showed some basic uses of Apache ZooKeeper watchers. As the first part explained, watchers are simple, one-time listeners for operations on a zNode. The second part showed several scenarios in which they can be used.