Coordinator in Apache Kafka

Since Kafka is a distributed system, it has to coordinate its members somehow. This synchronization is handled by coordinators.

This post describes the different types of coordinators in Kafka. The first part describes the consumer coordinator, which lives in the client. The second part covers the group coordinator, which lives in the broker.

Consumer coordinator

The consumer coordinator lives on the client side, inside the consumer. Its role is to communicate with the Kafka broker and to detect organizational changes, such as a new group composition or new partitions to consume. It is also responsible for sending the requests known as heartbeats, whose goal is to prove that the given consumer is still alive.

Among the actions performed by the consumer coordinator we can distinguish three families: offset management, heartbeating and group management.

Offset management actions can be triggered manually, but that is not always the case. Heartbeats are sent by the coordinator at a configured interval. Group management actions are mostly invoked as a side effect of other actions. For example, when a given consumer unsubscribes from its topics and partitions, it sends, through its coordinator, a request informing the broker that it is leaving its group.
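To make the three kinds of actions more concrete, here is a minimal sketch of the consumer properties that drive them. The property names are the standard consumer configuration keys; the class name, the method name and the values are only assumptions chosen for illustration (the 30000 ms session timeout simply matches the value visible in the logs later in this post).

```java
import java.util.Properties;

public class CoordinatorConfigSketch {

    // Builds the consumer properties that influence the coordinator's work.
    // Values are illustrative assumptions, not recommendations.
    static Properties coordinatorRelatedProperties(String groupId) {
        Properties props = new Properties();
        // Group management: the group this consumer will join
        props.setProperty("group.id", groupId);
        // Offset management: commit only when the application asks for it
        props.setProperty("enable.auto.commit", "false");
        // Heartbeating: delay between two heartbeats sent to the broker
        props.setProperty("heartbeat.interval.ms", "3000");
        // Heartbeating: without any heartbeat during this time,
        // the consumer is considered dead and a rebalance is triggered
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = coordinatorRelatedProperties("wfc_integration_test");
        props.stringPropertyNames().stream().sorted()
            .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

With auto commit disabled like this, the offset management action is triggered manually, typically by calling commitSync() or commitAsync() on the KafkaConsumer after processing the polled records.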

As you can see, coordination is closely related to rebalancing. The goal of rebalancing is to ensure that every partition of a given topic is assigned to at most one consumer of a given group. Heartbeats help here: they show which consumers are still alive and, if some of them are gone, the partitions they held are rebalanced to the other group members.
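The rule "one partition goes to at most one consumer of the group" can be illustrated with a simplified, single-topic version of a range-style assignment (the logs later in this post show the "range" protocol being used). This is only a sketch under that simplification, not Kafka's own assignor code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class RangeAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges,
    // one range per consumer, consumers being taken in sorted order.
    // The first (partitionCount % consumers) members get one extra partition.
    static Map<String, List<Integer>> assign(Collection<String> consumers, int partitionCount) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(consumers));
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return assignment;
        }
        int perConsumer = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two living consumers share three partitions
        System.out.println(assign(Arrays.asList("c1", "c2"), 3));
        // c2 stopped sending heartbeats: a rebalance gives everything to c1
        System.out.println(assign(Arrays.asList("c1"), 3));
    }
}
```

Note that with more consumers than partitions some members receive an empty list: they stay in the group but remain idle until the next rebalance.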

Group coordinator

That is almost all for the client-side coordinator. But if you look at the source code, you'll find another type of coordinator, the server-side one. It is directly related to the consumer coordinator: the Kafka broker mentioned in the previous part is in fact this server-side coordinator. Each group has its own coordinator, that is, one broker whose group coordinator is responsible for it. A group coordinator instance is created every time a Kafka server starts, and there is only one such instance per broker.
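How does a group end up with one particular broker as its coordinator? As far as I can tell from the broker sources, the group id is hashed onto one partition of the internal __consumer_offsets topic, and the broker leading that partition acts as the coordinator of the group. The sketch below only reproduces this hashing step. The class and method names are hypothetical, and 50 is assumed to be the default value of the offsets.topic.num.partitions broker setting.

```java
public class GroupCoordinatorLookupSketch {

    // Assumed default of the broker setting offsets.topic.num.partitions
    static final int ASSUMED_OFFSETS_TOPIC_PARTITIONS = 50;

    // Maps a group id hash to a partition number in [0, partitionCount).
    // Integer.MIN_VALUE is handled separately because Math.abs() would
    // return a negative number for it.
    static int partitionFor(int groupIdHash, int partitionCount) {
        int positive = (groupIdHash == Integer.MIN_VALUE) ? 0 : Math.abs(groupIdHash);
        return positive % partitionCount;
    }

    static int partitionFor(String groupId, int partitionCount) {
        return partitionFor(groupId.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partition = partitionFor("wfc_integration_test", ASSUMED_OFFSETS_TOPIC_PARTITIONS);
        // The leader of this partition would be the coordinator of the group
        System.out.println("Group handled through partition " + partition);
    }
}
```

Since the mapping depends only on the group id and the partition count, every client asking for the coordinator of the same group gets the same answer, as long as the partition leader does not change.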

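The responsibilities of the group coordinator mirror those of the consumer coordinator, including offset management. Thus, the group coordinator can be thought of as a "server" executing the requests of the consumer coordinator. With the Java consumer used in this post, the result of this execution (committed offsets in particular) is persisted in the internal __consumer_offsets topic rather than in ZooKeeper.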

Coordinator in action

To better see the consumer coordinator workflow, let's write a simple test with an overridden log appender:

@Test
public void should_correctly_detect_coordinator_work() throws IOException {
  // Capture the log messages (message text only) emitted by the Kafka client
  TestAppender testAppender = new TestAppender();
  testAppender.setLayout(new PatternLayout("%m"));
  testAppender.setThreshold(Level.TRACE);
  testAppender.activateOptions();
  Logger.getRootLogger().addAppender(testAppender);

  String testName = "test1_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
  localConsumer.poll(500);

  assertThat(testAppender.getMessages()).hasSize(3);
   /**
    * Expected messages found in logs should be:
    * 1) Group metadata response ClientResponse(receivedTimeMs=1463484813276,
    *    disconnected=false, request=ClientRequest(expectResponse=true,
    *    callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@7a1ebcd8,
    *    request=RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=c_basictopictesttest1_1463484812981},
    *    body={group_id=wfc_integration_test}), createdTimeMs=1463484813273, sendTimeMs=1463484813275),
    *
    *    responseBody={error_code=0,coordinator={node_id=0,host=bartosz,port=9092}}),
    *
    * 2) Issuing request (JOIN_GROUP: {group_id=wfc_integration_test,session_timeout=30000,
    *    member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,
    *    protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=26 cap=26]}]}) to coordinator 2147483647
    *
    * 3) Issuing leader SyncGroup (SYNC_GROUP: {group_id=wfc_integration_test,generation_id=1,
    *    member_id=c_basictopictesttest1_1463484812981-eed8d595-396c-4349-9272-c8ffd796c86e,
    *    group_assignment=[{member_id=c_basictopictesttest1_1463484812981-eed8d595-396c-4349-9272-c8ffd796c86e,
    *    member_assignment=java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]}]}) to coordinator 2147483647]
    */
  assertMessageMatching(testAppender.getMessages(), "coordinator={node_id=0,");
  assertMessageMatching(testAppender.getMessages(), "Issuing request (JOIN_GROUP");
  assertMessageMatching(testAppender.getMessages(), "Issuing leader SyncGroup");
}

You can also see the coordination tasks in the logs. Below you can find logs where one consumer joins a group and makes a poll() request:

Issuing group metadata request to broker 0 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:465)
Initiating connection to node 0 at bartosz:9092. (org.apache.kafka.clients.NetworkClient:487)
(...)
Group metadata response ClientResponse(receivedTimeMs=1463125112239, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@60c6f5b, request=RequestSend(header={api_key=10,api_version=0,correlation_id=2,client_id=c_basicproducertest8_1463125112102}, body={group_id=wfc_integration_test}), createdTimeMs=1463125112234, sendTimeMs=1463125112236), responseBody={error_code=0,coordinator={node_id=0,host=bartosz,port=9092}}) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:478)
Initiating connection to node 2147483647 at bartosz:9092. (org.apache.kafka.clients.NetworkClient:487)
Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:247)
(Re-)joining group wfc_integration_test (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:309)
Issuing request (JOIN_GROUP: {group_id=wfc_integration_test,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]}) to coordinator 2147483647 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:318)
Added sensor with name node-2147483647.bytes-sent (org.apache.kafka.common.metrics.Metrics:201)
Added sensor with name node-2147483647.bytes-received (org.apache.kafka.common.metrics.Metrics:201)
Added sensor with name node-2147483647.latency (org.apache.kafka.common.metrics.Metrics:201)
Completed connection to node 2147483647 (org.apache.kafka.clients.NetworkClient:467)
Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472,member_id=c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472,members=[{member_id=c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
DEBUG Performing range assignment for subscriptions {c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@5abca1e0} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:225)
Finished assignment: {c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@527740a2} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:229)
Issuing leader SyncGroup (SYNC_GROUP: {group_id=wfc_integration_test,generation_id=1,member_id=c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472,group_assignment=[{member_id=c_basicproducertest8_1463125112102-cfc96b1d-bf10-42d9-891d-c05915967472,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]}) to coordinator 2147483647 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:397)
Received successful sync group response for group wfc_integration_test: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:423)
Setting newly assigned partitions [basicproducer-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:191)
Fetching committed offsets for partitions: [basicproducer-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:581)
No committed offset for partition basicproducer-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:628)
Received successful heartbeat response. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:615)
(...)
Received successful heartbeat response. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:615)
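One detail of these logs deserves a comment: the group metadata response announces the coordinator as node_id=0, yet the following requests go to "coordinator 2147483647". The client seems to open a dedicated connection for coordinator traffic and to derive its id from the real broker id, so that it does not collide with the regular connection to the same broker. A minimal sketch of that derivation, with hypothetical class and method names:

```java
public class CoordinatorNodeIdSketch {

    // Derives the id used for the dedicated coordinator connection
    // from the real id of the broker hosting the group coordinator.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        // Broker 0 is the coordinator in the logs above
        System.out.println(coordinatorConnectionId(0)); // prints 2147483647
    }
}
```

This explains why both "node 0" and "node 2147483647" point to the same address, bartosz:9092, in the connection messages.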

In this post we discovered how a consumer becomes aware of changes in its group thanks to the consumer coordinator. It is notified about each change through its connection to the group coordinator, located on one of the Kafka brokers. The consumer coordinator periodically sends different requests to the server to notify it about every state change. Thanks to that, the group coordinator can correctly rebalance partitions among the other consumers of the group.

