A client connects to a ZooKeeper server and maintains a session. There are several things to know about ZooKeeper sessions, and we'll explore them in this article.
To begin, we'll focus on the aspects to take into account when dealing with sessions (timeout, initialization). The second part will show session management through the Java API.
Sessions in Apache ZooKeeper
By connecting to Apache ZooKeeper, the client initializes a session. Each session has its own id and password, and both can be used to reconnect. The client handles reconnection when the connection is lost, for example when the server reboots. Thanks to session restoration, the client keeps the same context as before the communication failure. As a consequence, its ephemeral zNodes remain accessible, provided the session has not expired in the meantime.
The connection goes through several states. Apart from reconnection, which is an exceptional workflow, every connection between client and server begins in the CONNECTING state. While the session is in this state, the client must wait before it can work on zNodes. When the connection succeeds, it moves to the CONNECTED state. A session can also expire (the Java client then reports KeeperException.Code.SESSIONEXPIRED) or lose its connection (KeeperException.Code.CONNECTIONLOSS).
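The tests later in this article call isConnected() and isAlive() on the client state. The sketch below is a simplified model of what these two predicates mean. It is a stand-in written for illustration, not the library's own enum: it keeps only three states, and CLOSED is an assumption added here to represent a session that has ended.

```java
/** Simplified stand-in for the client-side connection states; not the library's own enum. */
enum SessionState {
    CONNECTING, CONNECTED, CLOSED;

    /** The session can still be used or recovered: anything except CLOSED. */
    boolean isAlive() {
        return this != CLOSED;
    }

    /** Requests can be sent right now only in CONNECTED. */
    boolean isConnected() {
        return this == CONNECTED;
    }
}

class SessionStateDemo {
    public static void main(String[] args) {
        // Print both predicates for every state of the model
        for (SessionState state : SessionState.values()) {
            System.out.println(state + " alive=" + state.isAlive() + " connected=" + state.isConnected());
        }
    }
}
```

In this model CONNECTING is alive but not connected, which is why a client has to wait before touching zNodes.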
Each session has an associated timeout value. The server uses this value to check whether the client is still alive. In ZooKeeper vocabulary, the server tracks the client's heartbeats: when the client has no request to send, it sends a ping instead. If the server hears nothing from the client within the timeout, it considers the client dead and expires its session. Heartbeat activity can be observed in the server logs:
20:03:01,612 [myid:] - DEBUG [SyncThread:0:FinalRequestProcessor@88] - Processing request:: sessionid:0x153c7652a290004 type:exists cxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:/tmp_ephemeral
20:03:01,612 [myid:] - DEBUG [SyncThread:0:FinalRequestProcessor@160] - sessionid:0x153c7652a290004 type:exists cxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:/tmp_ephemeral
20:03:22,969 [myid:] - DEBUG [SyncThread:0:FinalRequestProcessor@88] - Processing request:: sessionid:0x153c7652a290004 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
20:03:22,969 [myid:] - DEBUG [SyncThread:0:FinalRequestProcessor@160] - sessionid:0x153c7652a290004 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
When a given client is no longer active, we can see how Apache ZooKeeper destroys its session:
[myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x153c75895da0000, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:230)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
    at java.lang.Thread.run(Thread.java:745)
[myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:37701 which had sessionid 0x153c75895da0000
[myid:] - INFO [SessionTracker:SessionTrackerImpl@198] - Session closing: 0x153c75895da0000
[myid:] - INFO [SessionTracker:ZooKeeperServer@355] - Expiring session 0x153c75895da0000, timeout of 5000ms exceeded
[myid:] - INFO [ProcessThread(sid:0 cport:2181)::SessionTrackerImpl@198] - Session closing: 0x153c75895da0000
[myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@489] - Processed session termination for sessionid: 0x153c75895da0000
[myid:] - DEBUG [SyncThread:0:FinalRequestProcessor@88] - Processing request:: sessionid:0x153c75895da0000 type:closeSession cxid:0x0 zxid:0x65d txntype:-11 reqpath:n/a
When a client's session is closed or expires, all its ephemeral zNodes are deleted. Watchers listening for events on them are notified about the deletion. These deletions can be observed in the logs in a format similar to:
[myid:] - DEBUG [main:DataTree@969] - Deleting ephemeral node /tmp_ephemeral for session 0x153acbd7a9e004c
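The chain described above (heartbeats refresh the session, silence beyond the timeout expires it, expiration deletes the ephemeral zNodes) can be sketched as a toy model. This is not ZooKeeper's SessionTrackerImpl: the class name, its methods and the in-memory maps are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of session expiration; a sketch, not ZooKeeper's SessionTrackerImpl. */
class ToySessionTracker {
    private final long timeoutMs;
    // sessionId -> time of the last request or ping received from the client
    private final Map<Long, Long> lastHeard = new HashMap<>();
    // sessionId -> ephemeral paths owned by that session
    private final Map<Long, List<String>> ephemerals = new HashMap<>();

    ToySessionTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Any request, including a ping, refreshes the session. */
    void touch(long sessionId, long nowMs) {
        lastHeard.put(sessionId, nowMs);
    }

    void createEphemeral(long sessionId, String path) {
        ephemerals.computeIfAbsent(sessionId, id -> new ArrayList<>()).add(path);
    }

    /** Expires silent sessions and returns the ephemeral paths deleted with them. */
    List<String> expire(long nowMs) {
        List<String> deleted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = lastHeard.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> session = it.next();
            if (nowMs - session.getValue() > timeoutMs) {
                List<String> paths = ephemerals.remove(session.getKey());
                if (paths != null) {
                    deleted.addAll(paths);
                }
                it.remove();
            }
        }
        return deleted;
    }
}
```

With a 5000 ms timeout, a session last heard at 4000 ms survives a check at 8000 ms and loses its ephemeral zNodes at a check made after 9000 ms.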
Apache ZooKeeper session example in Java API
To test session features, especially session recovery, we need to stop and start the server manually when the corresponding message is printed on the screen. Below, you can find some tests illustrating Apache ZooKeeper sessions:
// These methods belong to a JUnit test class that is assumed to declare a connected
// `zooKeeper` client and a CONNECTION_TIMEOUT constant, and to import
// org.apache.zookeeper.*, org.junit.Test and AssertJ's assertThat.

@Test
public void should_see_ephemeral_nodes_after_reconnecting() throws KeeperException, InterruptedException, IOException {
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPassword = zooKeeper.getSessionPasswd();
    String path = zooKeeper.create("/tmp_ephemeral", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    assertThat(zooKeeper.exists(path, false)).isNotNull();

    System.out.println("Stop ZooKeeper now");
    Thread.sleep(5000);
    System.out.println("Waiting for new connection....Start ZooKeeper now");
    Thread.sleep(5000);

    // Passing the previous session id and password restores the session instead of creating a new one
    ZooKeeper newZooKeeper = new ZooKeeper("127.0.0.1:2181", CONNECTION_TIMEOUT,
        (event) -> System.out.println("Processing event " + event), sessionId, sessionPassword);
    while (newZooKeeper.getState() != ZooKeeper.States.CONNECTED) {
        Thread.sleep(50); // poll instead of spinning on the CPU
    }

    assertThat(newZooKeeper.getSessionId()).isEqualTo(sessionId);
    assertThat(newZooKeeper.getSessionPasswd()).isEqualTo(sessionPassword);
    assertThat(newZooKeeper.getState().isConnected()).isTrue();
    assertThat(newZooKeeper.getState().isAlive()).isTrue();
    // A zNode that was never created is still absent...
    assertThat(newZooKeeper.exists("/sss", false)).isNull();
    // ...while the ephemeral zNode of the restored session is still there
    assertThat(newZooKeeper.exists(path, false)).isNotNull();
}

@Test
public void should_correctly_connect_to_zookeeper() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = null;
    try {
        zooKeeper = new ZooKeeper("127.0.0.1:2181", CONNECTION_TIMEOUT,
            (event) -> System.out.println("Processing event " + event));
        // We must wait some time because the connection process is asynchronous, i.e. the ZooKeeper
        // instance is returned before the connection is really established.
        while (zooKeeper.getState() == ZooKeeper.States.CONNECTING) {
            Thread.sleep(50);
        }
        assertThat(zooKeeper.getState().isConnected()).isTrue();
    } finally {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }
}

@Test
public void should_fail_on_connecting_to_not_existent_server() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = null;
    try {
        zooKeeper = new ZooKeeper("127.0.0.1:3210", CONNECTION_TIMEOUT,
            (event) -> System.out.println("Processing event " + event));
        ZooKeeper.States state = zooKeeper.getState();
        assertThat(state.isConnected()).isFalse();
    } finally {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }
}

@Test
public void should_override_session_timeout_when_it_is_lower_than_tick_time() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = null;
    try {
        zooKeeper = new ZooKeeper("127.0.0.1:2181", 100,
            (event) -> System.out.println("Processing event " + event));
        while (zooKeeper.getState() == ZooKeeper.States.CONNECTING) {
            Thread.sleep(50);
        }
        System.out.println("Session timeout is " + zooKeeper.getSessionTimeout());
        assertThat(zooKeeper.getSessionTimeout()).isNotEqualTo(100);
    } finally {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }
}

@Test
public void should_not_override_session_timeout_when_it_is_higher_than_tick_time() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = null;
    try {
        // tick time is configured to 2000 ms, so a session timeout of 30000 ms should be high enough
        // for ZooKeeper not to override this value
        zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000,
            (event) -> System.out.println("Processing event " + event));
        while (zooKeeper.getState() == ZooKeeper.States.CONNECTING) {
            Thread.sleep(50);
        }
        System.out.println("Session timeout is " + zooKeeper.getSessionTimeout());
        assertThat(zooKeeper.getSessionTimeout()).isEqualTo(30000);
    } finally {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }
}
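The last two tests depend on the server bounding the timeout requested by the client. The sketch below shows that arithmetic, assuming the server's minSessionTimeout and maxSessionTimeout settings are left at their defaults of 2 and 20 times tickTime. The class and method names are illustrative only and are not part of the ZooKeeper API.

```java
/** Sketch of how a requested session timeout is bounded by the server's limits. */
class SessionTimeouts {
    /**
     * Returns the timeout granted for a request, assuming default limits:
     * a minimum of 2 ticks and a maximum of 20 ticks.
     */
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // same tick time as in the tests above
        System.out.println(negotiate(100, tickTime));   // too low: raised to the minimum, 4000
        System.out.println(negotiate(30000, tickTime)); // within limits: kept, 30000
        System.out.println(negotiate(60000, tickTime)); // too high: lowered to the maximum, 40000
    }
}
```

Under these assumptions a request of 100 ms comes back as 4000 ms, which is why the test only asserts that the granted value differs from 100.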
Apache ZooKeeper also allows connecting to a specific path (a chroot) by appending it to the connection string. All node paths are then resolved relative to that path, as the test below illustrates:
private static final String NODE_1 = "/node_1";
private static final String NODE_2 = "/node_2";
private static final String NODE_1_CHILD_1 = "/node_1/child_1";
private static final String NODE_1_CHILD_2 = "/node_1/child_2";

@BeforeClass
public static void initContext() throws KeeperException, InterruptedException {
    zooKeeper.create(NODE_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.create(NODE_2, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.create(NODE_1_CHILD_1, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    zooKeeper.create(NODE_1_CHILD_2, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

@AfterClass
public static void clean() throws InterruptedException {
    // remove created nodes, children first (safeDelete is a helper defined elsewhere in the test class)
    try {
        safeDelete(NODE_1_CHILD_1, NODE_1_CHILD_2, NODE_1, NODE_2);
    } finally {
        zooKeeper.close();
    }
}

@Test
public void should_connect_with_chroot_path() throws IOException, KeeperException, InterruptedException {
    // When a concrete path is specified at connection time, all further operations are relative to it.
    // In our case we connect to the /node_1 path, so accessing /child_1 looks for the /node_1/child_1
    // zNode (which exists) but /node_1/child_1 tries to reach /node_1/node_1/child_1 (which doesn't).
    ZooKeeper chrootKeeper = new ZooKeeper("127.0.0.1:2181" + NODE_1, CONNECTION_TIMEOUT,
        (event) -> System.out.println("Processing event " + event));
    try {
        while (chrootKeeper.getState() == ZooKeeper.States.CONNECTING) {
            Thread.sleep(50); // poll instead of spinning on the CPU
        }
        assertThat(chrootKeeper.exists(NODE_1_CHILD_1, false)).isNull();
        assertThat(chrootKeeper.exists("/child_1", false)).isNotNull();
    } finally {
        chrootKeeper.close();
    }
}
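The path resolution exercised by this test can be sketched in plain Java. This is a minimal illustration of the idea, not the client's actual implementation, and the ChrootPaths class with its two methods is a hypothetical helper introduced here.

```java
/** Sketch of chroot handling; the class and method names are illustrative only. */
class ChrootPaths {
    /** Returns the chroot suffix of a connection string, or "" when there is none. */
    static String chrootOf(String connectString) {
        int slash = connectString.indexOf('/');
        return slash < 0 ? "" : connectString.substring(slash);
    }

    /** Maps a path as seen by the client to the absolute path on the server. */
    static String toServerPath(String chroot, String clientPath) {
        if (chroot.isEmpty()) {
            return clientPath;
        }
        // The client's root "/" is the chroot itself
        return clientPath.equals("/") ? chroot : chroot + clientPath;
    }

    public static void main(String[] args) {
        String chroot = chrootOf("127.0.0.1:2181/node_1");
        System.out.println(toServerPath(chroot, "/child_1"));
        System.out.println(toServerPath(chroot, "/node_1/child_1"));
    }
}
```

For a client connected with the /node_1 chroot, /child_1 maps to /node_1/child_1 while /node_1/child_1 maps to /node_1/node_1/child_1, which matches the two assertions of the test.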
This article presented the main facts about sessions. In the first part we learned that a session represents the state shared between a client and a server. The connection itself is described by lifecycle states such as CONNECTING and CONNECTED. The second part illustrated these features through some Java test cases.