Table file formats - Z-Order compaction: Apache Iceberg

Last time you discovered the Z-Order compaction in Delta Lake. But guess what? Apache Iceberg also has this feature!

4-day workshop · In-person or online

What would it take for you to trust your Databricks pipelines in production?

A 3-day bug hunt on a 3-person team costs up to €7,200 in lost engineering time. This workshop teaches you to prevent that — unit tests, data tests, and integration tests for PySpark and Databricks Lakeflow, including Spark Declarative Pipelines.

Unit, data & integration tests
Medallion architecture & Lakeflow SDP
Max 10 participants · production-ready templates
See the full curriculum → €7,000 flat fee · cohort of up to 10
Bartosz Konieczny
Bartosz
Konieczny

The Z-Compaction is a special ordering strategy for the compaction in Apache Iceberg. You control it as the other options, with a builder method:

SparkActions
  .get()
  .rewriteDataFiles(lettersTable)
  .zOrder("col1", "col2")
  .execute()

The underlying mechanism is also similar to what you saw in the Table file formats - compaction: Apache Iceberg. Let's see!

SparkZOrderStrategy

When you call the zOrder(...) highlighted in the previous snippet, you ask Apache Iceberg to create an instance of SparkZOrderStrategy. As other compaction classes, it implements a rewriteFiles(List<FileScanTask> filesToRewrite) to replace the data files. It's also the place where all the Z-Order magic I explained in the Table file formats - Z-Order compaction: Delta Lake, happens.

  1. First, the method verifies if the partitioning schema has changed between the table and the compaction action. If yes, the sort expression used for rewrite satisfies this new partitioning requirement.
  2. Next, the logic defines the number of shuffle partitions with this algorithm:
      	long numOutputFiles =
          	numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
      	cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
    
    Since the Adaptive Query Execution is disabled for the compaction, this value based on the compression-factory property must ensure evenly sized files.
  3. It's time to prepare the Z-Order sorting now. The first step consists of creating a new column of the array type to store the values of the Z-Order columns sorted lexicographically:
      Column zvalueArray = functions.array(
         zOrderColumns.stream().map(colStruct ->
              zOrderUDF.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType())).toArray(Column[]::new));
    
    The sortedLexicographically method creates a User-Defined Function that orders the byte representations of the compared objects. Under-the-hood, Apache Iceberg uses the helper methods from ZOrderByteUtils to perform this action.
  4. After this preparation step, another Z-Order operation enters into action, the interleaving. The SparkZOrderStrategy adds a new column to the compressed dataset:
    Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));
    
    This column is used as a part of the sort expression generated in the first step:
      private static final org.apache.iceberg.SortOrder Z_SORT_ORDER =
      	org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
          	.sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
          	.build();
    
    I already presented the interleaving in the already quoted blog post about Z-Order and Delta Lake, so I won't repeat the details here to keep at least this article short enough.
  5. In the end, a new Dataset with the Z-Order sort expression gets created:
    LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
    Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder());
    
  6. Finally, the SparkZOrderStrategy triggers the compaction action with the writing:
      sortedDf
          .select(originalColumns)
          .write()
          .format("iceberg")
         
    .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
          	.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
          	.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
          .mode("append")
          .save(groupID)
    

Lexicographical order

If you've read my blog post about Z-Ordering in Delta Lake, the algorithm should look familiar. But there is one point that troubled me a bit, the sortedLexicographically method. Fortunately, some research helped me refresh some knowledge.

Your table may have signed numerical values such as integers or longs. These "signed" values are numbers with "+" or "-" signs that makes the whole Z-Order sorting more challenging. Natively, the signed values don't support their numerical ordering:

Order positionSigned numberBinary representation
110000 0001
220000 0010
3-21111 1110
4-11111 1111

It's weird, isn't it? We all know that the negative numbers go before the positive ones. Yes, we do but the machines don't and that's when the lexicographical ordering function helps. Let's take a look at its Javadoc:

public class ZOrderByteUtils {

  public static final int PRIMITIVE_BUFFER_SIZE = 8;

  private ZOrderByteUtils() {}

  static ByteBuffer allocatePrimitiveBuffer() {
	return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
  }

  /**
   * Signed ints do not have their bytes in magnitude order because of the sign bit. To fix this,
   * flip the sign bit so that all negatives are ordered before positives. This essentially shifts
   * the 0 value so that we don't break our ordering when we cross the new 0 value.
   */
  public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) {
	ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
	bytes.putLong(((long) val) ^ 0x8000000000000000L);
	return bytes;
  }

As you can see, it flips the sign bit to correctly reorder the values. If we apply this to our table, we'll get the expected sorting:

Order positionSigned numberBinary representationFlipped integer
1-20111 1110126
2-10111 1111127
311000 0001129
421000 0010130

Now, all types can be sorted lexicographically and the Z-Order address used in the sort expression too!

It's the last table file format blog post before a short break. But there are more things to come, especially about schema evolution. Stay tuned!

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems contact@waitingforcode.com đź“©