Apache Pulsar - global architecture and local setup

Versions: Apache Pulsar 2.5.0 https://github.com/bartosz25/pulsar...m/waitingforcode/TestProducer.scala

I'm really happy to start a whole new chapter on the blog and add Apache Pulsar to my monitored topics! Even though I already wrote about this technology in December 2019, I still feel hungry for more because that post was more an analysis of the Apache Spark Structured Streaming connector than of the tool per se. That's why I'm starting with the basic concepts, the global architecture and the local setup, both needed to go further.

The post is divided into 2 parts. The first one presents the architecture from a bird's eye view since it's not yet the moment to dive deep into it. The second part shows how to set up a test environment locally and what the ecosystem of Apache Pulsar looks like (SDKs, APIs, connectors, ...).

Architecture from a bird's eye view

At first glance, the Apache Pulsar architecture is complex, especially if, like me, you haven't met Apache BookKeeper before. That's why below you can find a (very) simplified version that should be enough to start familiarizing yourself with this messaging technology:

Let's start with the simplest part, the clients. A client communicates with the broker, which in Pulsar can be thought of as a proxy between the outside and the inside world. An important point to note here is that the broker doesn't store the data. It's therefore stateless, and so easy to scale.

To see what happens next, let's take the example of a read request. First, the broker checks whether the data is in its cache. If yes, the broker returns it to the client. Otherwise, it looks for the entry in a bookie, which is the storage server coming from the Apache BookKeeper project. At this point it's also important to introduce a concept that is not illustrated in my simplified schema, called the managed ledger. It's an abstraction for a single topic, composed of a single writer, multiple readers (each with an independent position) and multiple BookKeeper ledgers used to store the data.
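The read path described above can be sketched as a toy model. This is only an illustration under stated assumptions: `EntryId`, `Bookie` and `Broker` are hypothetical names that do not come from the Pulsar or BookKeeper code bases, and filling the cache after a miss is an assumption of the sketch rather than a documented behavior.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the read path; these names are not
// part of Pulsar's or BookKeeper's API.
final case class EntryId(ledgerId: Long, entryId: Long)

// Stand-in for a bookie, the BookKeeper storage server.
class Bookie(stored: Map[EntryId, String]) {
  var reads = 0 // counts how many times the storage was actually hit

  def read(id: EntryId): Option[String] = {
    reads += 1
    stored.get(id)
  }
}

// Stand-in for a broker: it holds a cache but does not own the data.
class Broker(bookie: Bookie) {
  private val cache = mutable.Map.empty[EntryId, String]

  def read(id: EntryId): Option[String] =
    // orElse takes its argument by name, so the bookie is queried only on a cache miss.
    cache.get(id).orElse {
      val fromBookie = bookie.read(id)
      // Assumption of this sketch: entries fetched from the bookie are cached.
      fromBookie.foreach(payload => cache.update(id, payload))
      fromBookie
    }
}

object ReadPathSketch {
  def main(args: Array[String]): Unit = {
    val entry = EntryId(ledgerId = 1L, entryId = 0L)
    val bookie = new Bookie(Map(entry -> "My message 0"))
    val broker = new Broker(bookie)

    println(broker.read(entry)) // first read goes to the bookie
    println(broker.read(entry)) // second read is served from the cache
    println(s"bookie reads = ${bookie.reads}")
  }
}
```

Because the broker in this model keeps nothing but a cache, losing it loses no data, which is the intuition behind calling the real broker stateless.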

In the schema you can also find Apache ZooKeeper, which is used as the metadata store by the brokers and BookKeeper. I will focus on what exactly is stored there in one of the next blog posts. If you're wondering why I didn't talk here about topics, subscriptions and other messaging-related aspects, I did it on purpose, to avoid introducing too many concepts at once. I will of course introduce them a little bit later in my learning process.

Local setup

This part took me the most time. But don't get me wrong: Apache Pulsar comes with a pretty convenient way to work in standalone mode with the bin/pulsar standalone command, and it worked perfectly. But since I wanted to have a "real" distributed environment on my localhost, I tried to create a docker-compose environment. And it was harder than I expected, but not because of Pulsar.

The trickiest part was synchronizing all the services because ZooKeeper is used by BookKeeper, BookKeeper by Pulsar, and in the meantime they all have to be initialized properly. I ended up with a pretty messy solution which uses the restart: on-failure feature of Docker Compose to restart dependent services if their dependencies haven't finished their setup. Why didn't I use the depends_on attribute? Unfortunately, it isn't based on the container readiness status; it only checks whether the container is running. Thanks to restart: on-failure, and supposing that a service fails when another service is not ready, the dependent services get a chance to succeed at some point in the future (unless the dependencies never become ready).
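To make the restart-based approach more concrete, here is a minimal docker-compose sketch. It is not the file used for this post: the service names, images, tags and commands are assumptions chosen for illustration, and most of the configuration a real cluster needs is omitted.

```yaml
# Hypothetical fragment; service names, images and commands are illustrative
# assumptions, not the actual file behind this post.
version: "3"
services:
  zookeeper:
    image: zookeeper:3.5

  bookie:
    image: apache/bookkeeper:4.10.0
    environment:
      - BK_zkServers=zookeeper:2181
    # depends_on only waits for the container to be started, not to be ready
    depends_on:
      - zookeeper
    # Assumption: the process exits with a non-zero code while ZooKeeper is
    # not ready, so Compose keeps restarting it until it succeeds.
    restart: on-failure

  broker:
    image: apachepulsar/pulsar:2.5.0
    command: bin/pulsar broker
    depends_on:
      - bookie
    restart: on-failure
    # broker configuration (ZooKeeper address, cluster name, ...) omitted
```

The trade-off is visible in the sketch: nothing expresses readiness explicitly, so the first startup may show a few failed attempts in the logs before every service settles.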

But let's move to Apache Pulsar since I also had to discover a few features to use it correctly. The first of them is initialize-cluster-metadata. This command initializes the cluster metadata in ZooKeeper. My mistake here was using the full path to my ZooKeeper instance, so the host and the port. It caused errors like:

pulsar_init_1  | 10:13:48.049 [main-SendThread(zookeeper:1281)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server zookeeper/172.26.0.2:1281. Will not attempt to authenticate using SASL (unknown error)
pulsar_init_1  | 10:13:48.050 [main-SendThread(zookeeper:1281)] INFO  org.apache.zookeeper.ClientCnxn - Socket error occurred: zookeeper/172.26.0.2:1281: Connection refused

Another error, which I initially thought was related to this init step, was:

[2020-04-07 14:05:35,628] org.apache.pulsar.client.impl.ConnectionHandler WARN [my-topic] [null] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: java.net.UnknownHostException: failed to resolve 'pulsarlocal' after 3 queries  (org.apache.pulsar.client.impl.ConnectionHandler:77)

I was quite surprised when I discovered that it failed only for the client part (consumer & producer) and not for the admin that I used to create the topic. The likely explanation is that the admin only talks to the HTTP endpoint I configured, whereas producers and consumers first look up the broker owning the topic and then connect to the address that this broker advertises. After investigating, I remembered an option that I saw when I analyzed the configuration properties of Pulsar, advertisedAddress. According to the documentation, it's the hostname or IP address the service advertises to the outside world, and 'pulsarlocal' was the hostname I gave to my service in docker-compose. My first idea to make it work was to add a new entry to /etc/hosts with the container's IP and host. However, I finally opted for an easier solution with advertisedAddress set to localhost :-)

The last major problem I faced was about bookies. By default, the number of copies for each message and the number of bookies used during ledger creation are both equal to 2. To make it work with my simplistic, single-node BookKeeper cluster, I had to set the following properties to 1: managedLedgerDefaultEnsembleSize, managedLedgerDefaultWriteQuorum, managedLedgerDefaultAckQuorum. I also modified similar properties on the BookKeeper side, namely these 3:

    environment:
      # ...
      - BK_dlog.bkcWriteQuorumSize=1
      - BK_dlog.bkcAckQuorumSize=1
      - BK_dlog.bkcEnsembleSize=1
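To see why the default of 2 cannot work with a single bookie, the constraint can be written down as a small check. This is a sketch under stated assumptions: `LedgerQuorum` and `problems` are hypothetical names, not part of Pulsar or BookKeeper, and the code only encodes the rule that the ensemble size must be at least the write quorum, the write quorum at least the ack quorum, and that there must be enough bookies to form the ensemble.

```scala
// Hypothetical helper, not part of Pulsar or BookKeeper. It encodes:
// ensemble size >= write quorum >= ack quorum >= 1, and
// available bookies >= ensemble size.
final case class LedgerQuorum(ensembleSize: Int, writeQuorum: Int, ackQuorum: Int) {

  // Returns a human-readable description of every violated rule.
  def problems(availableBookies: Int): List[String] = {
    val checks = List(
      (ackQuorum >= 1) -> "ack quorum must be at least 1",
      (writeQuorum >= ackQuorum) ->
        s"write quorum $writeQuorum is smaller than ack quorum $ackQuorum",
      (ensembleSize >= writeQuorum) ->
        s"ensemble size $ensembleSize is smaller than write quorum $writeQuorum",
      (availableBookies >= ensembleSize) ->
        s"ensemble size $ensembleSize exceeds the $availableBookies available bookie(s)"
    )
    // Keep only the messages whose condition does not hold.
    checks.collect { case (false, message) => message }
  }
}

object LedgerQuorumSketch {
  def main(args: Array[String]): Unit = {
    val defaults = LedgerQuorum(ensembleSize = 2, writeQuorum = 2, ackQuorum = 2)
    val singleNode = LedgerQuorum(ensembleSize = 1, writeQuorum = 1, ackQuorum = 1)

    // With one bookie, the defaults report a problem and the 1/1/1 setup does not.
    println(defaults.problems(availableBookies = 1))
    println(singleNode.problems(availableBookies = 1))
  }
}
```

The same reasoning applies to the BookKeeper-side properties shown above: all three values have to fit the number of bookies that actually exist.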

After fixing these 3 major issues I was able to run the following code against my composed cluster:


  import org.apache.pulsar.client.admin.PulsarAdmin
  import org.apache.pulsar.client.api.{Message, PulsarClient}

  // Admin client talking to the broker's HTTP endpoint (no TLS in this local setup)
  val admin = PulsarAdmin.builder()
    .serviceHttpUrl("http://localhost:8080")
    .tlsTrustCertsFilePath(null)
    .allowTlsInsecureConnection(false)
    .enableTlsHostnameVerification(false)
    .build()

  admin.topics().createNonPartitionedTopic("my-topic")

  // Client shared by the producer and the consumer, using the binary protocol
  val client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build()

  // The producer runs in a separate thread and sends one message every 2 seconds
  new Thread(new Runnable() {
    override def run(): Unit = {
      val producer = client.newProducer()
        .topic("my-topic")
        .create()
      var nr = 0
      while (true) {
        producer.send(s"My message ${nr}".getBytes())
        println("Sending new message")
        nr += 1
        Thread.sleep(2000)
      }
    }
  }).start()

  val consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-topic-subscription")
    .consumerName("test-consumer")
    .subscribe()

  // The consumer reads the messages in batches, prints them and acknowledges the whole batch
  while (true) {
    val messages = consumer.batchReceive() //.asScala.toSeq
    messages.forEach((message: Message[Array[Byte]]) => {
      println(s"Got message=${new String(message.getData)}")
    })
    consumer.acknowledge(messages)
  }

Below you can find a very short demo that confirms it:

That's the first blog post about my Apache Pulsar exploration. To be honest, I'm still working in "intuition" mode. For example, I didn't see any BookKeeper address configuration in the Pulsar configuration file, and for now I suppose that it's resolved from ZooKeeper. I also didn't cover the part between the cache and the broker, even though it seems very important. Nonetheless, I'm still curious about these and other aspects, and will try to explore them in the next posts.