Global and local Apache ZooKeeper in Apache Pulsar - part 1

Versions: Apache Pulsar 2.5.0

Before really exploring Apache ZooKeeper in Apache Pulsar, I was thinking that 1 post will be enough. But in fact it won't be, unless it would be really long which often is hard to read and understand. That's why you can find here the first part of my exploration about ZooKeeper in Pulsar.

New ebook 🔥

Learn 84 ways to solve common data engineering problems with cloud services.

👉 I want my Early Access edition

This blog post is composed of 3 sections. In the first one, I will focus on the ZooKeeper locality regarding Pulsar brokers. In the next one, I will check why we configure 2 localities in 3 different ways whereas finally, I will explore the configuration options of both ZooKeepers.

Local and global responsibilities

In the documentation section about Apache ZooKeeper in Apache Pulsar you can find the information about 2 different ZooKeepers. The first one is called local and it operates at the cluster level. The second one works across all clusters and is called configuration store. To discover what's going on, I analyzed bin/pulsar file and looked for "zookeeper" inside. I found 3 interesting snippets:

elif [ $COMMAND == "zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
    exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
    # Allow global ZK to turn into read-only mode when it cannot reach the quorum
    OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
    exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@

From that you can see 2 classes responsible for ZooKeeper management, respectively ZooKeeperStarter and ConfigurationStoreStarter. Well, 2 classes and 3 ZooKeeper-like commads? It sounds strange, so let's see what's the difference between "global-zookeeper" and "configuration-store" commands.

global-zookeeper vs configuration-store

Both commands use the same class but the configuration is different. global-zookeeper uses PULSAR_GLOBAL_ZK_CONF whereas configuration-store works on PULSAR_CONFIGURATION_STORE_CONF. How are they set? In bin/pulsar we can find the default configuration only for the latter entry:

DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf

In fact, it's normal because DEFAULT_GLOBAL_ZK_CONF disappeared in 2018 in the quite self-explanatory commit "Hide deprecated options and few more adjustments". So no worries, global-zookeeper is the previous name for configuration-store, probably kept for retrocompatibility.

Configuration

Let's see know what are the differences between the configuration files of these ZooKeeper instances. In fact, they're very similar and by themselves, don't show what's are the differences between the stored data. In both zookeper.conf (local) and global_zookeeper.conf (configuration store) you will find:

The single difference is the presence of forceSync in the configuration for the local ZooKeeper. Accordingly to the documentation, it "Requires updates to be synced to media of the transaction log before finishing processing the update. If this option is set to no, ZooKeeper will not require updates to be synced to the media.". In simpler words it means that, if enabled (=yes), ZooKeeper will persist all pending logs to the file at every commit. Even more detailed answer in the code:

        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }

The comment for log.flush indicates that:


* If the intended destination of this stream is an abstraction provided by
* the underlying operating system, for example a file, then flushing the
* stream guarantees only that bytes previously written to the stream are
* passed to the operating system for writing; it does not guarantee that
* they are actually written to a physical device such as a disk drive.

And if you compare it with the comment of channel.force(false), everything should be clear:


* Forces any updates to this channel's file to be written to the storage
* device that contains it.
*
* <p> If this channel's file resides on a local storage device then when
* this method returns it is guaranteed that all changes made to the file
* since this channel was created, or since this method was last invoked,
* will have been written to that device. This is useful for ensuring that
* critical information is not lost in the event of a system crash.
*
* <p> If the file does not reside on a local device then no such guarantee
* is made.

This value is by default set to true:

private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");

The change was introduced to accelerate integration tests and it has more a character of warning that highlights the fact that it should be manipulated only during the tests.

This first post from ZooKeeper in Pulsar series introduced the distinction between local ZooKeeper and global ZooKeeper called configuration store. It also explained a few subtleties, like the differences, or rather the lack of them, between global-zookeeper and configuration-store commands. In the last part, you could discover what configuration properties are defined in Pulsar's ZooKeeper defaults. In the next post from the series, I will focus more on the Apache ZooKeeper storage used in the context of Apache Pulsar. Thanks for reading!