Table file formats - Z-Order compaction: Delta Lake

Versions: Delta Lake 2.2.0

In my recent exploration of compaction, aka the OPTIMIZE command, in Delta Lake, I came across the famous Z-Ordering mode. It was one of the most outstanding features when I first heard about Delta Lake. You can't imagine how impatient I was to see what it does under the hood. Fortunately, that time has come!

Z-Order vs. partitioning

Let's talk first about how Z-Order relates to partitioning. There are some similarities between these concepts. Both rely on the data layout organization to perform data processing optimizations. However, that's also where the first differences appear. Partitioning is a pure data layout concept because it's based on the partition path. On the other hand, Z-Order is a metadata-based optimization requiring statistics on the indexed columns. So it won't be exposed in any way by the file system or object store. Instead, all relevant information is stored in the metadata layer (_delta_log).

Second, the type of columns is different for both optimizations. Partitioning works best with low cardinality columns (= a few distinct values). Otherwise, it can create a significant overhead on the partition metadata management side (listing, indexing in a data catalog, ...). By contrast, the Z-Order optimization is adapted to high cardinality columns (= a lot of distinct values). The operation consists of colocating related rows in the same data files and with only a few distinct values, it would create big files and, in the end, could be less efficient than the basic and write-optimized partitioning.
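To make the distinction concrete, here is a minimal sketch combining both techniques: it partitions a Delta table on a low cardinality date column and Z-Orders it on a high cardinality user id. The path, schema and values are made up for the illustration:

import io.delta.tables.DeltaTable

// Partition on the low cardinality column...
spark.range(0, 1000)
  .selectExpr("CAST(id % 7 AS STRING) AS event_date", "uuid() AS user_id", "id AS amount")
  .write.format("delta").partitionBy("event_date").save("/tmp/zorder_blog/events")

// ...and Z-Order the files inside each partition on the high cardinality column
DeltaTable.forPath(spark, "/tmp/zorder_blog/events")
  .optimize()
  .executeZOrderBy("user_id")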

As you can see, both concepts, albeit performing data storage optimization, operate on different levels and apply to different types of columns. Now that this aspect is clear, let's move on to the Z-Order algorithm before analyzing how Delta Lake implements it.

Z-Order curve

The Z-Order curve wouldn't be necessary if the lexicographical order had no limitations. Unfortunately, it has one concerning multi-column sorts. Let's take this picture where the data is ordered by the columns letter and isVowel:

The lexicographic order works well for the first column but not at all if the query involves only the second one, where the filtering can only happen after reading all the rows. Put differently, the search space explodes from a single place to 5 places. The Z-Order curve addresses that explosion issue by reducing the search space and colocating close rows from this 2-dimensional space together.

To better understand the colocation, let's check another picture. It shows the lexicographic order for a 2-dimensional space consisting of 0 <= x <= 7 and 0 <= y <= 7:

You can see that the query for the "WHERE x = 0 OR y = 0" condition needs to read 9 files. On the other hand, the Z-Order colocated data requires only 7 files:

How does this colocation work? The Z-Order curve interleaves the binary representations of the coordinates: starting from the right, it takes a bit of x, then a bit of y, and alternates like this toward the left. How does it impact our previous schema? Let's replace the (x, y) pairs by their Z-Values:

The decimal representation helps to understand another point. Why do we call this a Z-Curve? If you connect the cells following the increasing decimal values, you will see the Z-pattern appearing:
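If you prefer code to pictures, here is a small Scala sketch of the interleaving. It's a toy illustration of the idea only, not the Delta Lake implementation, and it recomputes the Z-Values of the 8x8 grid used above:

// Interleave the bits of (x, y): bit i of x goes to position 2*i (starting
// from the right) and bit i of y goes to position 2*i + 1
def interleave(x: Int, y: Int, bits: Int = 3): Int =
  (0 until bits).foldLeft(0) { (z, i) =>
    z | (((x >> i) & 1) << (2 * i)) | (((y >> i) & 1) << (2 * i + 1))
  }

// Print the Z-Value of every cell of the 0 <= x <= 7, 0 <= y <= 7 grid
for (y <- 0 to 7) {
  println((0 to 7).map(x => f"${interleave(x, y)}%2d").mkString(" "))
}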

Z-Order in Delta Lake

After all this theoretical introduction, it's time to see Delta Lake's implementation of the Z-Order optimization. First, the OptimizeTableCommand looks for the presence of Z-Order columns in the command. If they are present, it passes them to the OptimizeExecutor and starts the optimization process.
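As a reminder, these columns come from the ZORDER BY clause of the command, for example (the table name is hypothetical):

spark.sql("OPTIMIZE events ZORDER BY (user_id)")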

The OptimizeExecutor starts by setting an isMultiDimClustering flag that will be used to adapt the operation's semantics:

class OptimizeExecutor(
    sparkSession: SparkSession,
    txn: OptimisticTransaction,
    partitionPredicate: Seq[Expression],
    zOrderByColumns: Seq[String])
  extends DeltaCommand with SQLMetricsReporting with Serializable {

  private val isMultiDimClustering = zOrderByColumns.nonEmpty

What happens next looks similar to the classical compaction job: the executor selects the candidate files, groups them into bins and runs a job for each bin before committing the rewritten files to the transaction log.

Although the high-level steps are similar, the bin job is completely different from the classical compaction process. The ZOrderClustering class is responsible for the Z-Order compaction. It changes the execution plan a bit by adding an expression interleaving the bits of the Z-Order columns and a range partitioning based on that interleaved value.

Optionally, the runner can add a noise column to deal with data skew. You can control this behavior with the spark.databricks.io.skipping.mdc.addNoise flag.
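For example, to disable it (a one-liner to run before triggering the OPTIMIZE):

spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")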

Once the plan with the interleaved bits and range partitioning is built, the partition DataFrame is repartitioned by range on the interleaved bits value:

df.withColumn(repartitionKeyColName, mdcCol)
  .repartitionByRange(approxNumPartitions, col(repartitionKeyColName))

That way, the compaction process writes Z-Order-optimized data for each partition.
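To connect the dots, here is a toy end-to-end sketch reusing the interleave function defined earlier. It only illustrates the repartitioning idea, it's not what Delta Lake executes internally, and the path is made up for the example. It repartitions the 8x8 grid by range on the Z-Value so that close cells land in the same output file:

import org.apache.spark.sql.functions.{col, udf}

// Wrap the toy interleave function in a UDF to compute the Z-Value per row
val zValue = udf((x: Long, y: Long) => interleave(x.toInt, y.toInt))
val grid = spark.range(0, 8).toDF("x").crossJoin(spark.range(0, 8).toDF("y"))

grid.withColumn("z", zValue(col("x"), col("y")))
  .repartitionByRange(4, col("z"))
  .write.format("delta").mode("overwrite").save("/tmp/zorder_blog/grid")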

The Z-Order optimization is also present in Apache Iceberg, and that will be the topic of the next blog post in my "Table file formats..." series.