Range partitioning in Apache Spark SQL

on waitingforcode.com

Range partitioning in Apache Spark SQL

The most popular partitioning strategy divides the dataset by the hash computed from one or more values of the record. However other partitioning strategies exist as well and one of them is range partitioning implemented in Apache Spark SQL with repartitionByRange method, described in this post.

The post is composed of 3 sections. Since range partitioning is not a pure Big Data invention, the first one introduces the idea of range partitioning from a global point of view and gives some examples coming from Apache Spark predecessors. The next one introduces the basic features of repartitionByRange function whereas the last one shows how the ranges are computed under-the-hood.

Range partitioning

Range partition algorithm divides the dataset into multiple partitions of consecutive and not overlapping ranges of values. For instance, in MySQL you can define it this way:

CREATE TABLE users (
  // ... some fields
  age INT(3) NOT NULL
)
PARTITION BY RANGE (age) (
  PARTITION 0_to_20 VALUES LESS THAN (21),
  PARTITION 21_to_40 VALUES LESS THAN (41),
  PARTITION 41_to_60 VALUES LESS THAN (61),
  PARTITION 61_to_max VALUES LESS THAN (MAXVALUE)
)

Thanks to range partitioning you can group similar items inside the same place, like or instance all orders for a given month. One of the advantages of this approach is the possibility to optimize the compression rate since the main idea behind the compression is to represent a repetitive value with fewer bits. And the probability that the range-grouped data have a lot in common is bigger than for other partitioning algorithms. For example, in our age-partitioning example, we could suppose that the users of the same age will like similar type of music, will have similar professional responsibilities and so forth.

repartitionByRange

Apache Spark SQL implements range partitioning with repartitionByRange(numPartitions: Int, partitionExprs: Column*) added in 2.3.0 version. When called, the function creates numPartitions of partitions based on the columns specified in partitionExprs, like in this snippet:

  private val ordersToRepartition = Seq(
    (10, "order 1", 2000d), (11, "order 2", 240d),
    (12, "order 3", 232d), (13, "order 4", 100d),
    (14, "order 5", 11d), (15, "order 6", 20d),
    (16, "order 7", 390d), (17, "order 8", 30d),
    (18, "order 9", 99d), (19, "order 10", 55d),
    (20, "order 11", 129d), (21, "order 11", 75d),
    (22, "order 13", 173d)
  ).toDF("id", "name", "amount")

  "range partitioning" should "partition datasets in 3 partitions without explicit order" in {
    val repartitionedOrders = ordersToRepartition.repartitionByRange(3, $"id")
      .mapPartitions(rows => {
        val idsInPartition = rows.map(row => row.getAs[Int]("id")).toSeq.sorted.mkString(",")
        Iterator(idsInPartition)
      }).collect()

    repartitionedOrders should have size 3
    repartitionedOrders should contain allOf("10,11,12,13,14", "15,16,17,18", "19,20,21,22")
  }

By default the partitioning expression is sorted in ascending order. If you want to use descending order, you can specify it directly in the compression:

  "range partitioning" should "partition datasets in 3 partitions with explicit order" in {
    val repartitionedOrders = ordersToRepartition.repartitionByRange(3, $"id".desc)
      .mapPartitions(rows => {
        val idsInPartition = rows.map(row => row.getAs[Int]("id")).toSeq.sorted.mkString(",")
        Iterator(idsInPartition)
      }).collect()

    ordersToRepartition.repartitionByRange(3, $"id".desc).explain(true)

    repartitionedOrders should have size 3
    repartitionedOrders should contain allOf("18,19,20,21,22", "14,15,16,17", "10,11,12,13")
  }

If you want, you can also omit the parameter with the number of partitions. In such a case Apache Spark will use the value specified in spark.sql.shuffle.partitions configuration property.

repartitionByRange internals

To understand the implementation details, I will start by printing the execution plan for above queries:

== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- Project [_1#16 AS id#20, _2#17 AS name#21, _3#18 AS amount#22]
   +- LocalRelation [_1#16, _2#17, _3#18]

== Analyzed Logical Plan ==
id: int, name: string, amount: double
RepartitionByExpression [id#20 ASC NULLS FIRST], 3
+- Project [_1#16 AS id#20, _2#17 AS name#21, _3#18 AS amount#22]
   +- LocalRelation [_1#16, _2#17, _3#18]

== Optimized Logical Plan ==
RepartitionByExpression [id#20 ASC NULLS FIRST], 3
+- LocalRelation [id#20, name#21, amount#22]

== Physical Plan ==
Exchange rangepartitioning(id#20 ASC NULLS FIRST, 3)
+- LocalTableScan [id#20, name#21, amount#22]

The snippet gives already a lot of context for the classes involved during the partitioning logic. As you can see, everything starts in RepartitionByExpression which handles all of 3 repartitioning modes in Apache Spark SQL, namely range, hash, and round-robin. Our range partitioning distinguishes from the others by the presence of the ordering clause in the partitioning expression.

Range partitioning logic is handled by org.apache.spark.RangePartitioner which knows the targeted number of partitions and the RDD to repartition. Its most valuable property is the array of upper bounds for the first (# partitions - 1) partitions, computed with a randomized algorithm called reservoir sampling. I will detail it more in one of next posts published in the Big Data algorithms category. After sampling data from all partitions, the partitioner checks whether the number of sampled items is evenly balanced across all samples. If it's not the case, it replaces the imbalanced samples and by new balanced samples.

At the end, the partitioner selects (# partitions - 1) upper bounds with this algorithm (simplified version):

input = [(Key, Weight)]
sorted_input = sort_by_Key(input)

avg_weight_per_partition = sum_Weight(input) / number_of_partitions
i = 0
while (created_bounds <= number_of_partitions):
  accumulated_weight += sorted_input[i][Weight]
  if accumulated_weight >= avg_weight_per_partition:
    bounds += sorted_input[i][Key]

bounds

As you can see, the algorithm uses the weight property associated with each sampled key to figure out when a new partition boundary should be produced. The weight attribute is the division of the number of sampled items by the number of items to sample per each partition.

Actually the algorithm is more complicated than that but I wanted to emphasize the principal logic of calculating the boundaries. If you are interested in, you can find all the details in org.apache.spark.RangePartitioner#determineBounds method.

Before I terminate, I would like to share with you a short test checking whether the partition can be skewed for randomly chosen keys. Your dataset won't probably have random keys but by choosing them, I wanted to verify whether the reservoir sampling doesn't have a bad impact on the distribution:

    val testedRepartitions = (0 to 10).map(testNumber => {
      val ordersToRepartition = (0 to 1000000).map(nr => ThreadLocalRandom.current().nextLong(0, 1000000L)).toDF("nr")

      val repartitionedOrders = ordersToRepartition.repartitionByRange(3, $"nr")
        .mapPartitions(rows => {
          val idsInPartition = rows.map(row => row.getAs[Long]("nr")).toSeq.size
          Iterator(idsInPartition)
        }).collect()

      val sum = repartitionedOrders.sum
      val distribution = repartitionedOrders.map(itemsCount => {
        (itemsCount.toDouble * 100.0d) / sum.toDouble
      })
      (testNumber, distribution)
    })

    testedRepartitions.foreach {
      case (nr, partitionsSizes) => println(s"${nr}=${partitionsSizes.mkString(",")}")
    }

Since the test uses randomly generated numbers, it's hard to assert on them. But if you run it several times, you should see that the difference between stored items on each partition is always around 4 points. After 3 consecutive runs, the biggest difference I got was 9 points for (partition1=38%, partition2=29%, partition3=33%) distribution.

Although the distribution is rather correct, please remember that if in your dataset some partitioning keys are more popular than the others, you will end up with unbalanced dataset whatever happens - unless you use round-robin algorithm but it won't work for all data use cases, like for instance the ones involving grouping.

Range partitioning is one of 3 partitioning strategies in Apache Spark. As shown in the post, it can be used pretty easily in Apache Spark SQL module thanks to the repartitionBy method taking as parameters the number of targeted partitions and the columns used in the partitioning. In the 3rd section you can see some of the implementation details. Among them you will find the method used to define partition boundaries. The logic uses data sampled with reservoir sampling algorithm to figure out the partitions to create. In the following test cases you can see that this method works pretty well and guarantees quite even data distribution across partitions.

Share, like or comment this post on Twitter:

Share on: