Many programming tools take an event-driven approach. Apache ZooKeeper is no exception, thanks to its system of watchers.
This article shows some ways to listen to what happens inside ZooKeeper. The first part covers the theory. The second part shows how ZooKeeper listeners are used in practice through the Java API.
Apache ZooKeeper watchers
Listeners in Apache ZooKeeper are called watchers. A watcher is a one-time trigger that fires when the watched zNode changes or is removed. One-time means that each watcher is called only once: after delivering a notification, it is discarded, and the client must register a new watcher to receive further notifications. Events are delivered asynchronously.
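To make the one-time semantics concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not ZooKeeper's implementation: the `ToyWatchRegistry` class and its `watch` and `fire` methods are hypothetical names invented for this illustration. It only shows that a watch is removed before it is invoked, so later changes go unnoticed unless the watch is registered again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of one-time watches. NOT the ZooKeeper API: all names are hypothetical.
class ToyWatchRegistry {
    // Pending watches per path; each one fires at most once.
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Registers a one-time watch on the given path.
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulates a change on the path: removes the pending watches first, then notifies them.
    // Returns the number of watchers that were notified.
    int fire(String path, String eventType) {
        List<Consumer<String>> pending = watches.remove(path);
        if (pending == null) {
            return 0;
        }
        for (Consumer<String> watcher : pending) {
            watcher.accept(eventType);
        }
        return pending.size();
    }
}

class OneTimeWatchDemo {
    // Registers a single watch, then simulates create, delete, create.
    // Returns the events the watch actually received.
    static List<String> run() {
        ToyWatchRegistry registry = new ToyWatchRegistry();
        List<String> received = new ArrayList<>();
        registry.watch("/node_1", received::add);
        registry.fire("/node_1", "NodeCreated"); // consumed by the watch
        registry.fire("/node_1", "NodeDeleted"); // nobody is watching anymore
        registry.fire("/node_1", "NodeCreated"); // still nobody
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [NodeCreated]
    }
}
```

In this model, a watcher that wants continuous notifications would have to call `watch` again from inside its callback, which mirrors the re-registration a real client has to perform.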
We can distinguish several types of watchers. The first type applies to the data held by a given zNode. It can be registered through data-related methods, such as getData(String, Watcher, Stat) or exists(String, Watcher). The second type relates to the children of a zNode, that is, to zNodes that store other zNodes. This kind of watcher is registered when the getChildren(String, Watcher) method is called. There is also a third kind, the connection watcher. It is registered when the connection is initialized, by passing it to the ZooKeeper constructor.
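The relationship between the registering call and the events it can deliver can be summarized in a small lookup table. The sketch below encodes, in plain Java, the mapping as described in the ZooKeeper programmer's guide. The `WatchKinds` class and its enums are hypothetical helpers written for this illustration and are not part of the ZooKeeper API.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of ZooKeeper: which events each registering call
// can deliver for the node it was called on.
class WatchKinds {
    enum Call { EXISTS, GET_DATA, GET_CHILDREN }
    enum Event { NODE_CREATED, NODE_DELETED, NODE_DATA_CHANGED, NODE_CHILDREN_CHANGED }

    private static final Map<Call, Set<Event>> EVENTS = new EnumMap<>(Call.class);

    static {
        // exists() can be called on a node that does not exist yet,
        // so it is the only call able to report a creation.
        EVENTS.put(Call.EXISTS,
                EnumSet.of(Event.NODE_CREATED, Event.NODE_DELETED, Event.NODE_DATA_CHANGED));
        // getData() needs an existing node, so it reports only changes and removal.
        EVENTS.put(Call.GET_DATA,
                EnumSet.of(Event.NODE_DELETED, Event.NODE_DATA_CHANGED));
        // getChildren() reports changes in the list of children and removal of the node itself.
        EVENTS.put(Call.GET_CHILDREN,
                EnumSet.of(Event.NODE_DELETED, Event.NODE_CHILDREN_CHANGED));
    }

    // Returns true if a watch registered through the given call can deliver the given event.
    static boolean delivers(Call call, Event event) {
        return EVENTS.get(call).contains(event);
    }

    public static void main(String[] args) {
        for (Call call : Call.values()) {
            System.out.println(call + " -> " + EVENTS.get(call));
        }
    }
}
```

The table makes one practical point visible: a watch set through exists() never reports changes in the children of a node, so children must be watched separately through getChildren().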
In the Java API, watchers are represented by the org.apache.zookeeper.Watcher interface, which has only one method to implement: process(WatchedEvent). It receives an event composed of three fields: the state of the ZooKeeper connection, the type of the event (NodeCreated, NodeDeleted, etc.) and the path concerned by the event.
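A typical process implementation first separates connection-state notifications from node notifications. The sketch below uses stand-in types (`ToyEvent`, `ToyState`, `ToyType`) that only mimic the three fields of an event, so that it runs without a ZooKeeper dependency. They are assumptions made for illustration, not the real WatchedEvent class. The sketch relies on one behavior of the real client: connection-state events arrive with the event type None and without a path.

```java
// Stand-in types mimicking the three fields of an event.
// Hypothetical, NOT the ZooKeeper classes.
enum ToyState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }

enum ToyType { NONE, NODE_CREATED, NODE_DELETED, NODE_DATA_CHANGED, NODE_CHILDREN_CHANGED }

final class ToyEvent {
    final ToyState state;
    final ToyType type;
    final String path; // null for connection-state events

    ToyEvent(ToyState state, ToyType type, String path) {
        this.state = state;
        this.type = type;
        this.path = path;
    }
}

class ToyEventHandler {
    // Turns an event into a human-readable description, the way a process() body
    // would branch on the event type before acting on the path.
    static String describe(ToyEvent event) {
        if (event.type == ToyType.NONE) {
            // Connection-state change: only the state field is meaningful.
            return "connection state changed to " + event.state;
        }
        switch (event.type) {
            case NODE_CREATED:
                return "created " + event.path;
            case NODE_DELETED:
                return "deleted " + event.path;
            case NODE_DATA_CHANGED:
                return "data changed at " + event.path;
            case NODE_CHILDREN_CHANGED:
                return "children changed under " + event.path;
            default:
                return "unhandled event type " + event.type;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new ToyEvent(ToyState.DISCONNECTED, ToyType.NONE, null)));
        System.out.println(describe(new ToyEvent(ToyState.SYNC_CONNECTED, ToyType.NODE_DELETED, "/node_1")));
    }
}
```

Checking the type first keeps connection handling (reconnects, expired sessions) apart from the logic that reacts to zNode changes.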
Example of watchers in Apache ZooKeeper Java API
Now that we know what watchers are, it's time to use them in JUnit tests:
    // Fragment of a JUnit test class. The zooKeeper client and the constants NODE_1, PARENT_NODE,
    // CHILD_NODE, DIFFERENT_THREAD_NODE, ALL_VERSIONS and CONNECTION_TIMEOUT are defined in the
    // part of the class that is not shown here.
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.Test; // assumption: JUnit 4

    import static org.assertj.core.api.Assertions.assertThat; // assumption: AssertJ

    @Test
    public void should_prove_that_watches_are_one_time_events() throws KeeperException, InterruptedException {
        int[] calledEvents = new int[] {0};
        Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
        // The latch expects 2 events on purpose: only 1 arrives, so await() returns
        // after the 3-second timeout.
        CountDownLatch latch = new CountDownLatch(2);

        zooKeeper.exists(NODE_1, new CountingWatcher(calledEvents, types, latch));
        zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.delete(NODE_1, ALL_VERSIONS);
        zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        latch.await(3, TimeUnit.SECONDS);

        assertThat(calledEvents[0]).isEqualTo(1);
        assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeCreated);
    }

    @Test
    public void should_trigger_watch_on_node_delete() throws KeeperException, InterruptedException {
        int[] calledEvents = new int[] {0};
        Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
        zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        CountDownLatch latch = new CountDownLatch(1);

        /*
         * exists() registers the most universal watcher. It applies to all operations on
         * the zNode itself: creation, removal or update. The only operations it does not
         * cover concern children: to register a watcher for changes in children nodes,
         * we must use getChildren().
         */
        zooKeeper.exists(NODE_1, new CountingWatcher(calledEvents, types, latch));
        zooKeeper.delete(NODE_1, ALL_VERSIONS);
        latch.await(3, TimeUnit.SECONDS);

        assertThat(calledEvents[0]).isEqualTo(1);
        assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
    }

    @Test
    public void should_trigger_watch_on_children() throws KeeperException, InterruptedException {
        int[] calledEvents = new int[] {0};
        CountDownLatch latch = new CountDownLatch(1);
        Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
        zooKeeper.create(PARENT_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zooKeeper.getChildren(PARENT_NODE, new CountingWatcher(calledEvents, types, latch));
        zooKeeper.create(CHILD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        latch.await(3, TimeUnit.SECONDS);

        assertThat(calledEvents[0]).isEqualTo(1);
        assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeChildrenChanged);
    }

    @Test
    public void should_trigger_watch_on_different_session() throws KeeperException, InterruptedException {
        int[] calledEvents = new int[] {0, 0};
        Watcher.Event.EventType[] types = new Watcher.Event.EventType[2];
        zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        CountDownLatch latch = new CountDownLatch(1);

        // The watch is registered by a second client (a different session) in another thread.
        new Thread(() -> {
            try {
                ZooKeeper newZooKeeper = new ZooKeeper("127.0.0.1:2181", CONNECTION_TIMEOUT,
                    (event) -> System.out.println("Processing event " + event));
                // Busy-wait until the new session leaves the CONNECTING state.
                while (newZooKeeper.getState() == ZooKeeper.States.CONNECTING) {
                }
                newZooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEvents, types, latch));
            } catch (IOException | KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // Give the second client time to connect and register its watch.
        Thread.sleep(2000);
        zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
        latch.await(5, TimeUnit.SECONDS);

        assertThat(calledEvents[0]).isEqualTo(1);
        assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
    }

    @Test
    public void should_trigger_two_watches() throws KeeperException, InterruptedException {
        int[] calledEvents = new int[] {0};
        Watcher.Event.EventType[] types = new Watcher.Event.EventType[1];
        int[] calledEventsChildren = new int[] {0};
        Watcher.Event.EventType[] typesChildren = new Watcher.Event.EventType[1];
        zooKeeper.create(DIFFERENT_THREAD_NODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Both watchers share the latch, so it must wait for 2 notifications. With a count
        // of 1, the assertions could run before the second watcher has been called.
        CountDownLatch latch = new CountDownLatch(2);

        zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEvents, types, latch));
        zooKeeper.exists(DIFFERENT_THREAD_NODE, new CountingWatcher(calledEventsChildren, typesChildren, latch));
        Thread.sleep(2000);
        zooKeeper.delete(DIFFERENT_THREAD_NODE, ALL_VERSIONS);
        latch.await(5, TimeUnit.SECONDS);

        assertThat(calledEvents[0]).isEqualTo(1);
        assertThat(types[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
        assertThat(calledEventsChildren[0]).isEqualTo(1);
        assertThat(typesChildren[0]).isEqualTo(Watcher.Event.EventType.NodeDeleted);
    }

    // Test helper: counts the received events, remembers the last event type
    // and releases the latch once per event.
    private static final class CountingWatcher implements Watcher {

        private final int[] calledEvents;
        private final Event.EventType[] types;
        private final CountDownLatch latch;

        private CountingWatcher(int[] calledEvents, Event.EventType[] types, CountDownLatch latch) {
            this.calledEvents = calledEvents;
            this.types = types;
            this.latch = latch;
        }

        @Override
        public void process(WatchedEvent event) {
            System.out.println("Processing event " + event.getPath() + " of type " + event.getType());
            calledEvents[0] = calledEvents[0] + 1;
            types[0] = event.getType();
            latch.countDown();
        }
    }
This article showed some basic uses of Apache ZooKeeper watchers. As the first part explained, watchers are simple, one-time listeners for operations on a zNode. The second part walked through several scenarios in which watchers can be used: node creation and removal, changes in children, a watch registered from a different session, and two watches on the same node.