ACID file formats - writing: Apache Iceberg

Versions: Apache Iceberg 0.13.1

Last time you discovered data writing in Apache Hudi. Today it's time to see the 2nd file format from my list, Apache Iceberg.

New ebook ๐Ÿ”ฅ

Learn 84 ways to solve common data engineering problems with cloud services.

๐Ÿ‘‰ I want my Early Access edition

Operations and configuration

As previously, I'll start this time with some general info first. Since Apache Iceberg doesn't have different table types, let's see the available writing operations:

Write configuration properties are defined in TableProperties class as constants prefixed with WRITE_. Again, among subjectively chosen the most important ones, you'll find:

Besides, Apache Iceberg also has some Apache Spark-specific configuration properties for the writer:

File writers

Apache Iceberg writes the data files from one of 4 DataWriter implementations:

Since there is no 1-1 relationship between the high-level and low-level writers, I prepared a summary here:

Metadata delete

Apart from the writing classes, another point from the documentation that spotted my attention was the DELETE operation, and more precisely, its version working on the whole partitions. Why it's so interesting? Because Apache Iceberg can somehow manage this to not overwrite the data files but only the metadata.

Apache Iceberg comes with a set of logical rules and one of them is RewriteDelete. As you might already deduce, it's used to handle Apache Spark's DeleteFromTable command:

case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  // don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
  case d @ DeleteFromTable(r: DataSourceV2Relation, Some(cond))
      if isMetadataDelete(r, cond) && isIcebergRelation(r) =>
    d
// rewrite all operations that require reading the table to delete records
case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) if isIcebergRelation(r) =>
  // TODO: do a switch based on whether we get BatchWrite or DeltaBatchWrite

The key for understanding the metadata delete is this isMetadataDelete method that checks whether the delete query contains exactly one condition for the partition. Besides, the comments come from the original codebase and are pretty self-explanatory! When the delete doesn't target the partition, hence is not a metadata-delete, this logical rule rewrites the plan to something like that:

