Partitioning in Spark

on waitingforcode.com

Partitioning in Spark

Partitioning in distributed data is quite common concept. Spark is not an exception and it also has some operations related to partitions.

This time we focus on partitions which have direct impact on efficient computation across the network. In the first part of this post we can learn some theoretic aspects of Spark's partitions. The second part describes some transformations that can be used to modify established partitioning. Finally, the last part, presents some code, as well for the partitioning itself, as for transformations triggering it.

Partitions

One of partitions goal consists in reducing the amount of data needed to be transferred across the network. As well, maximally use of partitions helps to execute jobs quicker. It's because a lot of tasks in this case can be executed in parallel by different cluster nodes. By default Spark computes the number of partitions that should be reserved for given RDD. But the programmer can also specify this parameter when a RDD is created.

Partitions contain data from one specific RDD. One partition can't have mixed data coming from 2 different RDDs. Each partition is stored in a single machine but each machine can store several different partitions.

For pair RDDs (key-value ones) data distribution is based on partitioners. Technically it's represented by the implementations of abstract class org.apache.spark.Partitioner. Two partitioners are available:

  • HashPartitioner - the partition is based on the hash code of object to store. Currently used formula is:
    partition = hashCode % numberOfPartitions
    
  • RangePartitioner - for this type of partitioner, the partition is chosen by making ranges of values. To figure out how does it exactly work, let's take an example with a List of 100 numbers, from 1 to 100. If we want to store them in 5 partitions, RangePartitions will first compute the number of ranges (5: [1-20], [21-40], [41-60], [61-80], [81-100]). After, the data will be placed according to matching between range values and data key. So for example, the tuple with key 5 goes to the range [1-20], with key 59 to the range [41-60] and so on.

    If the number of ranges is lower than expected partitions, the number of partitions will be reduced.

It's also possible to create a customized partitioner.

Transformations triggering partitioning

However, the explicit definition is not a single method to trigger partitioning. Through some of transformations we can trigger partitioning:

  • coalesce(numPartitions) - decreases the number of partitions. This method can be used after filtering a big dataset leaving a lot of partitions poorly filled. By reducing the number of partitions could help in executing further operations more efficiently.
    As one additional parameter, this method takes a flag telling if shuffle step is expected by the user. If shuffle is enabled, it means that not only partitions decreasing is possible, but increasing too. However, it means also a performance risk since data will be moved across network and partitioned according to hash partitioner.
  • repartition(numPartitions) - almost the same as coalesce. The difference is that with repartition we can always increase or decrease the number of partitions while coalesce needs a shuffle flag set to true to create additional partitions.

    Under-the-hood, this method calls coalesce by always specifying shuffle flag to true. It means that data is always moved across the network with hash partitioner. To change partitioner used during shuffle we must rather use repartitionAndSortWithinPartitions(partitioner) method.

Spark partitioning examples

Let's now see some test cases showing all we've previously read:

private static final Function<Integer, String> LABEL_MAKER = (number) -> "Number is " + number;
private static final List<Tuple2<Integer, String>> TEST_DATA = IntStream.rangeClosed(1, 100).boxed()
  .map(number -> new Tuple2<>(number, LABEL_MAKER.apply(number)))
  .collect(Collectors.toList());

@Test
public void should_correctly_partition_numbers_through_range_partitioner() {
  // If you're looking at source code of sortByKey(...) method, you'll see that
  // it uses RangePartitioner to dispatch RDD data on specified number of partitions:
  //   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
  //      : RDD[(K, V)] = self.withScope
  // {
  //    val part = new RangePartitioner(numPartitions, self, ascending)
  //    new ShuffledRDD[K, V, V](self, part)
  //     .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  // }
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> rangePartitionedRDD =
    CONTEXT.parallelizePairs(TEST_DATA).sortByKey(true, numberOfPartitions);

  assertThat(rangePartitionedRDD.partitions()).hasSize(5);
  assertThat(rangePartitionedRDD.partitions()).extracting("index").containsOnly(0, 1, 2, 3, 4);
  // Check how data was dispatched among partitions  - normally we expect to have 20 elements by partition
  int t = 0;
  List<Tuple2<Integer, String>>[] dataByPartition = rangePartitionedRDD.collectPartitions(new int[]{0, 1, 2, 3, 4});
  assertThat(dataByPartition[0]).hasSize(20);
  assertThat(dataByPartition[1]).hasSize(20);
  assertThat(dataByPartition[2]).hasSize(20);
  assertThat(dataByPartition[3]).hasSize(20);
  assertThat(dataByPartition[4]).hasSize(20);
  for (int i = 0; i < rangePartitionedRDD.partitions().size(); i++) {
    List<Tuple2<Integer, String>> partitionData = dataByPartition[i];
    for (Tuple2<Integer, String> partitionDataEntry : partitionData) {
      assertThat(partitionDataEntry).isEqualTo(TEST_DATA.get(t));
      t++;
    }
  }
}

