Radix and Tim sort

Versions: Apache Spark 3.2.1

The topic of this blog post is not new because both sort algorithms have been available since Apache Spark 2. But I've never had a chance to present them, so today I'll try to do it.

Algorithms logic

Radix and Tim sort algorithms rely on different principles. Let's begin with the Radix algorithm.

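Have you noticed how it works? The algorithm sorts the values digit by digit, starting from the least significant digit, and each pass is stable, so it preserves the order established by the previous passes. When a digit is missing, as for the smallest numbers in the example, Radix sort uses 0 instead.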
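The digit-by-digit logic can be sketched in Java. This is a minimal illustration, not Spark's RadixSort (which works on the bytes of 64-bit values): it assumes non-negative int input and base 10 to match the digits discussed above, and LsdRadixSortExample and radixSort are hypothetical names. Each pass is a stable counting sort on one digit, and numbers shorter than the largest one simply get a 0 digit in the higher positions.

```java
import java.util.Arrays;

public class LsdRadixSortExample {

    // Sorts non-negative integers with a least-significant-digit (LSD) radix sort in base 10.
    static int[] radixSort(int[] input) {
        int[] values = Arrays.copyOf(input, input.length);
        int max = 0;
        for (int v : values) {
            max = Math.max(max, v);
        }
        int[] buffer = new int[values.length];
        // One stable counting-sort pass per digit, from the units digit upwards.
        // Numbers shorter than max get digit 0 in the higher positions.
        for (long exp = 1; max / exp > 0; exp *= 10) {
            int[] counts = new int[10];
            for (int v : values) {
                counts[(int) ((v / exp) % 10)]++;
            }
            // Turn the digit counts into end positions (prefix sums).
            for (int d = 1; d < 10; d++) {
                counts[d] += counts[d - 1];
            }
            // Walk backwards so that equal digits keep their previous relative order.
            for (int i = values.length - 1; i >= 0; i--) {
                int digit = (int) ((values[i] / exp) % 10);
                buffer[--counts[digit]] = values[i];
            }
            // The buffer now holds the output of this pass; reuse the old array as scratch space.
            int[] tmp = values;
            values = buffer;
            buffer = tmp;
        }
        return values;
    }

    public static void main(String[] args) {
        int[] sorted = radixSort(new int[] {170, 45, 75, 90, 802, 24, 2, 66});
        System.out.println(Arrays.toString(sorted));
    }
}
```

Running main prints [2, 24, 45, 66, 75, 90, 170, 802]. The number of passes depends on the number of digits of the largest value rather than on the number of elements, which is why radix sort runs in linear time for fixed-width keys.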

The Tim sort implementation is different:

As you can notice, under the hood it uses the insertion and merge sort algorithms. The idea is to divide the array into chunks called runs and, whenever a run is small enough, to sort it with insertion sort, which is efficient on short inputs. In the end, Tim sort merges the sorted runs into the final array.
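To make the run, insertion and merge steps concrete, here is a deliberately simplified Java sketch. It is neither Spark's TimSort nor the JDK's: it assumes fixed-size runs of a hypothetical RUN = 32 elements and skips the natural run detection, the run stack invariants and the galloping mode of the real algorithm. SimplifiedTimSort and its methods are illustrative names.

```java
import java.util.Arrays;

public class SimplifiedTimSort {

    // Hypothetical fixed run length. The real Tim sort computes a "minrun" from the
    // array size and also detects runs that are already ordered in the input.
    static final int RUN = 32;

    // Sorts a[lo..hi) in place with insertion sort, which is fast on short ranges.
    static void insertionSort(int[] a, int lo, int hi) {
        for (int i = lo + 1; i < hi; i++) {
            int key = a[i];
            int j = i - 1;
            while (j >= lo && a[j] > key) {
                a[j + 1] = a[j];
                j--;
            }
            a[j + 1] = key;
        }
    }

    // Merges the sorted ranges a[lo..mid) and a[mid..hi). Ties are taken from the left, so it's stable.
    static void merge(int[] a, int lo, int mid, int hi) {
        int[] left = Arrays.copyOfRange(a, lo, mid);
        int i = 0, j = mid, k = lo;
        while (i < left.length && j < hi) {
            a[k++] = (left[i] <= a[j]) ? left[i++] : a[j++];
        }
        while (i < left.length) {
            a[k++] = left[i++];
        }
        // Any remaining right-hand elements are already in their final place.
    }

    static void sort(int[] a) {
        int n = a.length;
        // Phase 1: sort every run of RUN elements with insertion sort.
        for (int lo = 0; lo < n; lo += RUN) {
            insertionSort(a, lo, Math.min(lo + RUN, n));
        }
        // Phase 2: merge neighbouring runs, doubling the merged size each round.
        for (int size = RUN; size < n; size *= 2) {
            for (int lo = 0; lo < n - size; lo += 2 * size) {
                merge(a, lo, lo + size, Math.min(lo + 2 * size, n));
            }
        }
    }

    public static void main(String[] args) {
        int[] data = new int[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = (i * 37) % 101; // deterministic, shuffled-looking input
        }
        sort(data);
        System.out.println(Arrays.toString(Arrays.copyOf(data, 5)));
    }
}
```

The two loops in sort mirror the two phases described above: cheap insertion sorts on short runs, followed by stable merges of the sorted runs.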

In addition to these high-level views, let me share some other interesting facts:

Usage in Apache Spark

Apache Spark uses these sorting algorithms in the UnsafeShuffleWriter, i.e. the object generating shuffle files for the Dataset API. The writer exists for each task and generates shuffle files by passing each shuffled record to the ShuffleExternalSorter. The latter relies on a ShuffleInMemorySorter instance that delegates the sorting logic to the Tim or Radix sort algorithm.

Even though the diagram above shows the write(scala.collection.Iterator<Product2<K, V>> records) method, i.e. the one called to generate shuffle files, the sort doesn't happen for each element individually. Instead, it only happens in these situations:

Whenever one of them happens, the shuffle writer calls the Radix or Tim sort:

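final class ShuffleInMemorySorter {

  // ...
  public ShuffleSorterIterator getSortedIterator() {
    // RadixSort.sort returns the index where the sorted records start inside `array`.
    int offset = 0;
    if (useRadixSort) {
      // Sort only on the partition id bytes of each packed pointer
      // (the last two arguments mean: not descending, not signed).
      offset = RadixSort.sort(
        array, pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    } else {
      // ...
      // Comparator-based sort; Spark's Sorter wraps its TimSort implementation.
      Sorter<PackedRecordPointer, LongArray> sorter =
        new Sorter<>(new ShuffleSortDataFormat(buffer));
      sorter.sort(array, 0, pos, SORT_COMPARATOR);
    }
    // pos is the number of records currently stored in `array`.
    return new ShuffleSorterIterator(pos, array, offset);
  }
}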

The sorting doesn't operate on the records themselves. Instead, it works on an array of 64-bit values, each packing the shuffle partition id together with a pointer to the serialized record. Moreover, the sort compares only the partition id, so it doesn't access the record data at all.
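To illustrate why the sort never touches the record data, here is a hypothetical Java sketch. It assumes a simplified layout where the 24 high bits of a long hold the partition id and the 40 low bits an opaque record address (Spark's PackedRecordPointer further splits those 40 bits into a page number and an offset within the page). PartitionPointerSort, pack, radixByPartition and comparatorByPartition are illustrative names, not Spark APIs. The radix variant runs stable counting passes over the partition id bytes only, similar in spirit to the byte-index range passed to RadixSort.sort, while the comparator variant compares partition ids with the JDK's stable object sort.

```java
import java.util.Arrays;
import java.util.Comparator;

public class PartitionPointerSort {

    // Assumed layout: [24-bit partition id][40-bit record address].
    static final int ADDRESS_BITS = 40;
    static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;

    static long pack(int partitionId, long address) {
        return ((long) partitionId << ADDRESS_BITS) | (address & ADDRESS_MASK);
    }

    static int partitionId(long packed) {
        return (int) (packed >>> ADDRESS_BITS);
    }

    // Radix variant: stable LSD passes over bytes 5, 6 and 7, i.e. the partition id in this layout.
    static long[] radixByPartition(long[] input) {
        long[] values = input.clone();
        long[] buffer = new long[values.length];
        for (int byteIdx = 5; byteIdx <= 7; byteIdx++) {
            int shift = byteIdx * 8;
            int[] counts = new int[256];
            for (long v : values) {
                counts[(int) ((v >>> shift) & 0xFF)]++;
            }
            for (int b = 1; b < 256; b++) {
                counts[b] += counts[b - 1];
            }
            for (int i = values.length - 1; i >= 0; i--) {
                int b = (int) ((values[i] >>> shift) & 0xFF);
                buffer[--counts[b]] = values[i];
            }
            long[] tmp = values;
            values = buffer;
            buffer = tmp;
        }
        return values;
    }

    // Comparator variant: compares partition ids only. Arrays.sort on an object array is stable.
    static long[] comparatorByPartition(long[] input) {
        Long[] boxed = Arrays.stream(input).boxed().toArray(Long[]::new);
        Arrays.sort(boxed, Comparator.comparingInt((Long p) -> partitionId(p)));
        return Arrays.stream(boxed).mapToLong(Long::longValue).toArray();
    }

    public static void main(String[] args) {
        long[] pointers = {
            pack(2, 100), pack(0, 200), pack(1, 300), pack(0, 400), pack(2, 500)
        };
        long[] sorted = radixByPartition(pointers);
        for (long p : sorted) {
            System.out.println(partitionId(p) + " -> " + (p & ADDRESS_MASK));
        }
        System.out.println(Arrays.equals(sorted, comparatorByPartition(pointers)));
    }
}
```

Both variants group the pointers by partition id while keeping the insertion order within a partition (200 before 400 for partition 0), and neither of them reads the addressed records.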

Some numbers

Radix sort is enabled by default, and there is a reason for that. When Eric Liang added this feature in SPARK-14724, he measured its impact on the existing benchmarking environment and observed an overall improvement of ~10% for the TPC-DS queries when Radix sort was enabled.

Additionally, the microbenchmark results also favor Radix sort. For Apache Spark 3.1.2, sorting 25,000,000 elements, they look like this:

================================================================================================
radix sort
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
radix sort 25000000:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
reference TimSort key prefix array                13590          13887         420          1.8         543.6       1.0X
reference Arrays.sort                              3615           3644          40          6.9         144.6       3.8X
radix sort one byte                                 501            505           3         49.9          20.1      27.1X
radix sort two bytes                                998           1011          21         25.1          39.9      13.6X
radix sort eight bytes                             3894           3912          26          6.4         155.8       3.5X
radix sort key prefix array                        7299           7316          23          3.4         292.0       1.9X
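The derived columns of the table can be recomputed from the Best Time column and the 25,000,000 elements given in its header. The small Java sketch below only illustrates that arithmetic; BenchmarkColumns and its methods are hypothetical names, not part of Spark's benchmark framework. For the one-byte radix sort, 13590 / 501 gives the 27.1X speed-up over the TimSort reference.

```java
import java.util.Locale;

public class BenchmarkColumns {

    // Number of sorted elements, taken from the "radix sort 25000000" header.
    static final long ROWS = 25_000_000L;

    // Throughput in millions of rows per second.
    static double rateMPerSec(double bestTimeMs) {
        return ROWS / (bestTimeMs / 1000.0) / 1_000_000.0;
    }

    // Nanoseconds spent per sorted element.
    static double nsPerRow(double bestTimeMs) {
        return bestTimeMs * 1_000_000.0 / ROWS;
    }

    // Speed-up relative to the reference line (the first row of the table).
    static double relative(double referenceMs, double bestTimeMs) {
        return referenceMs / bestTimeMs;
    }

    public static void main(String[] args) {
        double timSortMs = 13590;     // reference TimSort key prefix array
        double radixOneByteMs = 501;  // radix sort one byte
        System.out.printf(Locale.ROOT, "rate=%.1f M/s, perRow=%.1f ns, relative=%.1fX%n",
            rateMPerSec(radixOneByteMs), nsPerRow(radixOneByteMs),
            relative(timSortMs, radixOneByteMs));
    }
}
```

It prints rate=49.9 M/s, perRow=20.0 ns, relative=27.1X. The per-row value differs in the last digit from the 20.1 ns shown in the table, most likely because the benchmark derives it from the unrounded timing.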

Sort algorithms don't evolve often in Apache Spark. One major change came with the 1.1 release, when the framework switched from quicksort to Tim sort. Another one arrived in 2.0 with the Radix algorithm. That's roughly one change per major version. Should we expect something sort-related in Apache Spark 3.x.x? I haven't found any signs in the backlog yet, but if I missed something, feel free to share!