Table file formats - Z-Order compaction: Apache Iceberg

Versions: Apache Iceberg 1.1.0

Last time you discovered the Z-Order compaction in Delta Lake. But guess what? Apache Iceberg also has this feature!

Z-Order compaction is a special ordering strategy for compaction in Apache Iceberg. You enable it like the other options, with a builder method:

SparkActions
  .get()
  .rewriteDataFiles(lettersTable)
  .zOrder("col1", "col2")
  .execute()

The underlying mechanism is also similar to what you saw in Table file formats - compaction: Apache Iceberg. Let's see!

SparkZOrderStrategy

When you call the zOrder(...) method from the previous snippet, you ask Apache Iceberg to create an instance of SparkZOrderStrategy. Like the other compaction classes, it implements a rewriteFiles(List<FileScanTask> filesToRewrite) method to replace the data files. It's also the place where all the Z-Order magic I explained in Table file formats - Z-Order compaction: Delta Lake happens.

  1. First, the method verifies whether the partitioning of the files to rewrite differs from the table's current partitioning. If it does, the sort expression used for the rewrite is built so that it also satisfies the new partitioning requirement.
  2. Next, the logic defines the number of shuffle partitions with this algorithm:
      long numOutputFiles =
          numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
      cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
    
    Since Adaptive Query Execution is disabled for the compaction, this value, scaled by the compression-factor property, is what should keep the output files evenly sized.
  3. It's time to prepare the Z-Order sorting now. The first step consists of creating a new array column that stores, for each Z-Order column, a byte representation that can be sorted lexicographically:
      Column zvalueArray = functions.array(
         zOrderColumns.stream().map(colStruct ->
              zOrderUDF.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType())).toArray(Column[]::new));
    
    The sortedLexicographically method creates a User-Defined Function that converts each value into bytes whose lexicographical order matches the order of the original values. Under the hood, Apache Iceberg uses the helper methods from ZOrderByteUtils to perform this action.
  4. After this preparation step, another Z-Order operation comes into play: the interleaving. The SparkZOrderStrategy adds a new column to the dataset being compacted:
    Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));
    
    This column is used as a part of the sort expression generated in the first step:
      private static final org.apache.iceberg.SortOrder Z_SORT_ORDER =
      	org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
          	.sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
          	.build();
    
    I presented the interleaving in the blog post about Z-Order and Delta Lake mentioned above, so I won't repeat the details here, to keep this article reasonably short.
  5. In the end, a new Dataset with the Z-Order sort expression gets created:
    LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
    Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder());
    
  6. Finally, the SparkZOrderStrategy triggers the compaction by writing the sorted dataset:
      sortedDf
          .select(originalColumns)
          .write()
          .format("iceberg")
          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
          .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
          .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
          .mode("append")
          .save(groupID);
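
To make the sizing logic from step 2 more concrete, here is a minimal, simplified sketch. It assumes the number of output files is just the estimated output size divided by the target file size, rounded up; the real numOutputFiles method in Apache Iceberg applies extra rules for the remainder that are left out here. The class name, the method names and the parameters are hypothetical and only serve the illustration.

```java
public class ShufflePartitionSketch {

  // Hypothetical simplification: estimated size divided by the target file size, rounded up.
  static long numOutputFiles(long estimatedOutputBytes, long targetFileSizeBytes) {
    if (estimatedOutputBytes < targetFileSizeBytes) {
      return 1;
    }
    return (estimatedOutputBytes + targetFileSizeBytes - 1) / targetFileSizeBytes;
  }

  // Mirrors the shape of the snippet in step 2: input size scaled by the
  // compression factor, then converted into a number of shuffle partitions.
  static long shufflePartitions(long inputBytes, double compressionFactor, long targetFileSizeBytes) {
    long estimatedOutputBytes = (long) (inputBytes * compressionFactor);
    return Math.max(1, numOutputFiles(estimatedOutputBytes, targetFileSizeBytes));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    System.out.println(shufflePartitions(1300 * mb, 1.0, 512 * mb)); // 3
    System.out.println(shufflePartitions(1300 * mb, 2.0, 512 * mb)); // 6
    System.out.println(shufflePartitions(10 * mb, 1.0, 512 * mb));   // 1
  }
}
```

With a compression factor of 2.0, the same input is expected to produce twice as much data after the sort, hence twice as many shuffle partitions.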
    

Lexicographical order

If you've read my blog post about Z-Ordering in Delta Lake, the algorithm should look familiar. But there is one point that troubled me a bit: the sortedLexicographically method. Fortunately, some research helped me refresh my knowledge.

Your table may have signed numerical values such as integers or longs. These "signed" values are numbers with a "+" or "-" sign, which makes the whole Z-Order sorting more challenging. When you compare their raw binary representations, the signed values don't follow their numerical order:

Order position | Signed number | Binary representation
1              | 1             | 0000 0001
2              | 2             | 0000 0010
3              | -2            | 1111 1110
4              | -1            | 1111 1111

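It's weird, isn't it? We all know that the negative numbers go before the positive ones. Yes, we do, but the machines don't: the sign is stored in the most significant bit, so a plain byte comparison puts every negative number after the positive ones. That's when the lexicographical ordering function helps. Let's take a look at its Javadoc: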

public class ZOrderByteUtils {

  public static final int PRIMITIVE_BUFFER_SIZE = 8;

  private ZOrderByteUtils() {}

  static ByteBuffer allocatePrimitiveBuffer() {
    return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
  }

  /**
   * Signed ints do not have their bytes in magnitude order because of the sign bit. To fix this,
   * flip the sign bit so that all negatives are ordered before positives. This essentially shifts
   * the 0 value so that we don't break our ordering when we cross the new 0 value.
   */
  public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) {
    ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
    bytes.putLong(((long) val) ^ 0x8000000000000000L);
    return bytes;
  }

As you can see, it flips the sign bit to correctly reorder the values. If we apply this to our table, we'll get the expected sorting:

Order position | Signed number | Binary representation | Flipped integer
1              | -2            | 0111 1110             | 126
2              | -1            | 0111 1111             | 127
3              | 1             | 1000 0001             | 129
4              | 2             | 1000 0010             | 130
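
If you want to check these values yourself, the following sketch reproduces them for a single byte. It is only an illustration on 8-bit numbers, and the SignFlipSketch class with its two helpers is hypothetical; the Apache Iceberg method quoted above works on a wider buffer.

```java
public class SignFlipSketch {

  // Flips the sign bit of an 8-bit value and returns the result as an unsigned number (0..255).
  static int flipSignBit(byte value) {
    return (value ^ 0x80) & 0xFF;
  }

  // Renders the low 8 bits as a zero-padded binary string, e.g. "0111 1110".
  static String toBinary(int value) {
    String bits = String.format("%8s", Integer.toBinaryString(value & 0xFF)).replace(' ', '0');
    return bits.substring(0, 4) + " " + bits.substring(4);
  }

  public static void main(String[] args) {
    byte[] values = {1, 2, -2, -1};
    for (byte value : values) {
      int flipped = flipSignBit(value);
      System.out.println(value + ": raw " + toBinary(value)
          + ", flipped " + toBinary(flipped) + " = " + flipped);
    }
  }
}
```

After the flip, comparing the bytes as unsigned numbers gives the same order as comparing the original signed values.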

Now all types can be sorted lexicographically, and so can the Z-Order value used in the sort expression!
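
To see how both pieces fit together, below is a small end-to-end sketch that builds a Z-Order value for two signed integers and sorts a few points by it. It is not the Apache Iceberg implementation: the ZValueSketch class and its methods are hypothetical, the ordered bytes use 4 bytes per integer instead of the 8-byte buffer from the snippet above, and the interleaving is a naive bit-by-bit loop. Arrays.compareUnsigned requires Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ZValueSketch {

  // Big-endian bytes of the int with its sign bit flipped, so that an
  // unsigned byte-wise comparison follows the numerical order.
  static byte[] orderedBytes(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ 0x80000000).array();
  }

  // Naive round-robin interleaving: first bit of every column, then the
  // second bit of every column, and so on. Assumes equally sized columns.
  static byte[] interleave(byte[]... columns) {
    int width = columns[0].length;
    byte[] out = new byte[width * columns.length];
    int outBit = 0;
    for (int bit = 0; bit < width * 8; bit++) {
      for (byte[] column : columns) {
        int source = (column[bit / 8] >> (7 - bit % 8)) & 1;
        out[outBit / 8] |= (byte) (source << (7 - outBit % 8));
        outBit++;
      }
    }
    return out;
  }

  static byte[] zValue(int x, int y) {
    return interleave(orderedBytes(x), orderedBytes(y));
  }

  public static void main(String[] args) {
    int[][] points = {{2, 2}, {-1, -1}, {0, 0}, {1, 0}, {0, 1}, {-2, 3}};
    // Sort the points by the unsigned lexicographical order of their Z-values.
    Arrays.sort(points, (a, b) -> Arrays.compareUnsigned(zValue(a[0], a[1]), zValue(b[0], b[1])));
    for (int[] point : points) {
      System.out.println(Arrays.toString(point));
    }
  }
}
```

Without the sign flip in orderedBytes, the point (-1, -1) would get a larger Z-value than (0, 0) and end up at the wrong end of the sorted output.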

It's the last table file format blog post before a short break. But there are more things to come, especially about schema evolution. Stay tuned!