Asynchronous operations in Apache ZooKeeper

Network latency can slow down the communication between Apache ZooKeeper and its clients. This is one of the reasons to use asynchronous operations when manipulating zNodes.

This article describes some of the non-blocking operations in the Apache ZooKeeper Java API. The first part covers the under-the-hood techniques used to handle asynchronous requests. The second part shows, through test cases, how these asynchronous operations can be used.

Asynchronous calls in Apache ZooKeeper Java API

In the Java API, an asynchronous call can be recognized by the callback parameter added to the signature of its synchronous counterpart. For example, the synchronous method that creates a zNode is defined as create(String, byte[], List<ACL>, CreateMode), and the corresponding asynchronous version is create(String, byte[], List<ACL>, CreateMode, StringCallback, Object). The final Object parameter is a context object that is handed back to the callback unchanged.

The difference between synchronous and asynchronous calls is that the asynchronous ones return nothing. Under the hood, the operation is put in a queue and waits to be executed. One way to retrieve the result of the operation is to use a callback. Because the synchronous zNode creation method returns the path of the created zNode, its asynchronous version accepts an implementation of org.apache.zookeeper.AsyncCallback.StringCallback that receives this path. StringCallback is not the only callback available. There are others, each adapted to the asynchronous action being executed: StatCallback for getting zNode stats, DataCallback for retrieving data, ACLCallback for ACL-related operations, ChildrenCallback for listing the children of a zNode, and MultiCallback for bulk operations. All of them extend the AsyncCallback interface.
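Since an asynchronous call returns nothing, the caller often wants a handle it can wait on later. The sketch below shows one possible way to bridge a callback into a CompletableFuture. It does not use the ZooKeeper library: PathCallback, FakeAsyncClient and CallbackToFuture are hypothetical names introduced only for illustration, and PathCallback merely mirrors the four-argument shape of StringCallback. The return code 0 is assumed to mean success, as KeeperException.Code.OK does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in with the same argument shape as StringCallback.
interface PathCallback {
  void processResult(int rc, String path, Object ctx, String name);
}

// Hypothetical client: NOT the ZooKeeper class, it only imitates the calling convention.
class FakeAsyncClient {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  // Asynchronous style: returns nothing, the outcome arrives through the callback.
  void create(String path, byte[] data, PathCallback callback, Object ctx) {
    // Pretend the server accepted the request: rc 0, created name equal to the path.
    worker.submit(() -> callback.processResult(0, path, ctx, path));
  }

  void close() {
    worker.shutdown();
  }
}

class CallbackToFuture {
  // Adapts the callback style to a future the caller can wait on or chain.
  static CompletableFuture<String> createAsync(FakeAsyncClient client, String path, byte[] data) {
    CompletableFuture<String> created = new CompletableFuture<>();
    client.create(path, data, (rc, requestedPath, ctx, name) -> {
      if (rc == 0) {
        created.complete(name);
      } else {
        created.completeExceptionally(
            new IllegalStateException("rc=" + rc + " for " + requestedPath));
      }
    }, null);
    return created;
  }

  // Small helper for the demo: creates one node and blocks up to 5 seconds for the name.
  static String createAndWait(String path) {
    FakeAsyncClient client = new FakeAsyncClient();
    try {
      return createAsync(client, path, new byte[0]).get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    } finally {
      client.close();
    }
  }
}
```

With the real API, the same lambda body could presumably be passed as the StringCallback argument of the asynchronous create method, with the real return code compared against KeeperException.Code.OK.intValue().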

While being queued, the request is transformed into an instance of the Packet class, which contains all the request information (callback, headers, context and so on). Once queued, this instance is sent to the server by the sending thread, represented by the org.apache.zookeeper.ClientCnxn.SendThread class. This class is also responsible for generating heartbeats.
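The queueing described above can be pictured with a deliberately simplified model. The sketch below is not the ClientCnxn code: QueueSketch, its nested Packet and ResultCallback are hypothetical names, there is no network I/O, and the callback is invoked directly from the sending thread for brevity, whereas the real client hands completed packets to a separate event thread. It only illustrates the idea that the caller enqueues a packet and returns immediately while a single background thread drains the queue in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueSketch {
  // Hypothetical callback shape, loosely inspired by the AsyncCallback family.
  interface ResultCallback {
    void processResult(int rc, String path, Object ctx);
  }

  // Simplified stand-in for a packet: the request plus what is needed to answer it.
  static final class Packet {
    final String path;
    final ResultCallback callback;
    final Object ctx;

    Packet(String path, ResultCallback callback, Object ctx) {
      this.path = path;
      this.callback = callback;
      this.ctx = ctx;
    }
  }

  private final BlockingQueue<Packet> outgoing = new LinkedBlockingQueue<>();
  private final Thread sender;

  QueueSketch() {
    sender = new Thread(this::drain, "sketch-send-thread");
    sender.setDaemon(true);
    sender.start();
  }

  // The caller only enqueues the packet and returns immediately.
  void submit(String path, ResultCallback callback, Object ctx) {
    outgoing.add(new Packet(path, callback, ctx));
  }

  // The single sending thread takes packets in FIFO order.
  private void drain() {
    try {
      while (true) {
        Packet packet = outgoing.take();
        // A real client would write the request to the socket and wait for the reply here.
        packet.callback.processResult(0, packet.path, packet.ctx);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // stop() was called, leave the loop
    }
  }

  void stop() {
    sender.interrupt();
  }

  // Demo: submits the given paths and returns them in the order the callbacks fired.
  static List<String> demo(String... paths) {
    QueueSketch client = new QueueSketch();
    List<String> completed = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch latch = new CountDownLatch(paths.length);
    for (String path : paths) {
      client.submit(path, (rc, p, ctx) -> {
        completed.add(p);
        latch.countDown();
      }, null);
    }
    try {
      latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      client.stop();
    }
    return new ArrayList<>(completed);
  }
}
```

Because one thread feeds a FIFO queue and one thread drains it, the callbacks in this sketch fire in submission order.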

Example of asynchronous calls in Apache ZooKeeper Java API

Now that we know how it works, we can write some tests:

@Test
public void should_create_znode_asynchronously_and_call_callback() throws KeeperException, InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  String contextObject = "Context -> string";
  int[] code = new int[1];
  String[] contextArray = new String[1];
  zooKeeper.create("/home_to_delete", "Home directory".getBytes(), 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
    (resultCode, path, context, name) -> {
      code[0] = resultCode;
      contextArray[0] = ""+context;
      System.out.println("rc "+resultCode + " path "+path + " ctx "+context + " name "+name);
      latch.countDown();
    }, contextObject);
  // Fail explicitly if the callback was not invoked within the timeout
  assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();

  assertThat(code[0]).isEqualTo(KeeperException.Code.OK.intValue());
  assertThat(contextArray[0]).isEqualTo(contextObject);
}

@Test
public void should_call_bulk_operation_with_callback() throws KeeperException, InterruptedException {
  zooKeeper.create(NODE_2, "Content".getBytes(), 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  zooKeeper.create(NODE_3, "Data set".getBytes(), 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  Op createNode1 = Op.create(NODE_1, "content".getBytes(), 
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  Op removeNode2 = Op.delete(NODE_2, ALL_VERSIONS);
  Op setDataNode3 = Op.setData(NODE_3, "Other node data".getBytes(), ALL_VERSIONS);

  List<OpResult> resultsFromCallback = new ArrayList<>();
  CountDownLatch latch = new CountDownLatch(1);
  zooKeeper.multi(Lists.newArrayList(createNode1, removeNode2, setDataNode3),
  (returnCode, path, context, opResults) -> {
      resultsFromCallback.addAll(opResults);
      latch.countDown();
  }, null);

  // Fail explicitly if the callback was not invoked within the timeout
  assertThat(latch.await(3, TimeUnit.SECONDS)).isTrue();

  assertThat(resultsFromCallback).hasSize(3);
  assertThat(resultsFromCallback).extracting("type")
    .containsOnly(ZooDefs.OpCode.create, ZooDefs.OpCode.delete, ZooDefs.OpCode.setData);
  assertThat(resultsFromCallback.stream().map(result -> result.getClass().getCanonicalName()).collect(Collectors.toList())).containsOnly(
    "org.apache.zookeeper.OpResult.CreateResult", "org.apache.zookeeper.OpResult.DeleteResult",
    "org.apache.zookeeper.OpResult.SetDataResult"
  );
  assertThat(zooKeeper.exists(NODE_1, false)).isNotNull();
  assertThat(zooKeeper.exists(NODE_2, false)).isNull();
  assertThat(new String(zooKeeper.getData(NODE_3, false, new Stat()))).isEqualTo("Other node data");
}

This article presented the asynchronous side of Apache ZooKeeper operations. As we saw, requests are put in a queue and wait to be sent to the server by a dedicated sending thread. The second part showed, through test cases, how asynchronous operations are used in the Apache ZooKeeper Java API.
