Idempotent writer

Versions: Delta Lake 4.0 https://github.com/bartosz25/delta-lake-playground/tree/master/022_idempotent_writer

If you are experienced enough, you should remember the Apache Spark Structured Streaming file sink, where the commit log stores the already written files in a dedicated file. Delta Lake uses a similar concept to guarantee idempotent writes, but with less storage overhead.

Data Engineering Design Patterns

Looking for a book that defines and solves the most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩

⚠ Not only writer

The Delta Lake writer only guarantees writing idempotency! If your data processing logic is not idempotent, your results will be incorrect because you will skip writing the results that have changed between two consecutive executions. In that case a better alternative would be the MERGE operation, as long as the rows' ids don't change, even when there are retries.
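As an illustration of that alternative, here is a minimal sketch of a retry-safe MERGE; the changesDataframe holding the new results and the table path are hypothetical:

import io.delta.tables.DeltaTable

// Upsert by id: replaying the same changes after a retry leads to the same final table state
val ordersTable = DeltaTable.forPath(spark, "/tmp/orders_table")
ordersTable.alias("target")
  .merge(changesDataframe.alias("changes"), "target.id = changes.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()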

Configuration

There are two possible ways to configure the idempotent writer: locally, as options on an individual writer, or globally, as SparkSession-scoped configuration properties. Both are illustrated in the sketch below.
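A minimal sketch of both approaches; the DataFrame, the table path, the application id, and the version number are hypothetical:

// Local configuration: options set on an individual writer
dataframe.write.format("delta")
  .option("txnAppId", "hourly_ingestion_job")
  .option("txnVersion", 10L)
  .mode("append")
  .save("/tmp/idempotent_demo")

// Global configuration: SparkSession-scoped properties shared by all the writers of the session
spark.conf.set("spark.databricks.delta.write.txnAppId", "hourly_ingestion_job")
spark.conf.set("spark.databricks.delta.write.txnVersion", 10L)
dataframe.write.format("delta").mode("append").save("/tmp/idempotent_demo")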

Depending on how you want to control the idempotency, whether individually for some of the writers or globally across all writers in the job, you will respectively prefer the local or the global configuration. But there is a gotcha with the global configuration. In both cases you need to provide a numerical and monotonically increasing txnVersion. If it's not a number, the write will fail, while if you don't provide an increasing number, the write will be silently skipped. To understand why, you need to know what happens under the hood. The next schema summarizes the internal behavior of the idempotent writer:


As you can see, before Delta Lake starts writing the data files, it first checks the specified txnAppId and txnVersion against a mapping created from the commit logs. The mapping stores the most recent transaction version for each application interacting with the table. Consequently, your writer will only succeed if the txnVersion is greater than the value present in the table.

Regardless of how you configure it, you must keep in mind that the mapping is global for the table. Consequently, with many writers you should take care of setting unique app ids. Otherwise, some of the writers might be silently ignored. It's worth mentioning that the application id can also be null, and this missing value can also lead to surprising behavior.
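To make the risk concrete, here is a hypothetical sketch of two unrelated writers sharing the same txnAppId; because the table keeps only one version per application id, the second write can be silently skipped:

// Writer 1, e.g. the orders ingestion (hypothetical names and path)
ordersDataframe.write.format("delta")
  .option("txnAppId", "ingestion").option("txnVersion", 5L)
  .mode("append").save("/tmp/shared_table")

// Writer 2, e.g. the refunds ingestion, reuses the same app id with a version
// that is not greater than the stored one; Delta Lake considers the transaction
// as already applied and skips the write without any error
refundsDataframe.write.format("delta")
  .option("txnAppId", "ingestion").option("txnVersion", 5L)
  .mode("append").save("/tmp/shared_table")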

Auto-reset

If you decide to implement the idempotent writers with the SparkSession-scoped configuration, you must be aware of an additional property, spark.databricks.delta.write.txnVersion.autoReset.enabled. When you enable it, Delta Lake will unset the txnVersion from the session configuration after each write. As a result, if your job has many writers, only the first will succeed, unless you explicitly set a new txnVersion before each subsequent write.
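A hypothetical sketch of this behavior with two session-configured writes:

spark.conf.set("spark.databricks.delta.write.txnVersion.autoReset.enabled", "true")
spark.conf.set("spark.databricks.delta.write.txnAppId", "nightly_job")
spark.conf.set("spark.databricks.delta.write.txnVersion", 3L)

// The first write succeeds and, because of the auto-reset, unsets the session txnVersion
firstDataframe.write.format("delta").mode("append").save("/tmp/auto_reset_demo")

// Without a new version the next write would not be covered anymore,
// so the version has to be set again before every subsequent write
spark.conf.set("spark.databricks.delta.write.txnVersion", 4L)
secondDataframe.write.format("delta").mode("append").save("/tmp/auto_reset_demo")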

If you specify both the SparkSession-scoped and the writer configuration by mistake, Delta Lake will prefer the writer options. You can see it in DeltaCommand#getTxnVersionAndAppId, used to retrieve the id and version before each write:

  private def getTxnVersionAndAppId(
    sparkSession: SparkSession,
    options: Option[DeltaOptions]): (Option[Long], Option[String], Boolean) = {
    var txnVersion: Option[Long] = None
    var txnAppId: Option[String] = None
    for (o <- options) {
      txnVersion = o.txnVersion
      txnAppId = o.txnAppId
    }
    var numOptions = txnVersion.size + txnAppId.size
    // ...
    var fromSessionConf = false
    if (numOptions == 0) {
      txnVersion = sparkSession.sessionState.conf.getConf(
        DeltaSQLConf.DELTA_IDEMPOTENT_DML_TXN_VERSION)
      txnAppId = sparkSession.sessionState.conf.getConf(
        DeltaSQLConf.DELTA_IDEMPOTENT_DML_TXN_APP_ID)
      numOptions = txnVersion.size + txnAppId.size
      // ...
      fromSessionConf = true
    }
    (txnVersion, txnAppId, fromSessionConf)
  }
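In practice it means that in a hypothetical setup like the one below, the commit is recorded for the writer values and not the session ones:

// Session-scoped values...
spark.conf.set("spark.databricks.delta.write.txnAppId", "session_app")
spark.conf.set("spark.databricks.delta.write.txnVersion", 100L)

// ...are ignored as soon as the writer defines its own options
dataframe.write.format("delta")
  .option("txnAppId", "writer_app")
  .option("txnVersion", 1L)
  .mode("append").save("/tmp/precedence_demo")
// The transaction is tracked as ("writer_app", 1), not ("session_app", 100)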

Streaming sink

The configurable idempotent writer works for the batch API only. When you write to a Delta Lake table with the streaming API, Delta Lake takes care of setting the app id and the transaction version itself. Your settings, even at the SparkSession level, will be ignored. The next schema shows where these two values come from:


You will also find these details in the commit logs, in the txn part:

{"txn":{"appId":"4f469844-8519-4121-89c0-60a8453f660f","version":0,"lastUpdated":1759419766775}}

⚠ Query id != query name

While the query name is a property you can set to easily identify your job in the Spark UI, the query id is automatically managed by Apache Spark as an auto-generated UUID.

It's worth adding that if you try to reprocess the dataset by removing checkpoint files (but keeping the metadata file), the reprocessing will skip the batches corresponding to the removed files because the query id is also checkpointed in the file called metadata in your checkpoint location:

cat /tmp/checkpoint_demo6/metadata
{"id":"960b3316-d6fa-4217-9e4a-89ccd164a1f7"}

The idempotent writer is a good candidate if you need to protect yourself against writing the same data twice without resorting to costly MERGE-based deduplication logic. It also integrates nicely with the foreachBatch sink, where streaming writers get the batch number natively. However, there are some gotchas worth knowing, such as the always increasing txnVersion and the table-global mapping between the app ids and the most recent transaction versions.
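To give you an idea of that foreachBatch integration, here is a minimal hypothetical sketch (stream, app id, and paths are made up) that reuses the micro-batch number as the txnVersion:

import org.apache.spark.sql.DataFrame

// The micro-batch number is a natural, always increasing transaction version;
// a retried micro-batch keeps the same id, so the duplicated write is skipped
def writeBatchIdempotently(batchDataframe: DataFrame, batchId: Long): Unit = {
  batchDataframe.write.format("delta")
    .option("txnAppId", "orders_stream_writer")
    .option("txnVersion", batchId)
    .mode("append")
    .save("/tmp/foreach_batch_demo")
}

val query = inputStreamDataframe.writeStream
  .foreachBatch(writeBatchIdempotently _)
  .option("checkpointLocation", "/tmp/foreach_batch_demo_checkpoint")
  .start()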

Consulting

With nearly 16 years of experience, including 8 as a data engineer, I offer expert consulting to design and optimize scalable data solutions. As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!

👉 contact@waitingforcode.com
🔗 past projects