@Test
public void should_partition_data_by_range_and_create_less_partitions_than_expected() {
  List<Tuple2<Integer, String>> numbers = IntStream.rangeClosed(1, 100).boxed()
    .map(number -> new Tuple2<>(number%2, "Number is " + number))
    .collect(Collectors.toList());
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> rangePartitionedRDD =
    CONTEXT.parallelizePairs(numbers).sortByKey(true, numberOfPartitions);

  // Even if we expect to have 5 partitions, Spark will generate only 3 partitions
  // one for "0" range, one for "1" and one empty
  assertThat(rangePartitionedRDD.partitions()).hasSize(3);
}

@Test
public void should_partition_data_with_hash_partitioner() {
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> defaultPartitionedRDD =
    CONTEXT.parallelizePairs(TEST_DATA).partitionBy(new HashPartitioner(numberOfPartitions));
  Optional<Partitioner> partitioner = defaultPartitionedRDD.partitioner();
  assertThat(partitioner.isPresent()).isTrue();
  assertThat(partitioner.get()).isInstanceOf(HashPartitioner.class);

  // Now check how pairs were partitioned
  Map<Integer, List<Integer>> expectedPartitions = new HashMap<>();
  IntStream.rangeClosed(0, 4).forEach(i -> expectedPartitions.put(i, new ArrayList<>()));
  for (int key = 1; key <= 100; key++) {
    int partitionNr = key % numberOfPartitions;
    expectedPartitions.get(partitionNr).add(key);
  }

  List<Tuple2<Integer, String>>[] dataByPartition = defaultPartitionedRDD.collectPartitions(new int[]{0, 1, 2, 3, 4});

  assertThat(dataByPartition[0]).hasSameSizeAs(expectedPartitions.get(0));
  assertThat(dataByPartition[0].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsExactlyElementsOf(expectedPartitions.get(0));
  assertThat(dataByPartition[1]).hasSameSizeAs(expectedPartitions.get(1));
  assertThat(dataByPartition[1].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsExactlyElementsOf(expectedPartitions.get(1));
  assertThat(dataByPartition[2]).hasSameSizeAs(expectedPartitions.get(2));
  assertThat(dataByPartition[2].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsExactlyElementsOf(expectedPartitions.get(2));
  assertThat(dataByPartition[3]).hasSameSizeAs(expectedPartitions.get(3));
  assertThat(dataByPartition[3].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsExactlyElementsOf(expectedPartitions.get(3));
  assertThat(dataByPartition[4]).hasSameSizeAs(expectedPartitions.get(4));
  assertThat(dataByPartition[4].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsExactlyElementsOf(expectedPartitions.get(4));
}

@Test
public void should_use_coalesce_and_make_expected_changes_on_partitions() {
  // First, we create RDD with 5 partitions by
  // specifying it explicitly
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> rangePartitionedRDD = CONTEXT.parallelizePairs(TEST_DATA, numberOfPartitions);

  assertThat(rangePartitionedRDD.partitions()).hasSize(5);

  // Now we reduce the number of partitions
  int newNumberOfPartitions = 3;
  JavaPairRDD<Integer, String> coalescedRDD = rangePartitionedRDD.coalesce(newNumberOfPartitions);

  assertThat(coalescedRDD.partitions()).hasSize(newNumberOfPartitions);

  // If we want to increase the number of partitions,
  // it won't work
  int increasedNumberOfPartitions = 8;
  JavaPairRDD<Integer, String> coalescedIncreasedRDD = coalescedRDD.coalesce(increasedNumberOfPartitions);

  assertThat(coalescedIncreasedRDD.partitions()).hasSize(newNumberOfPartitions);
  assertThat(coalescedIncreasedRDD.toDebugString()).doesNotContain("ShuffledRDD");

  // But if we coalesce with shuffle step enabled,
  // we'll able to increase the number of partitions
  boolean shuffleEnabled = true;
  coalescedIncreasedRDD = coalescedRDD.coalesce(increasedNumberOfPartitions, shuffleEnabled);

  assertThat(coalescedIncreasedRDD.partitions()).hasSize(increasedNumberOfPartitions);
  assertThat(coalescedIncreasedRDD.toDebugString()).contains("ShuffledRDD");
}

@Test
public void should_correctly_repartition_data() {
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> rangePartitionedRDD = CONTEXT.parallelizePairs(TEST_DATA, numberOfPartitions);

  // With repartition(...) we can increase and decrease the number of
  // partitions. It's because under-the-hood this method calls
  // coalesce(n, shuffled=true):
  //def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  //    coalesce(numPartitions, shuffle = true)
  // }
  // Assertions contains exact indices to show better the idea of coalesce use
  // To see test working, run it in separation from the others
  JavaPairRDD<Integer, String> decreasedPartitionsRDD = rangePartitionedRDD.repartition(2);
  assertThat(decreasedPartitionsRDD.partitions()).hasSize(2);
  assertThat(decreasedPartitionsRDD.toDebugString()).contains("ShuffledRDD[2]");

  JavaPairRDD<Integer, String> increasedPartitionsRDD = decreasedPartitionsRDD.repartition(10);
  assertThat(increasedPartitionsRDD.partitions()).hasSize(10);
  assertThat(increasedPartitionsRDD.toDebugString()).contains("ShuffledRDD[6]", "ShuffledRDD[2]");
}

@Test
public void should_repartition_data_with_custom_partitioner() {
  int numberOfPartitions = 5;
  JavaPairRDD<Integer, String> rangePartitionedRDD =
    CONTEXT.parallelizePairs(TEST_DATA).sortByKey(true, numberOfPartitions);

  JavaPairRDD<Integer, String> dummyPartitionedRDD = rangePartitionedRDD
    .repartitionAndSortWithinPartitions(new LongNumbersDiscriminatingPartitioner());

  assertThat(dummyPartitionedRDD.partitions()).hasSize(2);
  assertThat(dummyPartitionedRDD.partitions()).extracting("index").containsOnly(0, 1);
  List<Tuple2<Integer, String>>[] dataByPartition = dummyPartitionedRDD.collectPartitions(new int[]{0, 1});
  assertThat(dataByPartition[0]).hasSize(9);
  assertThat(dataByPartition[0].stream().map(tuple -> tuple._1()).collect(Collectors.toList()))
    .containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9);
  assertThat(dataByPartition[1]).hasSize(91);
  List<Integer> bigNumbersList =
    dataByPartition[1].stream().map(tuple -> tuple._1()).collect(Collectors.toList());
  for (int i = 10; i <= 100; i++) {
    assertThat(bigNumbersList).contains(i);
  }
}

// Dummy partitioner moving pairs with a key lower than 10 to one partition
// and with bigger or equal to 10 to the other partition
private static class LongNumbersDiscriminatingPartitioner extends Partitioner {

  @Override
  public int numPartitions() {
    return 2;
  }

  @Override
  public int getPartition(Object key) {
    Integer keyInt = (Integer) key;
    if (keyInt > 9) {
      return 1;
    }
    return 0;
  }
}

This article introduces a concept of partitions, strictly related to RDD and shuffle process. The first part shows how data can be moved among partitions - by computing a hash or by checking in which range fills given key. The second part describes which methods can be used to change partitions size. We can learn from there that methods increasing partitions number need to shuffle data. The last part contains an example of first two parts. We can see there custom partitioner, partitioning done with native Spark's partitioners and transformations used to change partitions number.

Share on: