Replication in Apache Kafka

Since Apache Kafka is distributed messaging system and we haven't described replication yet, it's a good moment to do so.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

This article focuses on replication side of Apache Kafka. The first part describes what happens when new topic expecting replication is created. After, we explore the case when new message is written by producer. Next part explains why some replicas can be considered as lagged. The last one shows another concept related to replication, high watermark.

Partitions replication

When new topic is created on the broker, we can specify on how many replicas data should be stored. If it's not specified, the value defined in broker's configuration, default.replication.factor, is taken into account. After topic and partitions creation on initial broker, partitions replication is triggered on replicated brokers. Be careful, replication factor includes leader. So if you define it as 1, only leader will store data.

Replicas are created according to round-robin algorithm, exactly in the same way as partitions are assigned to consumers. It guarantees more equal replication among cluster nodes. Each replicated partition has one leader broker. And the election of leaders is also made in round-robin way.

Replication on writing

The role of leader consists not only to holding information about its replicas, but also on handling write and read requests. Basing on information available in ZooKeeper, message producer sends its messages to leader broker. The leader takes this message and appends it to logs stored on disk. Next, it replicates message(s) on followers. When all of them acknowledge the receive, the message is considered as committed. And this consideration is important from the consumer's point of view. It's because only committed messages visible to consumer.

Message replication process is supposed to be quick thanks to replicas behavior on handled message. In fact, message isn't persisted to logs on disk but saved in the memory. But in the other side, this strategy doesn't give the guarantee that once replicated, message can always be retrieved from replicates.

The replication can be made either in synchronous or in asynchronous way. Synchronous replication occurs when leader writes the message to all replicas immediately. And only when it receives the confirmation of message reception, it can tell to client that message was correctly added. For the asynchronous replication, leader confirms the write as soon as it succeeds to append entry to its log file.

The strategy adopted by Kafka is known under the name of primary-backup. It was chosen by Kafka because it tolerates more failures (n-1 where n is the number of replicas) and is able to work with only 2 replicas. In the other side, quorum approach expects the write be done on more than half of replicas. So if we have 12 replicas, quorum can tolerate only the failure of 5 of them. For primary-backup, only replicas synchronized (in-sync, detailed in the next part) are considered. As we can see further in the article, Kafka accepts even that only the leader (the famous n-1) be alive to successfully send new messages.

Lagged replicas

A new leader can be elected every time when current one fails. The new is elected among a set of in-sync replicas. The concept of in-sync introduces another idea about replication - lagging. A replica is considered as out-of-sync when its last fetch request or last logs consumption of leader logs exceeded the value specified in replica.lag.time.max.ms configuration entry.

The additional role of ISR is to ensure that leader won't block infinitely in the case of one replica's failure. In this situation, instead of waiting for replica's reply, the leader considers this node as failed and simply removes it from ISR set. By doing so, it can continue to send given message to the rest of replicas.

Does it mean that when one ISR fails, the leader choses new one ? Let's do some tests to see:

  1. Start 3 brokers
  2. Create one topic, with only 1 partition, and define replication factory to 2
  3. Check which brokers hold topic partition
  4. Verify which broker is a replica in ZooKeeper under /brokers/topics/replicationtopic/partitions/0/state path:
    {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1]}
    
  5. Turn off replica
  6. Now, when you checking ISR state in ZooKeeper, you can see that there is only one ISR, corresponding to the leader:
    {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}
    
    So, failing replicas don't trigger the creation of new replicas.

Other problem we could analyze is the case when one of replicas goes down and some times after, it goes up. To do the test, we still use the same configuration as in previous point. This time, we begin by making all replicas up and producing 100 message. Once done, we can turn off the replica node. The logs of leader should display something like that:

INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [replicationtopic,0] (kafka.server.ReplicaFetcherManager)

When replica is up again, it's resynchronized by first truncating its logs to the last checkpointed high watermark. After that, it starts to retrieve missing messages from the leader. Once it's done, the replica is available once again as in-sync one. These actions can be found in replica logs:

INFO Truncating log replicationtopic-0 to offset 100. (kafka.log.Log)
INFO [ReplicaFetcherThread-0-2], Starting  (kafka.server.ReplicaFetcherThread)
INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions List([[replicationtopic,0], initOffset 100 to broker BrokerEndPoint(2,bartosz,9094)] ) (kafka.server.ReplicaFetcherManager)

High watermark

In the previous part we're talking about high watermark. Each time when leader saves new data in one partition, it updated an information representing the last written offset. And this information is called high watermark. Further and periodically, it's communicated to the replicas.

It's the reason why in our last test, the replica truncated its logs to the first 100 ones (even if there no more than that in reality). This value corresponded to the value of HW. The messages up to HW were further retrieved by fetching them from the leader. After this operation, replica was synchronized again.

This post describes replication part of Apache Kafka. The first part presents how replicas are elected when new topics with replication factor superior to 1 is created. We can see that Kafka tolerates up to n-1 failures. That thanks to in-sync replicas idea, representing all nodes not so far away of leader. The third part shows the cases when replica is not following anymore the leader. In our case, it failed. The last and short part described the high watermark, ie. last commited offset.


If you liked it, you should read:

πŸ“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!