Global and local Apache ZooKeeper in Apache Pulsar - part 1

on waitingforcode.com

Before really exploring Apache ZooKeeper in Apache Pulsar, I thought that a single post would be enough. It won't be, unless it's a really long one, and long posts are often hard to read and understand. That's why you will find here the first part of my exploration of ZooKeeper in Pulsar.

This blog post is composed of 3 sections. In the first one, I will focus on ZooKeeper locality with regard to Pulsar brokers. In the second one, I will check why there are 3 commands for only 2 localities. Finally, I will explore the configuration options of both ZooKeepers.

Local and global responsibilities

In the documentation section about Apache ZooKeeper in Apache Pulsar you can find information about 2 different ZooKeepers. The first one is called local and it operates at the cluster level. The second one works across all clusters and is called the configuration store. To discover what's going on, I analyzed the bin/pulsar file and searched for "zookeeper" in it. I found 3 interesting snippets:

elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@

From that you can see 2 classes responsible for ZooKeeper management, respectively ZooKeeperStarter and ConfigurationStoreStarter. Well, 2 classes and 3 ZooKeeper-like commands? It sounds strange, so let's see what the difference is between the "global-zookeeper" and "configuration-store" commands.

global-zookeeper vs configuration-store

Both commands use the same class but the configuration is different. global-zookeeper uses PULSAR_GLOBAL_ZK_CONF whereas configuration-store works on PULSAR_CONFIGURATION_STORE_CONF. How are they set? In bin/pulsar we can find the default configuration only for the latter entry:

DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf

In fact, it's normal because DEFAULT_GLOBAL_ZK_CONF disappeared in 2018 in the quite self-explanatory commit "Hide deprecated options and few more adjustments". So no worries, global-zookeeper is the previous name for configuration-store, probably kept for backward compatibility.

Configuration

Let's now see the differences between the configuration files of these ZooKeeper instances. In fact, they're very similar and, by themselves, they don't show how the stored data differs. In both zookeeper.conf (local) and global_zookeeper.conf (configuration store) you will find:

  • tickTime, initLimit and syncLimit - the main time unit in ZooKeeper is called a tick. The first parameter defines the length of 1 tick in milliseconds. The second, initLimit, defines how long, in ticks, follower ZooKeeper servers can take to connect and synchronize with the leader for the first time. Finally, syncLimit specifies the maximum synchronization time, also in ticks, between a follower and the leader. In other words, it defines how long a synchronization request can remain unacknowledged. Both limits mean roughly the same thing, but they exist separately because the amount of data to synchronize during the first connection can be much bigger than during the continuous synchronization.
  • dataDir - the directory where the snapshots of the given ZooKeeper will be stored. This property defines where ZooKeeper will persist the copy of the znodes stored in memory. It also stores the transaction logs if the dataLogDir property is not specified. I explained znodes a few years ago in the zNode in Apache ZooKeeper post.
  • autopurge.snapRetainCount and autopurge.purgeInterval - define how many snapshots will be kept and how often, in hours, the older snapshots will be removed. This feature was introduced in ZooKeeper 3.4.0.
  • clientPort - the port used by the client connections.
  • admin.serverPort and admin.enableServer - properties for ZooKeeper's admin server. The admin server is an embedded HTTP server that exposes the so-called four-letter word commands. As the name indicates, such a command is composed of 4 letters, and it can be used to gather information about the execution environment, like listing sessions or watches. I explained the latter in the Watches in Apache ZooKeeper article.
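
Since initLimit and syncLimit are expressed in ticks, the effective timeouts depend on tickTime. The sketch below is only an illustration of that arithmetic: the class name ZkTimeouts, its helper methods and the configuration values are assumptions made for the example, not code or values taken from Pulsar.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ZkTimeouts {

    // Parses ZooKeeper-style "key=value" configuration text.
    static Properties parse(String conf) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(conf));
        return props;
    }

    // Converts a limit expressed in ticks (initLimit or syncLimit) into milliseconds.
    static long limitInMillis(Properties props, String limitKey) {
        long tickTime = Long.parseLong(props.getProperty("tickTime"));
        long ticks = Long.parseLong(props.getProperty(limitKey));
        return tickTime * ticks;
    }

    public static void main(String[] args) throws IOException {
        // Illustrative values, not read from a real Pulsar installation.
        String conf = "tickTime=2000\ninitLimit=10\nsyncLimit=5\n";
        Properties props = parse(conf);
        System.out.println("initLimit in ms: " + limitInMillis(props, "initLimit"));
        System.out.println("syncLimit in ms: " + limitInMillis(props, "syncLimit"));
    }
}
```

With these sample values, a follower gets 20000 ms for the initial synchronization and 10000 ms for the continuous one.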

The single difference is the presence of forceSync in the configuration for the local ZooKeeper. According to the documentation, it "Requires updates to be synced to media of the transaction log before finishing processing the update. If this option is set to no, ZooKeeper will not require updates to be synced to the media.". In simpler words, it means that, if enabled (=yes), ZooKeeper will force all pending log writes to the storage device at every commit. The code gives an even more detailed answer:

        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }

The comment for log.flush indicates that:


* If the intended destination of this stream is an abstraction provided by
* the underlying operating system, for example a file, then flushing the
* stream guarantees only that bytes previously written to the stream are
* passed to the operating system for writing; it does not guarantee that
* they are actually written to a physical device such as a disk drive.

And if you compare it with the comment of channel.force(false), everything should be clear:


* Forces any updates to this channel's file to be written to the storage
* device that contains it.
*
* <p> If this channel's file resides on a local storage device then when
* this method returns it is guaranteed that all changes made to the file
* since this channel was created, or since this method was last invoked,
* will have been written to that device. This is useful for ensuring that
* critical information is not lost in the event of a system crash.
*
* <p> If the file does not reside on a local device then no such guarantee
* is made.
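
To see both calls side by side outside of ZooKeeper, here is a minimal sketch that mirrors the flush-then-force sequence. The FlushVersusForce class, the write method and the payload are hypothetical names made up for this example; only FileOutputStream, FileChannel and their methods come from the JDK.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class FlushVersusForce {

    // Writes the payload, hands it over to the operating system with flush()
    // and, only if forceSync is true, asks for it to be written to the device.
    // Returns the file size seen through the channel after the write.
    static long write(File target, String payload, boolean forceSync) throws IOException {
        try (FileOutputStream log = new FileOutputStream(target)) {
            log.write(payload.getBytes(StandardCharsets.UTF_8));
            log.flush();
            FileChannel channel = log.getChannel();
            if (forceSync) {
                // false = file content only, metadata updates are not required
                channel.force(false);
            }
            return channel.size();
        }
    }

    public static void main(String[] args) throws IOException {
        File target = File.createTempFile("txn-log", ".bin");
        target.deleteOnExit();
        long startNs = System.nanoTime();
        long size = write(target, "create /demo", true);
        long elapsedMicros = (System.nanoTime() - startNs) / 1_000;
        System.out.println("Wrote " + size + " bytes with force() in " + elapsedMicros + " us");
    }
}
```

Running it with forceSync set to true and then to false on the same disk gives a rough idea of the extra latency of the sync, although the numbers depend heavily on the storage device and the file system.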

This value is by default set to true:

private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
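
One consequence of this expression is that only the literal value "no" disables the sync. The sketch below reproduces the same comparison in isolation; the ForceSyncFlag class and the isEnabled method are hypothetical helpers written for this example, not part of ZooKeeper.

```java
public class ForceSyncFlag {

    // Same logic as the expression above: a missing property falls back to "yes",
    // and every value different from the exact string "no" keeps the sync enabled.
    static boolean isEnabled(String rawValue) {
        String value = rawValue == null ? "yes" : rawValue;
        return !value.equals("no");
    }

    public static void main(String[] args) {
        String[] candidates = {null, "yes", "no", "false", "NO"};
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + isEnabled(candidate));
        }
    }
}
```

Under this logic, values such as "false" or "NO" still leave forceSync enabled, since the comparison is an exact, case-sensitive match against "no".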

The option was introduced to speed up integration tests. Its presence in the configuration acts more like a warning, highlighting the fact that it should be changed only in tests.

This first post of the ZooKeeper in Pulsar series introduced the distinction between the local ZooKeeper and the global ZooKeeper, called the configuration store. It also explained a few subtleties, like the differences, or rather the lack of them, between the global-zookeeper and configuration-store commands. In the last part, you could discover what configuration properties are defined in Pulsar's ZooKeeper defaults. In the next post of the series, I will focus more on the Apache ZooKeeper storage used in the context of Apache Pulsar. Thanks for reading!
