The role of Apache ZooKeeper in Apache Kafka

As already told during quick introduction, Apache ZooKeeper is an inseparable part of Apache Kafka. Knowing what happens between these two actors is important to start to work with Kafka correctly.

This article explains the role of Apache ZooKeeper (ZK) in Apache Kafka project. The first part describes the role of ZK in brokers and consumers management. The second one shows how and when Kafka interacts with ZK. The last part shows, through test cases, ZK reaction to some of Kafka events, such as topic subscription, consumer closing or message producing.

ZooKeeper won't be described here. If you're new to it, please go to the page with articles about Apache ZooKeeper where ZooKeeper is presented more in details.

ZooKeeper in Kafka

In the introduction to Apache Kafka we listed some of ZooKeeper use cases in Kafka. This time it's a good moment to describe them better. Generally, ZooKeeper stores a lot of shared information about consumers and brokers:

  1. Brokers

    • state - ZK determines if broker is alive always when it regularly sends heartbeats requests. In additional, when broker is constraint to handle replication, it must be able to follow replication needs, ie. not have an important replication debth.
    • quotas - Kafka allows some clients (identified by client.id property) to have different producing and consuming quotas. This value is set in ZK under /config/clients path. This change can be made in bin/kafka-configs.sh script.
    • replicas - ZK keeps a set of in-sync replicas (ISR) for each topic. This set is synchronized. It means that every time when one node fails, the ISR is updated. If the failing node was previously selected leader, ZK will, based on currently live nodes, elect new leader.
    • nodes and topics registry - ZK stores nodes and topic registries. We can find there all available brokers and, more precisely, which topics are held by each broker. They're stored under /brokers/ids and /brokers/topics zNodes. Nodes and topic registries are ephemeral nodes which means that they're alive only when given broker keeps a connection (session) open to ZK instance. All this information is destroyed once session closes.

      The register is made automatically by the broker when it's started.
  2. Consumers

    • offsets - in Kafka's 0.9.1 release, ZooKeeper is the default storage engine for consumer offsets. As announced in the documentation, the default storage mechanism will migrate further to special Kafka broker called Offset Manager. But by now, all information about how many messages were consumed by each consumer are stored in ZK.
    • registry - as in the case of brokers, consumers also have their own registry. And the same rules apply to it, ie. as ephemeral zNode, it's destroyed once consumer goes down and the register process is made automatically by consumer.
    • partitions registry - other registry related to consumers is about partitions ownership. Since each partition is consumed by exactly one consumer belonging to specific consumer group, Kafka must know the ownership relationship between partitions and consumers. And this information is stored in ZK under /consumers/${groupId}/owners/${topic} path as a zNode called by the pattern of ${brokerId}-${partitionId}. As previous consumer registry, this one is also ephemeral.

As you can see through that list, ZooKeeper is essentially employed to guarantee the synchronization between all participants in Kafka workflow - consumers and brokers.

How does Kafka talk to ZooKeeper ?

For this part we'll analyze Kafka classes responsible for working with ZooKeeper. Scala class representing Kafka server is KafkaServer. Its startup() method contains a call to method initializing ZooKeeper connection, called initZk(). The algorithm used in this method is composed by several steps. First, it checks if different chroot than "/" is configured. If it's the case, the method creates temporary connection to ZK. This session is responsible for creating zNodes corresponding to chroot if it's missing. Further this connection closes and the final connection held by server is created.

After, still inside initZk(), Kafka initializes all persistent zNodes used by server. We can retrieve there, among others: /consumers, /brokers/ids, /brokers/topics, /config, /admin/delete_topics, /brokers/seqid, /isr_change_notification, /config/topics, /config/clients.

This created ZooKeeper instance is used further to initialize other members using synchronization: replica manager, config manager, coordinator and controller.

Previously described consumer ephemeral nodes are created in ZookeeperConsumerConnector class. We can retrieve there consumers register to ZK and operations related to partitions assignement and consumed messages offset management.

