Table file formats - compaction: Apache Iceberg

Versions: Apache Iceberg 1.1.0

Compaction is also a feature present in Apache Iceberg. However, it works a little bit differently than for Delta Lake presented last time. Why? Let's see in this new blog post!

New ebook ๐Ÿ”ฅ

Learn 84 ways to solve common data engineering problems with cloud services.

๐Ÿ‘‰ I want my copy

The general idea of compaction remains the same, though. The goal is to optimize the file storage by rearranging the data. That's the "what". When it comes to the "how", there are some differences and probably the most visible one is the presence of compaction strategies.

Compaction

Before we go into the strategies, let's focus on the big picture of the compaction itself. The operation involves the following steps:

Bin-packing strategy

Apache Iceberg uses one of 3 strategies to generate compaction groups and execute compaction jobs. The first strategy is called Bin-packing and it relies on the same principle as in the Delta Lake compaction, i.e. reducing the number of data files by putting as much data as possible together. To enable it you can omit the strategy option in the rewriteDataFiles action or specify it explicitly, as in the following snippet:

val lettersTable = Spark3Util.loadIcebergTable(sparkSession, "local.db.letters")
SparkActions
  .get()
  .rewriteDataFiles(lettersTable)
  .binPack()

Under-the-hood this call sets an internal strategy field to SparkBinPackStrategy. The strategy uses several options to customize the compaction behavior:

What is the bin-packing algorithm? First, in the selectFilesToRewrite(Iterable dataFiles) it finds the candidates for rewriting. Except rewrite-all situation, Apache Iceberg applies the following condition on top of the files:

  	return FluentIterable.from(dataFiles)
      	.filter(
          	scanTask ->
              	scanTask.length() < minFileSize
                  	|| scanTask.length() > maxFileSize
                  	|| taskHasTooManyDeletes(scanTask));

The selection works per partition. After getting the files, the operation prepares the compaction groups by calling the bin-packing algorithm defined in the planFileGroups method. The grouping puts file candidates into bins with the goal to reduce the number of created bins, therefore to put as much data in a single file as possible. Once groups resolved, the operation continues and either accepts all group candidates (rewrite-all flag set to true), or applies the filtering logic with the group-related options:

public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> dataFiles) {
// ...
return potentialGroups.stream()
      .filter(
       group ->
            (group.size() >= minInputFiles && group.size() > 1)
              || (sizeOfInputFiles(group) > targetFileSize && group.size() > 1)
              || sizeOfInputFiles(group) > maxFileSize
              || group.stream().anyMatch(this::taskHasTooManyDeletes))
.collect(Collectors.toList());

In the end of this preparation stage, tie rewrite action executes the rewriteFiles(List<FileScanTask> filesToRewrite) to physically change the storage layout. Basically, it's a read-write operation using the target and max file size configuration:

  protected long writeMaxFileSize() {
	return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
  }

Besides, there is also interesting information on how Apache Iceberg relates the files to rewrite groups with the reader. If you check the reading part, you won't notice any mention of the input files. Instead, there is a groupID:

  @Override
  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
	String groupID = UUID.randomUUID().toString();
	try {
  	tableCache.add(groupID, table);
  	manager.stageTasks(table, groupID, filesToRewrite);
// ...
  	Dataset<Row> scanDF =
      	cloneSession
          	.read()
          	.format("iceberg")
          	.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
          	.option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
          	.option(SparkReadOptions.FILE_OPEN_COST, "0")
          	.load(groupID);

The rewriting process uses a FileScanTaskSetManager that remembers the group and the contained files. Later, the SparkFilesScan gets the files list and creates a list of data reading tasks:

Sort strategy

The second strategy adds an extra sorting step, hence the name sort strategy. However, it's also related to bin-packing. The SortStrategy class extends the BinPackStrategy and it does overwrite nothing but the rewriteFiles method. As a result, the file groups are also created with the bin-packing algorithm and the single difference will be in the content of the compacted files.

The compaction result with sort-based strategy sorts rows inside the compacted data files. It's ensured by an extra sorting step added to the Apache Spark execution plan:

public class SparkSortStrategy extends SortStrategy {
// ...
  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
// ...
	SortOrder[] ordering;
	if (requiresRepartition) {
  	// Build in the requirement for Partition Sorting into our sort order
  	ordering =
      	SparkDistributionAndOrderingUtil.convert(
          	SortOrderUtil.buildSortOrder(table, sortOrder()));
	} else {
  	ordering = SparkDistributionAndOrderingUtil.convert(sortOrder());
	}

	Distribution distribution = Distributions.ordered(ordering);
// ...
  	SQLConf sqlConf = cloneSession.sessionState().conf();
  	LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
  	Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
// ...

Besides the classical sort-by, Apache Iceberg also supports Z-Order compaction strategy but as for Delta Lake, I'll cover it in another part of the "Table file formats..." series.

Manifests rewriting

The above was true for the data files. But Apache Iceberg also has a cleaning mechanism for the metadata files. The snippet rewrites manifest files that has deleted files:

val lettersTable = Spark3Util.loadIcebergTable(sparkSession, "local.db.letters")
SparkActions
  .get()
  .rewriteManifests(lettersTable)
  .rewriteIf(manifestFile => manifestFile.hasDeletedFiles)
  .execute()

The rewriteIf adds a new predicate to the rewrite action. The whole algorithm looks like:

Compaction is a process that helps, among others, eliminate small files and optimize the reading I/O. Apache Iceberg supports it for data and metadata layers with the fluent-builder API. How does it work for Apache Hudi? Let's see this in the next blog post of the Table file formats series!