Network latency can sometimes slow down the communication between Apache ZooKeeper and its clients. This is one of the reasons to use asynchronous operations when manipulating zNodes.
This article describes some of the non-blocking operations available in the Apache ZooKeeper Java API. The first part covers some of the under-the-hood techniques used to handle asynchronous requests. The second part shows, through test cases, how these asynchronous operations can be used.
Asynchronous calls in Apache ZooKeeper Java API
In the Java API, asynchronous methods can be recognized by their extra parameters: they take the same arguments as their synchronous counterparts, plus a callback and a context object that is passed back to the callback unchanged. For example, the synchronous method creating a zNode is create(String, byte[], List<ACL>, CreateMode), while its asynchronous version is create(String, byte[], List<ACL>, CreateMode, StringCallback, Object).
The main difference between the two is that asynchronous methods return nothing (void). Under the hood, the operation is put in a queue where it waits to be executed, so there is no result available when the method returns. One way to retrieve the result is to use callbacks. Since the synchronous create method returns the path of the created zNode, its asynchronous version accepts an implementation of org.apache.zookeeper.AsyncCallback.StringCallback that receives this path. StringCallback is not the only callback available; each of the others is adapted to the executed operation: StatCallback for operations returning zNode stats (such as exists() or setData()), DataCallback when retrieving data, ACLCallback when reading ACLs, ChildrenCallback when listing a zNode's children, and MultiCallback for bulk operations executed with multi(). All of them extend the AsyncCallback interface.
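The overload pattern described above can be illustrated with a small, self-contained toy. ToyStore, its StringCallback interface and the OK/NODE_EXISTS constants are hypothetical stand-ins, not ZooKeeper classes; only the callback parameter order (rc, path, ctx, name) and the two result-code values are borrowed from ZooKeeper's AsyncCallback.StringCallback and KeeperException.Code. A single-thread executor plays the role of the request queue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory store mirroring the shape of ZooKeeper's create() overloads.
public class ToyStore {

    public static final int OK = 0;             // same value as KeeperException.Code.OK
    public static final int NODE_EXISTS = -110; // same value as KeeperException.Code.NODEEXISTS

    // Same parameter order as AsyncCallback.StringCallback.processResult
    public interface StringCallback {
        void processResult(int rc, String path, Object ctx, String name);
    }

    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
    // One worker thread: queued requests run one by one, in submission order
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Synchronous flavour: blocks the caller and returns the created path
    public String create(String path, byte[] data) {
        if (nodes.putIfAbsent(path, data) != null) {
            throw new IllegalStateException("Node already exists: " + path);
        }
        return path;
    }

    // Asynchronous flavour: same arguments plus a callback and a context object.
    // Returns void because the request is only queued here.
    public void create(String path, byte[] data, StringCallback callback, Object ctx) {
        worker.execute(() -> {
            boolean created = nodes.putIfAbsent(path, data) == null;
            // The context object is handed back to the callback untouched
            callback.processResult(created ? OK : NODE_EXISTS, path, ctx, created ? path : null);
        });
    }

    // Lets already queued requests finish, then stops the worker
    public void close() {
        worker.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        ToyStore store = new ToyStore();
        System.out.println(store.create("/sync", new byte[0])); // prints /sync
        CountDownLatch latch = new CountDownLatch(1);
        store.create("/async", new byte[0], (rc, path, ctx, name) -> {
            System.out.println("rc=" + rc + " ctx=" + ctx + " name=" + name);
            latch.countDown();
        }, "my-context");
        latch.await(5, TimeUnit.SECONDS);
        store.close();
    }
}
```

Because the worker executes requests in order, a second asynchronous create of the same path is reported through the callback with NODE_EXISTS rather than thrown as an exception; the context object is what lets a caller sharing one callback tell such results apart.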
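While being queued, the request is wrapped in an instance of the ClientCnxn.Packet class, which holds all the request information (callback, headers, context and so on). The queued packet is then sent to the server by the sending thread, represented by the org.apache.zookeeper.ClientCnxn.SendThread class. This thread is also responsible for sending heartbeats. Once the response comes back, the callback is not executed by the sending thread but by a separate event thread (ClientCnxn.EventThread), which also delivers watcher notifications.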
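The pipeline described above can be approximated with a deliberately simplified, hypothetical model. ToyConnection, its Packet class and the Function standing in for the server are assumptions made for illustration, not ZooKeeper's ClientCnxn code: the real client writes packets to a socket, manages the session and heartbeats, and is considerably more involved. The sketch only shows the three roles: a Packet carries the request together with its callback and context, a send thread drains the outgoing queue, and a separate event thread invokes the callbacks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical, simplified model of an async client pipeline; NOT ZooKeeper's ClientCnxn.
public class ToyConnection {

    public static final int OK = 0;

    // Callback shaped like a ZooKeeper result callback: (rc, path, ctx, result)
    public interface Callback {
        void processResult(int rc, String path, Object ctx, String response);
    }

    // Bundles everything about one request, in the spirit of ClientCnxn.Packet
    static final class Packet {
        final String path;
        final Callback callback;
        final Object ctx;
        String response; // set by the send thread, read by the event thread

        Packet(String path, Callback callback, Object ctx) {
            this.path = path;
            this.callback = callback;
            this.ctx = ctx;
        }
    }

    // Poison pill used to stop both threads once earlier packets are processed
    private static final Packet STOP = new Packet(null, null, null);

    private final BlockingQueue<Packet> outgoing = new LinkedBlockingQueue<>();
    private final BlockingQueue<Packet> events = new LinkedBlockingQueue<>();
    private final Thread sendThread;
    private final Thread eventThread;

    public ToyConnection(Function<String, String> server) {
        sendThread = new Thread(() -> {
            try {
                while (true) {
                    Packet p = outgoing.take();
                    if (p != STOP) {
                        // Stands in for writing to the socket and reading the reply
                        p.response = server.apply(p.path);
                    }
                    events.put(p); // hand the answered packet (or STOP) to the event thread
                    if (p == STOP) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-send-thread");
        eventThread = new Thread(() -> {
            try {
                while (true) {
                    Packet p = events.take();
                    if (p == STOP) {
                        return;
                    }
                    // Callbacks run here, never on the caller's thread
                    p.callback.processResult(OK, p.path, p.ctx, p.response);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-event-thread");
        sendThread.start();
        eventThread.start();
    }

    // Asynchronous call: wrap the request in a Packet, enqueue it and return at once
    public void submit(String path, Callback callback, Object ctx) {
        outgoing.add(new Packet(path, callback, ctx));
    }

    // Waits until every queued packet has been answered, then stops both threads
    public void close() throws InterruptedException {
        outgoing.add(STOP);
        sendThread.join();
        eventThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        ToyConnection connection = new ToyConnection(path -> "data of " + path);
        connection.submit("/a", (rc, path, ctx, response) ->
                System.out.println(Thread.currentThread().getName() + ": rc=" + rc
                        + " ctx=" + ctx + " response=" + response), "ctx-a");
        System.out.println("submit() returned without waiting for the response");
        connection.close();
    }
}
```

Since both queues are FIFO and each is consumed by a single thread, callbacks fire in submission order, and the thread that submitted the request never runs them. This is why a caller that needs the result has to wait for the callback explicitly.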
Example of asynchronous calls in Apache ZooKeeper Java API
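Now that we know how it works under the hood, we can write some tests. They rely on a connected zooKeeper instance and on the NODE_1, NODE_2, NODE_3 and ALL_VERSIONS constants, all defined in the enclosing test class (not shown):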
```java
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

// zooKeeper, NODE_1, NODE_2, NODE_3 and ALL_VERSIONS are fields of the enclosing test class.

@Test
public void should_create_znode_asynchronously_and_call_callback() throws KeeperException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    String contextObject = "Context -> string";
    int[] code = new int[1];
    String[] contextArray = new String[1];
    // Asynchronous create: returns immediately, the StringCallback lambda
    // (rc, path, ctx, name) runs later, once the server has answered
    zooKeeper.create("/home_to_delete", "Home directory".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
        (resultCode, path, context, name) -> {
            code[0] = resultCode;
            contextArray[0] = String.valueOf(context);
            System.out.println("rc " + resultCode + " path " + path + " ctx " + context + " name " + name);
            latch.countDown();
        }, contextObject);
    // Fail fast if the callback was never invoked
    assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
    assertThat(code[0]).isEqualTo(KeeperException.Code.OK.intValue());
    assertThat(contextArray[0]).isEqualTo(contextObject);
}

@Test
public void should_call_bulk_operation_with_callback() throws KeeperException, InterruptedException {
    zooKeeper.create(NODE_2, "Content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.create(NODE_3, "Data set".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    Op createNode1 = Op.create(NODE_1, "content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    Op removeNode2 = Op.delete(NODE_2, ALL_VERSIONS);
    Op setDataNode3 = Op.setData(NODE_3, "Other node data".getBytes(), ALL_VERSIONS);
    List<OpResult> resultsFromCallback = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);
    // The three operations are executed as one transaction;
    // the MultiCallback receives one OpResult per operation
    zooKeeper.multi(Lists.newArrayList(createNode1, removeNode2, setDataNode3),
        (returnCode, path, context, opResults) -> {
            resultsFromCallback.addAll(opResults);
            latch.countDown();
        }, null);
    assertThat(latch.await(3, TimeUnit.SECONDS)).isTrue();
    assertThat(resultsFromCallback).hasSize(3);
    assertThat(resultsFromCallback).extracting("type")
        .containsOnly(ZooDefs.OpCode.create, ZooDefs.OpCode.delete, ZooDefs.OpCode.setData);
    assertThat(resultsFromCallback.stream()
            .map(result -> result.getClass().getCanonicalName())
            .collect(Collectors.toList()))
        .containsOnly(
            "org.apache.zookeeper.OpResult.CreateResult",
            "org.apache.zookeeper.OpResult.DeleteResult",
            "org.apache.zookeeper.OpResult.SetDataResult");
    assertThat(zooKeeper.exists(NODE_1, false)).isNotNull();
    assertThat(zooKeeper.exists(NODE_2, false)).isNull();
    assertThat(new String(zooKeeper.getData(NODE_3, false, new Stat()))).isEqualTo("Other node data");
}
```
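This article presented the asynchronous side of Apache ZooKeeper operations. The first part showed that asynchronous requests are put in a queue, where they wait to be sent to the server by a dedicated sending thread, and that their results are delivered through callbacks. The second part showed, through test cases, how to use these asynchronous operations in the Apache ZooKeeper Java API.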