Global and local Apache ZooKeeper in Apache Pulsar - part 2

In my last post about Apache Pulsar, I introduced the global and local ZooKeepers. In this follow-up, I'll check what each of them contains.

Bartosz Konieczny

In the first section I will analyze the content of my ZooKeeper after executing different administration requests. In the second part, I will analyze how Apache Pulsar interacts with its ZooKeeper clusters.

ZooKeeper content

Apache ZooKeeper stores metadata about ledgers, bookies, namespaces, and even schemas. Let's analyze each zNode in more detail. But first, I will start my Pulsar docker-compose and execute the following code (with the Java SDK; the same can be done from the CLI, but I want to learn the SDK first 🤓):

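  // Sets is assumed to be Guava's com.google.common.collect.Sets
  import com.google.common.collect.Sets
  import org.apache.pulsar.client.admin.PulsarAdmin
  import org.apache.pulsar.client.api.MessageId
  import org.apache.pulsar.common.policies.data.TenantInfo

  // Admin client pointing at the broker's HTTP (web service) port
  val admin = PulsarAdmin.builder()
    .serviceHttpUrl("http://localhost:8080")
    .build()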

  val nonPartitionedTopicName = "wfc-non-partitioned-topic-1"

  admin.topics().createNonPartitionedTopic(nonPartitionedTopicName)
  admin.topics().createSubscription(nonPartitionedTopicName, s"${nonPartitionedTopicName}-subscription",
    MessageId.earliest)

  val partitionedTopicName = "wfc-partitioned-topic-1"
  admin.topics().createPartitionedTopic(partitionedTopicName, 5)
  admin.topics().createSubscription(partitionedTopicName, s"${partitionedTopicName}-subscription",
    MessageId.earliest)


  val adminRoles = Sets.newHashSet("role1", "role2")

  val tenantInfo = new TenantInfo(adminRoles, Sets.newHashSet(admin.clusters().getClusters))
  admin.tenants().createTenant("wfc-tenant", tenantInfo)

  admin.namespaces().createNamespace("wfc-tenant/wfc-namespace")

Let's now see which zNodes exist after executing this code.

To find out which zNodes are stored in the global ZooKeeper (configuration store), we can analyze the PulsarClusterMetadataSetup class. You can see there that zNodes like /admin, /admin/policies, or /admin/clusters are created on the global ZooKeeper. The other ones, such as /namespace, /managed-ledgers, /ledgers, or /bookies, that is, everything scoped to a single Apache Pulsar cluster, live on the local ZooKeeper instance.
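
To make this split easier to picture, here is a minimal sketch that classifies a zNode path by its root. It is a toy model written for this post, not Pulsar code: the class name, the enum, and the `storeFor` helper are hypothetical, and the only roots it knows are the ones listed in the paragraph above.

```java
import java.util.List;

// Toy model of the global/local split described above; NOT Pulsar code.
class ZNodePlacementSketch {
    enum Store { CONFIGURATION_STORE, LOCAL_ZOOKEEPER, UNKNOWN }

    // Roots taken from the paragraph above; a real deployment has more of them.
    static final List<String> GLOBAL_ROOTS = List.of("/admin");
    static final List<String> LOCAL_ROOTS =
        List.of("/namespace", "/managed-ledgers", "/ledgers", "/bookies");

    // True when path is the root itself or one of its descendants.
    static boolean isUnder(String path, String root) {
        return path.equals(root) || path.startsWith(root + "/");
    }

    static Store storeFor(String path) {
        if (GLOBAL_ROOTS.stream().anyMatch(root -> isUnder(path, root))) {
            return Store.CONFIGURATION_STORE;
        }
        if (LOCAL_ROOTS.stream().anyMatch(root -> isUnder(path, root))) {
            return Store.LOCAL_ZOOKEEPER;
        }
        return Store.UNKNOWN;
    }

    public static void main(String[] args) {
        List<String> samples = List.of(
            "/admin/policies/wfc-tenant/wfc-namespace", "/managed-ledgers", "/bookies");
        for (String path : samples) {
            System.out.println(path + " -> " + storeFor(path));
        }
    }
}
```

The prefix check compares whole path segments, so a path such as /administrator is not mistaken for a child of /admin.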

ZooKeeper management

As you saw, I executed some operations from the SDK and Apache Pulsar updated my ZooKeeper accordingly. The SDK, and more generally the web service, is the first way to interact with the ZooKeeper server from Pulsar. Under the hood, the SDK I used in this post sends each request to the corresponding API endpoint. This is also true for read operations.
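
To illustrate what "sending the request to the API endpoint" may look like, the sketch below builds, without sending them, the HTTP requests that I believe correspond to two of the admin calls from my snippet. The `/admin/v2/...` paths reflect my understanding of the v2 admin REST API and should be checked against the documentation of your Pulsar version; the class and method names are hypothetical, and the `public/default` namespace is an assumption based on the unqualified topic name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds the requests but never sends them.
class AdminRestSketch {
    // Same web service URL as the one passed to PulsarAdmin in the post.
    static final String BASE = "http://localhost:8080/admin/v2";

    // Assumed endpoint: PUT /admin/v2/namespaces/{tenant}/{namespace}
    static HttpRequest createNamespace(String tenant, String namespace) {
        return HttpRequest.newBuilder(URI.create(BASE + "/namespaces/" + tenant + "/" + namespace))
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    // Assumed endpoint: PUT /admin/v2/persistent/{tenant}/{namespace}/{topic}
    static HttpRequest createNonPartitionedTopic(String tenant, String namespace, String topic) {
        return HttpRequest.newBuilder(
                URI.create(BASE + "/persistent/" + tenant + "/" + namespace + "/" + topic))
            .PUT(HttpRequest.BodyPublishers.noBody())
            .build();
    }

    public static void main(String[] args) {
        HttpRequest ns = createNamespace("wfc-tenant", "wfc-namespace");
        HttpRequest topic =
            createNonPartitionedTopic("public", "default", "wfc-non-partitioned-topic-1");
        System.out.println(ns.method() + " " + ns.uri());
        System.out.println(topic.method() + " " + topic.uri());
    }
}
```

Sending such a request with `java.net.http.HttpClient` against a running broker should have the same effect on ZooKeeper as the SDK call, since both go through the web service.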

zNodes are also created by the bootstrap classes that you call to initialize the cluster metadata (initialize-cluster-metadata). By cluster metadata I mean all the information Apache Pulsar needs in order to serve client requests, like the ones from my code snippet. The list of created zNodes is defined in the org.apache.pulsar.PulsarClusterMetadataSetup class.

Regarding the reading part, how is Apache Pulsar aware of the changes made in ZooKeeper? Thanks to a ZooKeeper feature called watchers, Pulsar is notified whenever a zNode is modified. When that happens, Pulsar updates the entry in its in-memory ZooKeeper cache service. You can also deduce from this that some client requests won't reach the ZooKeeper server at all; very often in the code, you can see calls involving only the zNode cache, like here:

// NamespaceService
pulsar.getLocalZkCache().getDataAsync(path, ...)
Set<String> activeNativeBrokers = pulsar.getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT)

// PulsarService
if (!this.globalZkCache.exists(AdminResource.path(POLICIES) + "/" + nsName)) {
    LOG.info("SLA Namespace = {} doesn't exist.", nsName);
    return;
}

// SimpleLoadManagerImpl
if (pulsar.getLocalZkCache().exists(zkPath)) {
    pulsar.getZkClient().setData(zkPath, settingBytes, -1);
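
The watcher-plus-cache mechanism can be sketched with plain JDK classes. This is a deliberately simplified model, not Pulsar's cache service: `FakeZooKeeper` and `ZkCache` are hypothetical names, the fake server fires one-shot watches on every write, and the cache simply drops the changed entry so that the next read goes back to the server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of a watch-driven cache; NOT Pulsar's implementation.
class WatchedCacheSketch {

    // Stand-in for a ZooKeeper server: stores data and fires one-shot watches.
    static class FakeZooKeeper {
        private final Map<String, String> nodes = new HashMap<>();
        private final Map<String, Consumer<String>> watches = new HashMap<>();
        int reads = 0; // number of reads that actually reached the "server"

        String getData(String path, Consumer<String> watcher) {
            reads++;
            if (watcher != null) {
                watches.put(path, watcher);
            }
            return nodes.get(path);
        }

        void setData(String path, String value) {
            nodes.put(path, value);
            // The watch is removed before being fired, so it triggers only once.
            Consumer<String> watcher = watches.remove(path);
            if (watcher != null) {
                watcher.accept(path);
            }
        }
    }

    // In-memory cache in front of the fake server.
    static class ZkCache {
        private final FakeZooKeeper zk;
        private final Map<String, String> entries = new HashMap<>();

        ZkCache(FakeZooKeeper zk) {
            this.zk = zk;
        }

        String get(String path) {
            if (entries.containsKey(path)) {
                return entries.get(path); // served from memory, no server call
            }
            // Cache miss: read from the server and watch the node for changes.
            String value = zk.getData(path, changedPath -> entries.remove(changedPath));
            entries.put(path, value);
            return value;
        }
    }

    public static void main(String[] args) {
        FakeZooKeeper zk = new FakeZooKeeper();
        ZkCache cache = new ZkCache(zk);
        String path = "/admin/policies/wfc-tenant/wfc-namespace";

        zk.setData(path, "v1");
        cache.get(path);
        cache.get(path); // second read is answered by the cache
        System.out.println("server reads after two gets: " + zk.reads);

        zk.setData(path, "v2"); // fires the watch, which drops the cached entry
        System.out.println("value after update: " + cache.get(path));
        System.out.println("server reads in total: " + zk.reads);
    }
}
```

Only the first read and the read following a change reach the server, which is the behavior the excerpts above rely on when they query the cache instead of the ZooKeeper client.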

Even though Apache ZooKeeper adds an extra layer to the Apache Pulsar architecture, you shouldn't be scared. As you can see in this and the previous article, it's mostly about storing metadata used by other components of the system, like subscriptions, load balancing, or namespaces. Maybe I will come back to this ZooKeeper topic in the future, but first I need to learn more about these components.
