Global and local Apache ZooKeeper in Apache Pulsar - part 2

Versions: Apache Pulsar 2.5.0

In my last post about Apache Pulsar, I introduced the global and local ZooKeepers. In this follow-up, I'll check what both of them contain.


In the first section I will analyze the content of my ZooKeeper after executing different administration requests. In the second part, I will analyze how Apache Pulsar interacts with its ZooKeeper clusters.

ZooKeeper content

Apache ZooKeeper stores information about many things, like ledgers, bookies, namespaces or even schemas. Let's analyze every zNode in more detail. But first, I will start my Pulsar docker-compose and execute the following code (with the Java SDK; it can also be done with the CLI, but I want to learn the SDK first 🤓 ):

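  import com.google.common.collect.Sets
  import org.apache.pulsar.client.admin.PulsarAdmin
  import org.apache.pulsar.client.api.MessageId
  import org.apache.pulsar.common.policies.data.TenantInfo

  // Admin client talking to the broker's web service
  val admin = PulsarAdmin.builder()
    .serviceHttpUrl("http://localhost:8080")
    .build()

  // Non-partitioned topic with a subscription starting at the earliest message
  val nonPartitionedTopicName = "wfc-non-partitioned-topic-1"
  admin.topics().createNonPartitionedTopic(nonPartitionedTopicName)
  admin.topics().createSubscription(nonPartitionedTopicName, s"${nonPartitionedTopicName}-subscription",
    MessageId.earliest)

  // Partitioned topic (5 partitions) with its own subscription
  val partitionedTopicName = "wfc-partitioned-topic-1"
  admin.topics().createPartitionedTopic(partitionedTopicName, 5)
  admin.topics().createSubscription(partitionedTopicName, s"${partitionedTopicName}-subscription",
    MessageId.earliest)

  // New tenant allowed on every cluster known to the broker
  val adminRoles = Sets.newHashSet("role1", "role2")
  val tenantInfo = new TenantInfo(adminRoles, Sets.newHashSet(admin.clusters().getClusters))
  admin.tenants().createTenant("wfc-tenant", tenantInfo)

  // Namespace inside the new tenant
  admin.namespaces().createNamespace("wfc-tenant/wfc-namespace")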

Let's see now what zNodes exist after executing this code:

To find out which zNodes are stored in the global ZooKeeper (configuration store), we can analyze the PulsarClusterMetadataSetup class. You can see there that zNodes like /admin, /admin/policies or /admin/clusters are created on the global ZooKeeper. The other ones, such as /namespace, /managed-ledgers, /ledgers or /bookies, so basically everything scoped to one Apache Pulsar cluster, are on the local ZooKeeper instance.
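To keep this split in mind, here is a minimal sketch that maps a zNode path to the ZooKeeper holding it, using only the prefixes listed above. ZNodeScope and storeFor are hypothetical names of mine, not a part of Pulsar, and the example paths are only illustrations:

```java
import java.util.List;

// Hypothetical helper (not a Pulsar class): tells which ZooKeeper holds a zNode,
// based only on the prefixes mentioned in this article.
class ZNodeScope {
    // zNodes scoped to a single Pulsar cluster, kept in the local ZooKeeper
    static final List<String> LOCAL_PREFIXES =
        List.of("/namespace", "/managed-ledgers", "/ledgers", "/bookies");

    static String storeFor(String path) {
        // /admin and everything below it lives in the global ZooKeeper
        if (matches(path, "/admin")) {
            return "global";
        }
        for (String prefix : LOCAL_PREFIXES) {
            if (matches(path, prefix)) {
                return "local";
            }
        }
        return "unknown";
    }

    // True when the path is the prefix itself or one of its descendants
    static boolean matches(String path, String prefix) {
        return path.equals(prefix) || path.startsWith(prefix + "/");
    }

    public static void main(String[] args) {
        System.out.println(storeFor("/admin/policies/wfc-tenant/wfc-namespace")); // global
        System.out.println(storeFor("/managed-ledgers/public/default"));          // local
    }
}
```

The check on prefix + "/" avoids classifying an unrelated zNode such as /ledgers-old as a child of /ledgers.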

ZooKeeper management

As you saw, I executed some operations from the SDK and Apache Pulsar updated my ZooKeeper. The SDK, and more generally the web service, is the first way to interact with the ZooKeeper server from Pulsar. Under the hood, the SDK I used in this post sends requests to the API endpoint. This is also valid for read operations.
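To illustrate what "sends requests to the API endpoint" can look like, here is a sketch building the HTTP requests for the tenant and namespace creation with the JDK's HTTP client (Java 11+). The /admin/v2/... paths and the JSON field names reflect my understanding of the admin REST API and should be checked against the documentation of your Pulsar version. AdminRequests is a hypothetical class and the "standalone" cluster name is an assumption:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) the admin REST requests.
class AdminRequests {
    // Same web service URL as the one passed to PulsarAdmin.builder()
    static final String SERVICE_URL = "http://localhost:8080";

    // Assumed endpoint: PUT /admin/v2/tenants/{tenant} with the tenant info as JSON
    static HttpRequest createTenant(String tenant, String tenantInfoJson) {
        return HttpRequest.newBuilder(URI.create(SERVICE_URL + "/admin/v2/tenants/" + tenant))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(tenantInfoJson))
            .build();
    }

    // Assumed endpoint: PUT /admin/v2/namespaces/{tenant}/{namespace}
    static HttpRequest createNamespace(String tenant, String namespace) {
        URI uri = URI.create(SERVICE_URL + "/admin/v2/namespaces/" + tenant + "/" + namespace);
        return HttpRequest.newBuilder(uri)
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        // "standalone" is an assumed cluster name, adapt it to your setup
        String tenantInfo = "{\"adminRoles\":[\"role1\",\"role2\"],\"allowedClusters\":[\"standalone\"]}";
        HttpRequest tenantRequest = createTenant("wfc-tenant", tenantInfo);
        HttpRequest namespaceRequest = createNamespace("wfc-tenant", "wfc-namespace");
        System.out.println(tenantRequest.method() + " " + tenantRequest.uri());
        System.out.println(namespaceRequest.method() + " " + namespaceRequest.uri());
    }
}
```

To really execute a request against a running broker, you could pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).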

zNodes are also created by the bootstrap classes that you call to initialize the cluster metadata (initialize-cluster-metadata). By cluster metadata I mean all the information that Apache Pulsar requires to be able to serve client requests, like the ones from my code snippet. The list of created zNodes is defined in the org.apache.pulsar.PulsarClusterMetadataSetup class.

Regarding the reading part, how is Apache Pulsar aware of the changes made on ZooKeeper? Thanks to a ZooKeeper feature called watchers, Pulsar is notified whenever a zNode is modified. When that happens, Pulsar updates the entry in its in-memory ZooKeeper cache service. From that, you can also deduce that some client requests won't reach the ZooKeeper server at all, and very often in the code you can see calls involving only the zNode cache, like here:

// NamespaceService
pulsar.getLocalZkCache().getDataAsync(path, ...)
Set<String> activeNativeBrokers = pulsar.getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);

// PulsarService
if (!this.globalZkCache.exists(AdminResource.path(POLICIES) + "/" + nsName)) {
    LOG.info("SLA Namespace = {} doesn't exist.", nsName);
    return;
}

// SimpleLoadManagerImpl
if (pulsar.getLocalZkCache().exists(zkPath)) {
    pulsar.getZkClient().setData(zkPath, settingBytes, -1);
    // ...
}
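
The idea behind such a cache can be sketched in plain Java. This is not Pulsar's implementation: FakeZooKeeper and WatchedCache are hypothetical classes of mine, and the fake server notifies every registered watcher on each change, whereas real ZooKeeper watches are set per zNode and fire only once. The sketch only shows that repeated reads are served from memory until a notification evicts the entry:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class WatchedCacheDemo {
    // Stand-in for a ZooKeeper server: stores data per path and notifies watchers
    static class FakeZooKeeper {
        private final Map<String, String> data = new HashMap<>();
        private final List<Consumer<String>> watchers = new ArrayList<>();
        int reads = 0; // number of reads that reached the "server"

        void watch(Consumer<String> watcher) {
            watchers.add(watcher);
        }

        void setData(String path, String value) {
            data.put(path, value);
            for (Consumer<String> watcher : watchers) {
                watcher.accept(path); // tell every watcher which zNode changed
            }
        }

        String getData(String path) {
            reads++;
            return data.get(path);
        }
    }

    // In-memory cache that drops an entry when the server reports a change
    static class WatchedCache {
        private final FakeZooKeeper zk;
        private final Map<String, String> cache = new HashMap<>();

        WatchedCache(FakeZooKeeper zk) {
            this.zk = zk;
            zk.watch(path -> cache.remove(path));
        }

        String get(String path) {
            // Goes to the server only when the entry is missing from the cache
            return cache.computeIfAbsent(path, zk::getData);
        }
    }

    public static void main(String[] args) {
        FakeZooKeeper zk = new FakeZooKeeper();
        WatchedCache cache = new WatchedCache(zk);
        String path = "/admin/policies/wfc-tenant/wfc-namespace";

        zk.setData(path, "v1");
        cache.get(path);
        cache.get(path); // served from memory
        System.out.println("server reads: " + zk.reads); // server reads: 1

        zk.setData(path, "v2"); // the watcher evicts the stale entry
        System.out.println(cache.get(path) + ", server reads: " + zk.reads); // v2, server reads: 2
    }
}
```

I chose to evict the entry on notification and reload it lazily on the next read; refreshing it eagerly inside the watcher would be another valid option.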

Even though Apache ZooKeeper is one more layer in the Apache Pulsar architecture, you shouldn't be scared. As you can see in this and the previous article, it's mostly about storing metadata used by other components of the system, like subscriptions, load balancing, or namespaces. Maybe I will come back to this ZooKeeper topic in the future, but first I need to learn more about these components.

