Rebalancing in Apache Kafka

on waitingforcode.com

If you work with Kafka, rebalancing is probably one of the words you meet most often. It's also an important one because it helps to ensure that messages are consumed correctly.

As you can easily guess, this post covers some aspects of rebalancing. The first part describes the theory behind it. The second part presents the partitioners involved. The last part contains some code examples showing rebalancing in action, depending on the consumers' behavior.

What is rebalancing in Kafka?

As Kafka's documentation explains, the goal of rebalancing is to ensure that all partitions are equally consumed. Equally means here that, within a group, each partition is linked to exactly one consumer. It's important to stress that rebalancing applies only to consumers belonging to the same group. Thanks to that, Kafka clients can easily handle two messaging approaches: queue (several consumers per group) and publish-subscribe (1 consumer per group).

Rebalancing is done by a special broker called the coordinator. Each consumer group has its own coordinator, and one broker can coordinate several different groups. A rebalance occurs when a change is detected either in the organization of the topics (new partitions added) or in the group itself (consumers joining or leaving).
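The triggers listed above can be illustrated with a toy model. It is only a sketch and not Kafka's coordinator code: the class ToyGroupCoordinator, its methods and its simple modulo-based assignment are all hypothetical and exist only to show that every change in group membership or partition count leads to a new assignment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka's coordinator. All names here are hypothetical.
class ToyGroupCoordinator {
    private final List<String> members = new ArrayList<>();
    private int partitionCount;
    private int rebalanceCount = 0;
    private Map<String, List<Integer>> assignment = new TreeMap<>();

    ToyGroupCoordinator(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Each of these three events changes the group or the topic, so it triggers a rebalance.
    void join(String consumerId) {
        members.add(consumerId);
        rebalance();
    }

    void leave(String consumerId) {
        if (members.remove(consumerId)) {
            rebalance();
        }
    }

    void addPartitions(int extra) {
        partitionCount += extra;
        rebalance();
    }

    // Recomputes the whole assignment from scratch: partition p goes to member (p % members).
    private void rebalance() {
        rebalanceCount++;
        Map<String, List<Integer>> next = new TreeMap<>();
        for (String member : members) {
            next.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount && !members.isEmpty(); p++) {
            next.get(members.get(p % members.size())).add(p);
        }
        assignment = next;
    }

    Map<String, List<Integer>> assignment() {
        return assignment;
    }

    int rebalanceCount() {
        return rebalanceCount;
    }

    public static void main(String[] args) {
        ToyGroupCoordinator group = new ToyGroupCoordinator(3);
        group.join("c1");
        System.out.println("after c1 joined: " + group.assignment());
        group.join("c2");
        System.out.println("after c2 joined: " + group.assignment());
        group.leave("c1");
        System.out.println("after c1 left:   " + group.assignment());
    }
}
```

In this model a partition never has two owners inside the group, which is the property the real rebalancing protects.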

Partitioner's role

There is another aspect related to rebalancing: partitioning. When rebalancing is triggered, Kafka needs to determine which partitions will be consumed by which consumers. This is done by dedicated Kafka classes, the implementations of the PartitionAssignor interface. In the 0.9.1 release, 2 such partitioners exist:

  • round-robin - assigns each next partition to the next consumer. For example, suppose we have 2 topics (T1, T2), each one with 3 partitions (P1, P2, P3), and 2 consumers (C1 and C2). On the first round, the partitioner assigns T1-P1 to C1 and T1-P2 to C2. Then it handles the remaining partitions in exactly the same way, i.e. it assigns T1-P3 to C1, T2-P1 to C2 and so on.
  • range - compared to round-robin, range assignment is less even. The algorithm works topic by topic. For each topic, it divides the number of partitions by the number of consumers available in the group. The result is the number of partitions assigned to each consumer; when the division is not even, the first consumers receive one extra partition. So it can lead to unequal balancing. Let's take the same case as previously. For each topic, C1 gets 2 partitions and C2 only 1. Since we have 2 topics, C1 will have 4 partitions to consume while C2 only 2.
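The two strategies described above can be sketched in plain Java. This is a simplified re-implementation for illustration, not the Kafka source code: the class AssignmentSketch and its methods are hypothetical, it assumes every topic has the same number of partitions and every consumer subscribes to every topic, and it numbers partitions from P1 as in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of both strategies; hypothetical names, not the Kafka classes.
class AssignmentSketch {

    // Round-robin: walk over all topic-partitions and hand each one to the next consumer.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics,
                                                int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int next = 0;
        for (String topic : topics) {
            for (int p = 1; p <= partitionsPerTopic; p++) {
                String owner = consumers.get(next % consumers.size());
                result.get(owner).add(topic + "-P" + p);
                next++;
            }
        }
        return result;
    }

    // Range: per topic, give each consumer a consecutive block of partitions.
    // When the division is not even, the first consumers get one extra partition.
    static Map<String, List<String>> range(List<String> consumers, List<String> topics,
                                           int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int perConsumer = partitionsPerTopic / consumers.size();
        int consumersWithExtra = partitionsPerTopic % consumers.size();
        for (String topic : topics) {
            int p = 1;
            for (int c = 0; c < consumers.size(); c++) {
                int count = perConsumer + (c < consumersWithExtra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(consumers.get(c)).add(topic + "-P" + p);
                    p++;
                }
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2");
        List<String> topics = Arrays.asList("T1", "T2");
        System.out.println("round-robin: " + roundRobin(consumers, topics, 3));
        System.out.println("range:       " + range(consumers, topics, 3));
    }
}
```

With 2 topics of 3 partitions and 2 consumers, the round-robin method gives 3 partitions to each consumer, while the range method gives 4 to C1 and 2 to C2, as in the description above.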

In the Java API code, partition assignment is done by the consumer coordinator, presented in the article about coordinators in Apache Kafka. You can observe the assignment process by looking for "Finished assignment" entries in the logs. More precisely, this process is handled by the performAssignment(String, String, Map<String, ByteBuffer>) method. Inside it, the PartitionAssignor assigns the partitions by calling its own assign(Cluster, Map<String, Subscription>) method.

The partitioner used by a consumer is specified in the partition.assignment.strategy property.
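As a minimal sketch, the property can be set like any other consumer property. The broker address and the group name below are assumptions made for the example, and the class AssignorConfigSketch is hypothetical; the assignor is referenced by its fully qualified class name so that the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper; only builds the configuration, it does not create a consumer.
class AssignorConfigSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Assumed values for the example: adapt them to your cluster and group.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        // Use "org.apache.kafka.clients.consumer.RangeAssignor" for the range strategy.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("partition.assignment.strategy"));
    }
}
```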

On the other side, we find the producer's partitioner. Unlike the consumer ones, it doesn't distribute partitions among consumers with a round-robin or range technique. Instead, for a message with a key, it decides to which partition the message is sent according to the hash of the message's key.
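The idea of key-based partitioning can be sketched as follows. This is an illustration under stated assumptions, not the producer's real code: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses Arrays.hashCode as a stand-in, so the partition numbers it returns will differ from Kafka's. The class KeyHashSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of "hash of the key modulo number of partitions".
class KeyHashSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash: Kafka uses murmur2 here, not Arrays.hashCode.
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so that the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"A", "B", "A"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Whatever the hash function, the important property is that the same key always lands on the same partition as long as the number of partitions does not change.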

Kafka rebalancing examples

Now that we know a little about rebalancing, it's a good moment to write some code and test under which circumstances it happens:

@Test
public void should_correctly_assign_partitions_to_consumer_on_range_fashion() throws IOException, InterruptedException {
  String testName = "test1_";
  printAlert();

  Properties producerProps = Context.getInstance().getCommonProperties();
  producerProps.setProperty("client.id", ProducerHelper.generateName(testName+"_producer", testName));
  // Send some messages to consume
  sendMessages(producerProps);

  Properties consumerProps = getCommonConsumerProperties("c1", RangeAssignor.class);
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, true, null));
  Properties consumer2Props = getCommonConsumerProperties("c2", RangeAssignor.class);
  KafkaConsumer<String, String> otherConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumer2Props, true, null));

  // Subscribe and poll from the 1st consumer to guarantee the coordinator creation
  // and metadata request sent
  Map<Long, Collection<TopicPartition>> partitionsInTimeC1 = new TreeMap<>();
  Map<Long, Collection<TopicPartition>> partitionsInTimeC2 = new TreeMap<>();
  new Thread( () -> {
      localConsumer.subscribe(Lists.newArrayList(TOPIC_1, TOPIC_2), new TimeSensibleRebalanceListener(partitionsInTimeC1));
      localConsumer.poll(6_000);
  }).start();
  new Thread( () -> {
      otherConsumer.subscribe(Lists.newArrayList(TOPIC_1, TOPIC_2), new TimeSensibleRebalanceListener(partitionsInTimeC2));
      otherConsumer.poll(6_000);
  }).start();
  // Give some time to consumers to rebalance
  Thread.sleep(60_000);

  // First assumption, since it's range, we expect to have not equal distribution (4 - 2)
  // We expect also that only 1 rebalancing was made, since there are no more events related to group
  // composition
  List<Collection<TopicPartition>> notEmptyPartitionsC1FromStream = partitionsInTimeC1.values().stream()
          .filter(partitions -> !partitions.isEmpty()).collect(Collectors.toList());
  assertThat(notEmptyPartitionsC1FromStream).hasSize(1);
  Collection<TopicPartition> partitionsC1 = notEmptyPartitionsC1FromStream.get(0);
  assertThat(isAsBiggerAs(partitionsC1.size(), 2, 4)).isTrue();
  // Do the same checks for C2
  List<Collection<TopicPartition>> notEmptyPartitionsC2FromStream = partitionsInTimeC2.values().stream()
          .filter(partitions -> !partitions.isEmpty()).collect(Collectors.toList());
  assertThat(notEmptyPartitionsC2FromStream).hasSize(1);
  Collection<TopicPartition> partitionsC2 = notEmptyPartitionsC2FromStream.get(0);
  int previousPartitionsSize = partitionsC1.size();
  assertThat(partitionsC2).hasSize(previousPartitionsSize == 4 ? 2 : 4);
  // Check if there are no partitions consumed twice
  List<String> partitionNames1 = partitionsC1.stream().map(p -> p.topic()+"_"+p.partition()).collect(Collectors.toList());
  List<String> partitionNames2 = partitionsC2.stream().map(p -> p.topic()+"_"+p.partition()).collect(Collectors.toList());
  assertThat(partitionNames1).doesNotContainAnyElementsOf(partitionNames2);
}

@Test
public void should_correctly_assign_partitions_to_consumer_on_round_robin_fashion() throws IOException, InterruptedException {
  String testName = "test2_";
  printAlert();

  Properties producerProps = Context.getInstance().getCommonProperties();
  producerProps.setProperty("client.id", ProducerHelper.generateName(testName+"_producer", testName));
  // Send some messages to consume
  sendMessages(producerProps);

  Properties consumerProps = getCommonConsumerProperties("c1", RoundRobinAssignor.class);
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, true, null));
  Properties consumer2Props = getCommonConsumerProperties("c2", RoundRobinAssignor.class);
  KafkaConsumer<String, String> otherConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumer2Props, true, null));

  // Subscribe and poll from the 1st consumer to guarantee the coordinator creation
  // and metadata request sent
  Map<Long, Collection<TopicPartition>> partitionsInTimeC1 = new TreeMap<>();
  Map<Long, Collection<TopicPartition>> partitionsInTimeC2 = new TreeMap<>();
  new Thread( () -> {
      localConsumer.subscribe(Lists.newArrayList(TOPIC_1, TOPIC_2), 
        new TimeSensibleRebalanceListener(partitionsInTimeC1));
      localConsumer.poll(6_000);
  }).start();
  new Thread( () -> {
      otherConsumer.subscribe(Lists.newArrayList(TOPIC_1, TOPIC_2),
        new TimeSensibleRebalanceListener(partitionsInTimeC2));
      otherConsumer.poll(6_000);
  }).start();
  // Give some time to consumers to rebalance
  Thread.sleep(60_000);

  // First assumption, since it's round-robin, we expect to have the same number of partitions
  // assigned for each consumer
  // We expect also that only 1 rebalancing was made, since there are no more events related to group
  // composition
  List<Collection<TopicPartition>> notEmptyPartitionsC1FromStream = partitionsInTimeC1.values().stream()
          .filter(partitions -> !partitions.isEmpty()).collect(Collectors.toList());
  assertThat(notEmptyPartitionsC1FromStream).hasSize(1);
  Collection<TopicPartition> partitionsC1 = notEmptyPartitionsC1FromStream.get(0);
  assertThat(partitionsC1).hasSize(3);
  // Do the same checks for C2
  List<Collection<TopicPartition>> notEmptyPartitionsC2FromStream = partitionsInTimeC2.values().stream()
          .filter(partitions -> !partitions.isEmpty()).collect(Collectors.toList());
  assertThat(notEmptyPartitionsC2FromStream).hasSize(1);
  Collection<TopicPartition> partitionsC2 = notEmptyPartitionsC2FromStream.get(0);
  assertThat(partitionsC2).hasSize(3);
  // Check if there are no partitions consumed twice
  List<String> partitionNames1 = partitionsC1.stream()
    .map(p -> p.topic()+"_"+p.partition()).collect(Collectors.toList());
  List<String> partitionNames2 = partitionsC2.stream()
    .map(p -> p.topic()+"_"+p.partition()).collect(Collectors.toList());
  assertThat(partitionNames1).doesNotContainAnyElementsOf(partitionNames2);
}

@Test
public void should_correctly_assign_key_partitioner_to_producer() throws InterruptedException, IOException, ExecutionException {
  printAlert();
  String testName = "test3_";
  Properties producerProps = Context.getInstance().getCommonProperties();
  producerProps.setProperty("client.id", ProducerHelper.generateName("producer_", testName));
  producerProps.setProperty("partitioner.class", KeyNumberBasedPartitioner.class.getCanonicalName());

  KafkaProducer<String, String> localProducer =
          new KafkaProducer<>(ProducerHelper.decorateWithDefaults(producerProps));
  Future<RecordMetadata> recordMetadataFuture1 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "1-a", "1a"));
  Future<RecordMetadata> recordMetadataFuture2 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "1-b", "1b"));
  Future<RecordMetadata> recordMetadataFuture3 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "1-c", "1c"));
  Future<RecordMetadata> recordMetadataFuture4 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "2-a", "2a"));
  Future<RecordMetadata> recordMetadataFuture5 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "1-d", "1d"));
  Future<RecordMetadata> recordMetadataFuture6 = 
    localProducer.send(new ProducerRecord<>(TOPIC_1, "2-b", "2b"));

  RecordMetadata metadata1 = recordMetadataFuture1.get();
  RecordMetadata metadata2 = recordMetadataFuture2.get();
  RecordMetadata metadata3 = recordMetadataFuture3.get();
  RecordMetadata metadata4 = recordMetadataFuture4.get();
  RecordMetadata metadata5 = recordMetadataFuture5.get();
  RecordMetadata metadata6 = recordMetadataFuture6.get();

  // Checks if partitioner worked correctly
  assertThat(metadata1.partition()).isEqualTo(1);
  assertThat(metadata2.partition()).isEqualTo(1);
  assertThat(metadata3.partition()).isEqualTo(1);
  assertThat(metadata4.partition()).isEqualTo(2);
  assertThat(metadata5.partition()).isEqualTo(1);
  assertThat(metadata6.partition()).isEqualTo(2);
}


private static class TimeSensibleRebalanceListener implements ConsumerRebalanceListener {

  private Map<Long, Collection<TopicPartition>> partitionsInTime;

  public TimeSensibleRebalanceListener(Map<Long, Collection<TopicPartition>> partitionsInTime) {
    this.partitionsInTime = partitionsInTime;
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    partitionsInTime.put(System.currentTimeMillis(), partitions);
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    partitionsInTime.put(System.currentTimeMillis(), partitions);
  }
}

private Properties getCommonConsumerProperties(String consumerId, Class<?> assignor) throws IOException {
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName("_", consumerId));
  consumerProps.setProperty("auto.offset.reset", "earliest");
  consumerProps.setProperty("rebalance.max.retries", "20");
  consumerProps.setProperty("partition.assignment.strategy", assignor.getCanonicalName());
  return consumerProps;
}

private void sendMessages(Properties producerProps) {
  KafkaProducer<String, String> localProducer =
          new KafkaProducer<>(ProducerHelper.decorateWithDefaults(producerProps));
  localProducer.send(new ProducerRecord<>(TOPIC_1, "A", "aaaaa"));
  localProducer.send(new ProducerRecord<>(TOPIC_2, "B", "bbbbb"));
  localProducer.send(new ProducerRecord<>(TOPIC_2, "C", "ccccc"));
  localProducer.send(new ProducerRecord<>(TOPIC_2, "D", "ddddd"));
  localProducer.send(new ProducerRecord<>(TOPIC_1, "E", "eeeee"));
  localProducer.flush();
}
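The KeyNumberBasedPartitioner used in the third test is not shown in this post. Judging only from the test's keys ("1-a", "2-b") and its assertions, it presumably sends a message to the partition whose number prefixes the key. A minimal sketch of that logic, under this assumption, could look like the code below; the class KeyNumberSketch is hypothetical and, to stay self-contained, it is a plain function rather than an implementation of Kafka's Partitioner interface.

```java
// Hypothetical sketch: the real KeyNumberBasedPartitioner is not shown in the post.
class KeyNumberSketch {

    // Expects keys shaped like "<partition>-<suffix>", e.g. "1-a" or "2-b".
    static int partitionFor(String key, int numPartitions) {
        int dash = key.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("Key must look like <partition>-<suffix>: " + key);
        }
        int partition = Integer.parseInt(key.substring(0, dash));
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException("Partition out of range for key: " + key);
        }
        return partition;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"1-a", "1-b", "2-a"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```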

This post describes the main ideas of Kafka's rebalancing. The first part explains the concept and shows that it's key to organizing the consumers inside each group. The second part describes the available partitioners, for consumers as well as for producers. We can see there that the concept of a partitioner is not the same for these two actors. For consumers, it determines which partitions a given consumer will own. For producers, it determines to which partition a given message will be sent. The last part mostly shows the difference between them and, in addition, shows under which circumstances the consumer coordinator triggers a rebalance.