ReplaceData IcebergBatchWrite(table=local.db.letters, format=PARQUET), org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy$$Lambda$2737/229014598@5d2bc446
+- *(4) Project [id#26, upperCase#27, lowerCase#28, nestedLetter#29, creationTime#30]
   +- *(4) Sort [_file#43 ASC NULLS FIRST, _pos#44L ASC NULLS FIRST], false, 0
      +- *(4) Filter NOT ((isnotnull(lowerCase#28) AND (lowerCase#28 = xx)) <=> true)
         +- DynamicFileFilterExec[id#26, upperCase#27, lowerCase#28, nestedLetter#29, creationTime#30, _file#43, _pos#44L]
            :- *(1) Project [id#26, upperCase#27, lowerCase#28, nestedLetter#29, creationTime#30, _file#43, _pos#44L]
            : +- ExtendedBatchScan[id#26, upperCase#27, lowerCase#28, nestedLetter#29, creationTime#30, _file#43, _pos#44L] local.db.letters [filters=lowerCase IS NOT NULL, lowerCase = 'xx']
            +- *(3) HashAggregate(keys=[_file#43], functions=[], output=[_file#43])
               +- Exchange hashpartitioning(_file#43, 200), ENSURE_REQUIREMENTS, [id=#62]
                  +- *(2) HashAggregate(keys=[_file#43], functions=[], output=[_file#43])
                     +- *(2) Project [_file#43]
                        +- *(2) Filter (isnotnull(lowerCase#28) AND (lowerCase#28 = xx))
                           +- ExtendedBatchScan[id#26, upperCase#27, lowerCase#28, nestedLetter#29, creationTime#30, _file#43, _pos#44L] local.db.letters [filters=lowerCase IS NOT NULL, lowerCase = 'xx']

As you can see, the process eliminates the rows matching the filter from the files containing removed rows. The plan for the metadata delete is much simpler:

== Physical Plan ==
DeleteFromTable local.db.letters, [IsNotNull(upperCase), EqualTo(upperCase,B)], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2582/830929141@467cd4b9

It follows the classical Apache Spark delete path from the DeleteFromTableExec:

case class DeleteFromTableExec(table: SupportsDelete, condition: Array[Filter], refreshCache: () => Unit) extends V2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    table.deleteWhere(condition)
    refreshCache()
    Seq.empty
  }

The table implementation is SparkTable and it invokes the following to delete the files:

@Override
public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
    SupportsRead, SupportsWrite, ExtendedSupportsDelete, SupportsMerge {

public void deleteWhere(Filter[] filters) {
// ...
icebergTable.newDelete()
    .set("spark.app.id", sparkSession().sparkContext().applicationId())
    .deleteFromRowFilter(deleteExpr)
    .commit();

In this deleteFromRowFilter, Apache Iceberg creates an instance of PartitionAndMetricsEvaluator and checks whether the rows from each evaluated data file match the filtering expression which in our case, is the condition on the partitioning column. If the outcome is true, Apache Iceberg filters out the data file from the generated manifest without currently touching this data file physically. It's just a metadata marker saying that the readers shouldn't see the records from these files.

To confirm my sayings, below you can find an example of the manifest created after the partition delete operation (I removed the stats part for readability):

{
    "status" : 0,
    "snapshot_id" : 2113420582962524200,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=A/00000-0-09909bdd-bab3-4b2e-88be-9ea8468f89df-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "A"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1835,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 0,
    "snapshot_id" : 2113420582962524200,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=A/00001-1-13109587-274d-4539-9c17-802172e1145d-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {icon expand},
        "record_count" : 1,
        "file_size_in_bytes" : 1842,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 2,
    "snapshot_id" : 6974932437942515000,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=B/00002-2-aa46caae-cdfb-440d-91c2-9bc33fd014fe-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "B"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1835,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 2,
    "snapshot_id" : 6974932437942515000,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=B/00003-3-91bb2f9b-3b5a-443c-80eb-a4c3e954eb8a-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "B"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1842,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 0,
    "snapshot_id" : 2113420582962524200,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=C/00004-4-a629fa0d-88a7-4f9e-b66a-5007e162cae8-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "C"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1835,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 0,
    "snapshot_id" : 2113420582962524200,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=C/00005-5-6a2cab82-5a94-4b02-8622-cffcf3297217-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "C"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1841,
        "block_size_in_bytes" : 67108864
    }
}
{
    "status" : 0,
    "snapshot_id" : 2113420582962524200,
    "data_file" : {
        "file_path" : "file:/tmp/acid-file-formats/006_writing/iceberg/db/letters/data/upperCase=C/00006-6-8b59d4d4-f69e-47ad-98e1-e1b2c5c833bf-00001.parquet",
        "file_format" : "PARQUET",
        "partition" : {
            "upperCase" : "C"
        },
        "record_count" : 1,
        "file_size_in_bytes" : 1849,
        "block_size_in_bytes" : 67108864
    }
}

The key part is the status attribute defined in org.apache.iceberg.ManifestEntry.Status as:

public static enum Status {
    EXISTING(0),
    ADDED(1),
    DELETED(2);
// ...
}

And the tree output for the data files (deleted partition was "B", and the data is still there):

Distribution mode

To finish investigating the interesting internals of Apache Iceberg writers, I had to understand what the write.distribution-mode is for. The first thing to notice, it only works if the table is partitioned. Below you can see a method mapping the table distribution:

public static Distribution buildRequiredDistribution(org.apache.iceberg.Table table) {
  DistributionMode distributionMode = distributionModeFor(table);
  switch (distributionMode) {
    case NONE:
      return Distributions.unspecified();
    case HASH:
      if (table.spec().isUnpartitioned()) {
        return Distributions.unspecified();
      } else {
        return Distributions.clustered(toTransforms(table.spec()));
      }
    case RANGE:
      if (table.spec().isUnpartitioned() && table.sortOrder().isUnsorted()) {
        return Distributions.unspecified();
      } else {
        org.apache.iceberg.SortOrder requiredSortOrder = SortOrderUtil.buildSortOrder(table);
        return Distributions.ordered(convert(requiredSortOrder));
      }
    default:
      throw new IllegalArgumentException("Unsupported distribution mode: " + distributionMode);
  }
}

A partitioned table is a requirement, but what is this distribution mode? At first I thought it was a kind of bucketing-like feature of Iceberg, but it doesn't make sense since the partitioning already defines how the records are colocated. Moreover, you won't find a mention of it during the inserts. It's a table-based property you can set like me just below:

sparkSession.sql(
  s"""
    |CREATE OR REPLACE TABLE local.db.letters_hash (
    | id INT,
    | upperCase STRING,
    | lowerCase STRING
    |)
    |USING iceberg
    |PARTITIONED BY (upperCase)
    |TBLPROPERTIES('write.distribution-mode' = 'hash')
    |""".stripMargin)

If this table property is not used in the inserts, then when? The answer is, in the row-level operations, such as update, delete [not partition delete], and merge. In the code you will find its mention in the RewriteRowLevelOperationHelper implementations which are RewriteDelete, RewriteUpdate, and RewriteMerge. They're logical rules that replace the initial Apache Spark plan by, among others, including the shuffling corresponding to the distribution mode. Need a proof? Below you can find execution plans for MERGE operation for 3 different scenarios where the distribution is respectively set to none, hash, and range:

After writing this blog post I appreciate even more the idea of changing the blog format and instead of covering one aspect from a high level, getting down to finer level of details. Even though I'm aware there are certainly some other interesting writing aspects to cover! If you have something in mind, please leave a comment below to complete the article. And meantime, see you next week, this time with Delta Lake writing logic.