Inspect Kafka's ZooKeeper through test cases

In our tests we'll need for some helper commands to create, delete or change tested topic. These useful commands are:

 
# to be executed from Kafka's dir
# creates topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_to_remove
# deletes topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181  --topic topic_to_remove
# adds one partition to created topic
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 2 --topic topic_to_remove

Below you can find some test cases showing which data and where is stored in ZooKeeper:

@Test
public void should_correctly_get_created_topic() throws InterruptedException, KeeperException, IOException {
  System.out.println("[i] Before launching the test, please create a topic called '"+TOPIC_NAME+"'");
  System.out.println("You can use this snippet: ");
  System.out.println("bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "+TOPIC_NAME);
  Thread.sleep(10_000);

  ObjectMapper mapper = new ObjectMapper();
   /**
    * This query gets the information about topic called 'topic_to_remove'. Among stored informations we
    * should retrieve:
    * - topic version - version of given topic (increased for example when new partition is added)
    * - partitions assigned - specified in '--partitions' parameter in creation command
    *
    * JSON output can look like this:
    * {"version":1,"partitions":{"0":[0]}}
    */
  String topicInfo = new String(ZK_INSTANCE.getData("/brokers/topics/"+TOPIC_NAME, false, new Stat()));
  System.out.println(topicInfo);
  JsonTopic topicData = mapper.readValue(topicInfo, JsonTopic.class);
  assertThat(topicData.version).isEqualTo(1);
  assertThat(topicData.partitions).hasSize(1).containsKey("0");
  assertThat(topicData.partitions.get("0")).hasSize(1).containsOnly(0);

  /**
   * Now, let's inspect some partition information. It contains more details than topic one because
   * it's more concerned about reads and writes.
   * - controller_epoch - the epoch of controller; the controller itself
   *                      will be the topic of another post. Generally speaking
   *                      the 'epoch' is an incremental-only id changing every time
   *                      when given partition changes.
   * - leader - broker responsible for all reads and writes for given partition.
   * - version - version id
   * - leader_epoch - the epoch of leader.
   * - isr - the list of "in-sync" replicas. Here, since we've only 1 broker, we can find
   *         only the single living node
   * {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
   */
  String partitionInfo =
          new String(ZK_INSTANCE.getData("/brokers/topics/"+TOPIC_NAME+"/partitions/0/state", false, new Stat()));
  System.out.println(partitionInfo);
  JsonPartitionState partitionState = mapper.readValue(partitionInfo, JsonPartitionState.class);
  assertThat(partitionState.controllerEpoch).isEqualTo(1);
  assertThat(partitionState.leader).isEqualTo(0);
  assertThat(partitionState.version).isEqualTo(1);
  assertThat(partitionState.leaderEpoch).isEqualTo(0);
  assertThat(partitionState.isr).hasSize(1).containsOnly(0);

   /**
    * Now, let's increase the number of partitions to see if topic version
    * changes. It shouldn't change because of hard-coded '1' in AdminUtils.scala:
    * <pre>
    *   def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
    *     Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
    *   }
    * </pre>
    *
    * The output should look like:
    * {"version":1,"partitions":{"1":[0],"0":[0]}}
    */
  System.out.println("[i] Please now add one partition to created topic, for example by using this query: ");
  System.out.println("bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 2 --topic "+TOPIC_NAME);
  Thread.sleep(10_000);
  String increasedTopicInfo = new String(ZK_INSTANCE.getData("/brokers/topics/"+TOPIC_NAME, false, new Stat()));
  System.out.println(increasedTopicInfo);
  JsonTopic increasedTopicData = mapper.readValue(increasedTopicInfo, JsonTopic.class);
  assertThat(increasedTopicData.version).isEqualTo(1);
  assertThat(increasedTopicData.partitions).hasSize(2).containsOnlyKeys("0", "1");
  assertThat(increasedTopicData.partitions.get("0")).hasSize(1).containsOnly(0);
  assertThat(increasedTopicData.partitions.get("1")).hasSize(1).containsOnly(0); 
}

