ACID file formats - API

Versions: Apache Hudi 0.10.0, Apache Iceberg 0.13.1, Delta Lake 1.1.0 https://github.com/bartosz25/acid-file-formats/tree/main/000_api

It's time to start a new series on the blog! I hope to catch up on the ACID file formats that are gaining more and more importance. It's also a good occasion to test a new learning method: instead of writing one blog post per feature and format, I'll try to compare Delta Lake, Apache Iceberg, and Apache Hudi concepts in the same article. Besides this personal challenge, I hope you'll enjoy the series and also learn something interesting!


I'll start the first blog post of the series with a high-level view of the API. Here, you'll see the code you can use to perform the basic operations, like writing the data, creating a table, or querying it.

Surprises

Before discussing the writing options, I'd like to share the problems I faced during the setup. Naturally, I had far fewer issues with Delta Lake since I had already written some code with this format. It was not so rosy for Apache Hudi and Apache Iceberg. But let me explain Hudi first:

After all these issues, I was ready to go with Apache Hudi. However, I hadn't expected to configure so many things. I hope to understand the reasons better in the following weeks. And what about Apache Iceberg?

Delta Lake

In the analysis I'll check the following aspects: writing a DataFrame, creating a table from a SQL expression, and querying the data with both the programmatic API and SQL.

Delta Lake can write a DataFrame and create a table from a SQL expression. Moreover, it supports the new V2 API.

    val inputData = Seq(
      Order(1, 33.99d, "Order#1"), Order(2, 14.59d, "Order#2"), Order(3, 122d, "Order#3")
    ).toDF
    inputData.write.format("delta").save(outputDir)
    sparkSession.sql(s"CREATE TABLE default.orders USING DELTA LOCATION '${outputDir}'")
    inputData.writeTo("orders_from_write_to").using("delta").createOrReplace()
    sparkSession.sql("DROP TABLE IF EXISTS orders_from_sql")
    sparkSession.sql(
      s"""
        |CREATE OR REPLACE TABLE orders_from_sql (
        | id LONG,
        | amount DOUBLE,
        | title STRING
        |) USING delta LOCATION "${outputDir}/orders_from_sql_${System.currentTimeMillis()}"
        |""".stripMargin)
    sparkSession.sql(
      """
        |INSERT INTO orders_from_sql (id, amount, title) VALUES
        |(1, 33.99, "Order#1"), (2, 14.59, "Order#2"), (3, 122, "Order#3")
        |""".stripMargin)

What about reading? Again, all cases checked! You can query Delta Lake tables with the programmatic API and SQL:

    sparkSession.read.format("delta").load(outputDir).where("amount > 40").show(false)
    sparkSession.sql("SELECT * FROM default.orders WHERE amount > 40").show(false)
    sparkSession.sql("SELECT * FROM orders_from_write_to WHERE amount > 40").show(false)
    sparkSession.sql("SELECT * FROM orders_from_sql WHERE amount > 40").show(false)

Can we write the same snippet for Apache Hudi? Let's see!

Apache Hudi

For Apache Hudi, some of the previously presented operations are not supported. When you try to run inputData.writeTo("orders_from_write_to").using("hudi").createOrReplace(), you'll get an unsupported writing mode error ("REPLACE TABLE AS SELECT is only supported with v2 tables"). A similar error pops up when you try to run the SQL commands to create the table. I didn't manage to solve them just by reading the documentation, so I fell back on the good old createOrReplaceTempView. In the end, the only working method to create a Hudi table was the programmatic API:

    val inputData = Seq(
      Order(1, 33.99d, "Order#1"), Order(2, 14.59d, "Order#2"), Order(3, 122d, "Order#3")
    ).toDF

    inputData.write.format("hudi").options(getQuickstartWriteConfigs)
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.operation", INSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.recordkey.field", "id")
      .mode(SaveMode.Overwrite)
      .save(outputDir)
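
The createOrReplaceTempView workaround mentioned above can be sketched like this; it also shows where the orders_table name used in the SQL query below comes from (a sketch, assuming the write above already succeeded):

    // Read the Hudi table back and expose it to SQL through a temporary view,
    // since CREATE TABLE ... USING hudi didn't work in this setup
    sparkSession.read.format("hudi").load(outputDir)
      .createOrReplaceTempView("orders_table")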

I had fewer troubles with the reading part because Hudi supports both the DataFrame API and SQL:

    val allReadOrders = sparkSession.read.format("hudi").load(outputDir)
    sparkSession.read.format("hudi").load(outputDir).where("amount > 40").show(false)
    sparkSession.sql("SELECT * FROM orders_table WHERE amount > 40").show(false)

One thing surprised me, though. The SELECT statements return both the data and Hudi's metadata columns. Maybe I misconfigured something or missed the relevant information in the documentation, but overall, writing data was a bit more difficult than for Delta Lake. I'll be happy to hear if you have any suggestions!
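
If those extra columns bother you at query time, one workaround (a sketch, not an official Hudi option) relies on the fact that Hudi prefixes its metadata columns with _hoodie_, so you can drop them after reading:

    val allReadOrders = sparkSession.read.format("hudi").load(outputDir)
    // e.g. _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path...
    val metadataColumns = allReadOrders.columns.filter(_.startsWith("_hoodie_"))
    allReadOrders.drop(metadataColumns: _*).show(false)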

Apache Iceberg

For Apache Iceberg, the experience was smoother than for Hudi, except for one thing: the V1 DataFrame API. I didn't succeed in configuring the SparkSession to support both the V1 and V2 APIs. Later on, I found that the documentation doesn't recommend the V1 API, so I didn't consider it a blocker and moved on with the tests:

The v1 DataFrame write API is still supported, but is not recommended.
When writing with the v1 DataFrame API in Spark 3, use saveAsTable or insertInto to load tables with a catalog. Using format("iceberg") loads an isolated table reference that will not automatically refresh tables used by queries.
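
Following that recommendation, a v1 write going through the catalog could look like this (a sketch; the orders_from_v1 table name is illustrative and the local catalog is the one configured for these tests):

    // v1 DataFrame API with a catalog-backed table, as the docs recommend:
    // saveAsTable resolves the table through the catalog instead of creating
    // an isolated format("iceberg") path reference
    inputData.write.format("iceberg").mode(SaveMode.Append)
      .saveAsTable("local.db.orders_from_v1")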

The writing part is quite similar to Delta Lake's:

    inputData.writeTo("local.db.orders_from_write_to").using("iceberg").createOrReplace()
    sparkSession.sql("DROP TABLE IF EXISTS orders_from_sql")
    sparkSession.sql(
      s"""
         |CREATE OR REPLACE TABLE local.db.orders_from_sql (
         | id LONG,
         | amount DOUBLE,
         | title STRING
         |) USING iceberg
         |""".stripMargin)
    sparkSession.sql(
      """
        |INSERT INTO local.db.orders_from_sql (id, amount, title) VALUES
        |(1, 33.99, "Order#1"), (2, 14.59, "Order#2"), (3, 122, "Order#3")
        |""".stripMargin)

During the reading tests I discovered the table(...) method of SparkSession, which reads a table resolved through the catalog:

    sparkSession.table("local.db.orders_from_write_to").where("amount > 40").show(false)
    sparkSession.sql("SELECT * FROM local.db.orders_from_write_to WHERE amount > 40").show(false)
    sparkSession.sql("SELECT * FROM local.db.orders_from_sql WHERE amount > 40").show(false)

I also tested the table() method on Delta Lake and Apache Hudi. Both formats support it correctly.
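
For instance, with the names registered in the earlier snippets (a sketch, assuming those tables and the Hudi temporary view exist in the session):

    // table() resolves the name through the catalog, whatever the format
    sparkSession.table("default.orders").where("amount > 40").show(false) // Delta Lake
    sparkSession.table("orders_table").where("amount > 40").show(false)   // Hudi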

If you're still hungry after this introduction, no worries, it's only an introduction! In the coming weeks I'll publish more in-depth blog posts about these 3 ACID file formats. And if you already have some questions about their features, feel free to leave a comment. I'll try to include them in my planning!