Controller in Apache Kafka

Controller is a well-known concept for the ones who have worked with MVC paradigm. But regarding to Kafka, it shouldn't be thought in the same categories.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

At the beginning of this post we explain shortly the definition of Kafka controller. In the second, more verbose part, we play with it and check under which circumstances it's invoked.

What is Kafka controller ?

Apache Kafka controller is a broker having more responsabilities than the others. Its first and main responsibility concerns partitions management. It'll be involved every time when partitions state changes. And more exactly, in these actions:

Kafka controller examples

Let's check what happens in some of these cases. The tests are made locally on 2 running "brokers". But first, let's start only one broker. After that, let's start the second one. The first started broker should be the leader for the cluster and it should contain more explicit logs in logs/controller.log file. If we take a look on the content of this file, we'll see that the leader controller noticed the appearing of new Kafka broker in this way:

INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 1, deleted brokers: , all live brokers: 0,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
INFO [Controller 0]: New broker startup callback for 1 (kafka.controller.KafkaController)
INFO [Controller-0-to-broker-1-send-thread], Starting  (kafka.controller.RequestSendThread)
INFO [Controller-0-to-broker-1-send-thread], Controller 0 connected to Node(1, bartosz, 9093) for sending state change requests (kafka.controller.RequestSendThread)

As you can see, the leader (broker with the id=0) is connecting to new replica broker (id=1). Now, we can create a new topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic controllertest

To see what happened in the broker, we check leader controller's log once again:

DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children controllertest (kafka.controller.PartitionStateMachine$TopicChangeListener)
INFO [TopicChangeListener on Controller 0]: New topics: [Set(controllertest)], deleted topics: [Set()], new partition replica assignment [Map([controllertest,0] -> List(1, 0))] (kafka.controller.PartitionStateMachine$TopicChangeListener)
INFO [Controller 0]: New topic creation callback for [controllertest,0] (kafka.controller.KafkaController)
INFO [Controller 0]: New partition creation callback for [controllertest,0] (kafka.controller.KafkaController)
INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [controllertest,0] (kafka.controller.PartitionStateMachine)
INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=controllertest,Partition=0,Replica=1],[Topic=controllertest,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [controllertest,0] (kafka.controller.PartitionStateMachine)
DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [controllertest,0] are: [List(1, 0)] (kafka.controller.PartitionStateMachine)
DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [controllertest,0] to (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:1) (kafka.controller.PartitionStateMachine)
INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=controllertest,Partition=0,Replica=1],[Topic=controllertest,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)

As you can see, controller notices new topic creation and triggers partitions assignment. It also chosen leader and in-sync replicas (isr). Now, if we wait some time, we'll observe following actions in the logs:

TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
DEBUG [Controller 0]: preferred replicas by broker Map(1 -> Map([controllertest,0] -> List(1, 0))) (kafka.controller.KafkaController)
DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController)
TRACE [Controller 0]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)
TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
DEBUG [Controller 0]: preferred replicas by broker Map(1 -> Map([controllertest,0] -> List(1, 0))) (kafka.controller.KafkaController)
DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController)
TRACE [Controller 0]: leader imbalance ratio for broker 1 is 0.000000 (kafka.controller.KafkaController)

Actually, there is no need to trigger rebalance. To give some work to controller, we can create new partition for previously initialized topic:

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic controllertest

Leader controller's logs should see this change:

INFO [AddPartitionsListener on 0]: Add Partition triggered {"version":1,"partitions":{"2":[1,0],"1":[0,1],"0":[1,0]}} for path /brokers/topics/
controllertest (kafka.controller.PartitionStateMachine$AddPartitionsListener) 
INFO [AddPartitionsListener on 0]: New partitions to be added Map([controllertest,2] -> List(1, 0), [controllertest,1] -> List(0, 1)) (kafka.co
ntroller.PartitionStateMachine$AddPartitionsListener) 
INFO [Controller 0]: New partition creation callback for [controllertest,2],[controllertest,1] (kafka.controller.KafkaController) 
INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [controllertest,2],[controllertest,1] (kaf
ka.controller.PartitionStateMachine) 
INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=controllertest,Partition=2,Replica=1],[To
pic=controllertest,Partition=2,Replica=0],[Topic=controllertest,Partition=1,Replica=0],[Topic=controllertest,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine
) 
INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [controllertest,2],[controllertest,1] (
kafka.controller.PartitionStateMachine) 
DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [controllertest,2] are: [List(1, 0)] (kafka.controller.Pa
rtitionStateMachine)
DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [controllertest,2] to (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:1) (kafka.controller.PartitionStateMachine)
DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [controllertest,1] are: [List(0, 1)] (kafka.controller.PartitionStateMachine)
DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [controllertest,1] to (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1) (kafka.controller.PartitionStateMachine)
INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=controllertest,Partition=2,Replica=1],[Topic=controllertest,Partition=2,Replica=0],[Topic=controllertest,Partition=1,Replica=0],[Topic=controllertest,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine) 

So far we've only made "optimistic" operations - new elements were added to the brokers. It's the time to see controller's behavior when it goes down. To test that, we can simply Ctrl+X+C previously started broker. After that, quit until now logs of replica controller become more and more verbose. In fact, the initial controller triggers a request marking it as resigning for this role:

DEBUG [Controller 0]: Controller resigning, broker id 0 (kafka.controller.KafkaController)
DEBUG [Controller 0]: De-registering IsrChangeNotificationListener (kafka.controller.KafkaController)
[...]
INFO [Controller-0-to-broker-0-send-thread], Stopped  (kafka.controller.RequestSendThread)
INFO [Controller-0-to-broker-0-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
INFO [Controller 0]: Broker 0 resigned as the controller (kafka.controller.KafkaController)

If we go to the logs of the second broker, we'll see that it became a controller:

INFO [Replica state machine on controller 1]: Started replica state machine with initial state -> Map([Topic=controllertest,Partition=0,Replica=1] -> OnlineReplica, [Topic=controllertest,Partition=2,Replica=1] -> OnlineReplica, [Topic=controllertest,Partition=0,Replica=0] -> ReplicaDeletionIneligible, [Topic=controllertest,Partition=1,Replica=0] -> ReplicaDeletionIneligible, [Topic=controllertest,Partition=2,Replica=0] -> ReplicaDeletionIneligible, [Topic=controllertest,Partition=1,Replica=1] -> OnlineReplica) (kafka.controller.ReplicaStateMachine)
INFO [Partition state machine on Controller 1]: Started partition state machine with initial state -> Map([controllertest,2] -> OnlinePartition, [controllertest,0] -> OnlinePartition, [controllertest,1] -> OnlinePartition) (kafka.controller.PartitionStateMachine)
INFO [Controller 1]: Broker 1 is ready to serve as the new controller with epoch 2 (kafka.controller.KafkaController)
INFO [Controller 1]: Starting preferred replica leader election for partitions  (kafka.controller.KafkaController)
INFO [Partition state machine on Controller 1]: Invoking state change to OnlinePartition for partitions  (kafka.controller.PartitionStateMachine)

The most important information is about "Broker 1 is ready to serve as the new controller (...)". We can also observe that the epoch changed from 1 to 2. Other information concerns partitions management.

This article shows controller part of Apache Kafka. This part is quite essential because it guarantees correct replication and high availability of service. Controller is a kind of coordinator between what happen in brokers. As we could see in the examples of the second part of this article, controllers triggers replication and even helps to elect new controller if the previous one goes down.


If you liked it, you should read:

πŸ“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!