@Test
public void should_correctly_get_consumer_information_when_two_consumers_want_to_consume_a_topic() throws IOException, InterruptedException, KeeperException {
   /**
    * Check if there are existent Offset Manager topic. If it exists, it can be removed through given steps:
    * 1) Delete /config/topics/__consumer_offsets
    * 2) Delete /brokers/topics/__consumer_offsets
    * 3) Delete local directory, for example:
    * rm -r /tmp/kafka-logs/__consumer_offsets-*
    * 4) Turn down Kafka
    * 5) Restart ZooKeeper
    * 6) Start Kafka
    * 7) Relaunch the test
    */
  try {
      new String(ZK_INSTANCE.getData("/brokers/topics/__consumer_offsets", false, new Stat()));
      fail("To see this test suceeds, you should first remove __consumer_offsets topic");
  } catch (KeeperException.NoNodeException kne) {
      // Do nothing
  }

   /**
    * Now when our test topic is created, let's create consumers of different groups subscribed
    * to it.
    * So first, let's create and subscribe consumers.
    */
  String group1 = "cons_group_1", group2 = "cons_group_2";
  String cons1Id = ConsumerHelper.generateName(TOPIC_NAME, "test1_"), cons2Id = ConsumerHelper.generateName(TOPIC_NAME, "test2_");
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("group.id", group1);
  consumerProps.setProperty("client.id", cons1Id);
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  Properties consumerProps2 = Context.getInstance().getCommonProperties();
  consumerProps2.setProperty("group.id", group2);
  consumerProps2.setProperty("client.id", cons2Id);
  KafkaConsumer<String, String> localConsumer2 =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps2, false, null));

  TopicPartition partition = new TopicPartition(TOPIC_NAME, localConsumer.partitionsFor(TOPIC_NAME).get(0).partition());

  localConsumer.assign(Collections.singletonList(partition));
  localConsumer2.assign(Collections.singletonList(partition));

  // to create Offset Manager topic, we should want to get partition data, even if there are no
  // data currently
  localConsumer.poll(500);
  localConsumer2.poll(500);

   /**
    * Unlike low level consumers, consumers created directly from Java API use
    * Offset Manager to handle offsets. It's special topic called __consumer_offsets which should
    * appear after adding new consumers.
    * Sample response can be:
    * {"version":1,"partitions":{"45":[0],"34":[0],"12":[0],"8":[0],"19":[0],"23":[0],"4":[0],
    *              "40":[0],"15":[0],"11":[0],"9":[0],"44":[0],"33":[0],"22":[0],"26":[0],
    *              "37":[0],"13":[0],"46":[0],"24":[0],"35":[0],"16":[0],"5":[0],"10":[0],
    *              "48":[0],"21":[0],"43":[0],"32":[0],"49":[0],"6":[0],"36":[0],"1":[0],"
    *              39":[0],"17":[0],"25":[0],"14":[0],"47":[0],"31":[0],"42":[0],"0":[0],
    *              "20":[0],"27":[0],"2":[0],"38":[0],"18":[0],"30":[0],"7":[0],
    *              "29":[0],"41":[0],"3":[0],"28":[0]}}
    */
  ObjectMapper mapper = new ObjectMapper();
  String consumer1Info = new String(ZK_INSTANCE.getData("/brokers/topics/__consumer_offsets", false, new Stat()));
  JsonTopic increasedTopicData = mapper.readValue(consumer1Info, JsonTopic.class);
  assertThat(increasedTopicData.version).isEqualTo(1);
  assertThat(increasedTopicData.partitions).isNotEmpty();
}

As we can see in this article, Kafka really needs ZooKeeper to work efficiently in the cluster. All broker configuration is stored in ZooKeeper zNodes. And even the consumers need ZK to know which is the last consumed message.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!