Table file formats - isolation levels: Delta Lake

Versions: Delta Lake 2.4.0 https://github.com/bartosz25/acid-file-formats/tree/main/009_isolation_levels/delta_lake

If Delta Lake implemented only the commits, I could have stopped exploring this transactional part after the previous article. But like an RDBMS, Delta Lake implements other ACID-related concepts. One of them is isolation levels.

Reminder

Before going further, a quick reminder about the isolation levels. In a nutshell, they define how other users see the active transaction, hence, how they see the data written by this transaction.

Weak levels make the data available sooner, but with the risk of anomalies such as dirty reads or lost updates. Why can they happen? A transaction is there to guarantee the integrity of the data processed in the query. When extra coordination would be needed to guarantee consistency and a weak level doesn't provide it, these anomalies become possible. And because of that:

Stronger isolation levels prevent these issues, but at the cost of performance. They require an additional coordination layer that controls the concurrency of the operations.
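To make the lost-update risk concrete, here is a minimal sketch (not Delta Lake code, all names are mine) of the optimistic coordination idea: a committer must prove that the version it read is still the latest one, otherwise its write would silently overwrite a concurrent commit.

```scala
// A minimal sketch of optimistic concurrency control; the class and
// method names are illustrative assumptions, not Delta Lake's API.
class VersionedTable {
  private var version: Long = 0L
  private var rows: Seq[Int] = Seq.empty

  def read: (Long, Seq[Int]) = synchronized { (version, rows) }

  // Optimistic commit: succeeds only if nobody committed since `readVersion`.
  def tryCommit(readVersion: Long, newRows: Seq[Int]): Boolean = synchronized {
    if (readVersion != version) false // conflict detected, the caller must retry
    else { rows = newRows; version += 1; true }
  }
}
```

Without that version check (the "coordination"), two writers reading the same version could both commit and one update would be lost.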

Setting the isolation level in Delta Lake

Isolation level is individually set for each table with the delta.isolationLevel property:

sparkSession.sql(
  s"""
     |CREATE TABLE ${tableName} (id bigint) USING delta
     | TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
     |""".stripMargin)

There are 3 isolation levels in the Delta Lake code base (Serializable, WriteSerializable and SnapshotIsolation), but only Serializable is available for the table settings:

Using anything else will lead to a failure:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: delta.isolationLevel must be Serializable
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.sql.delta.DeltaConfig.validate(DeltaConfig.scala:59)
    at org.apache.spark.sql.delta.DeltaConfig.apply(DeltaConfig.scala:70)
    at org.apache.spark.sql.delta.DeltaConfigsBase.$anonfun$validateConfigurations$1(DeltaConfig.scala:163)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)

There is currently an open Merge Request to enable the WriteSerializable level in the vanilla Delta Lake. Other levels seem to be available on the Databricks version only. You can see this when you compare the documentation about conflicts.

Serializable vs. WriteSerializable

However, both serializable levels are present in the code base and they define different behaviors. Let's see, theoretically, what the difference would be if both were supported. To understand it, let's dive into the code, more exactly into ConflictChecker#checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn. The function starts with this mapping:

val addedFilesToCheckForConflicts = isolationLevel match {
    case WriteSerializable if !currentTransactionInfo.metadataChanged =>
      winningCommitSummary.changedDataAddedFiles // don't conflict with blind appends
    case Serializable | WriteSerializable =>
      winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
    case SnapshotIsolation =>
      Seq.empty
}

In human-friendly terms, both serializable isolation levels detect conflicts by analyzing all the files modified by the winning (concurrently committed) transaction. Additionally, they also include the blind appends, i.e. the files created by simple INSERT INTO statements. However, for the WriteSerializable level this is only true if the pending transaction contains metadata changes. Otherwise, the blind appends are ignored. Why this distinction? It comes from a streaming updates problem, as explained by Prakhar Jain in this commit:

Now any transaction which has non-file actions goes through conflict detection against blind append unlike before. This may lead to failure of such transactions.
This was intentionally done to stop METADATA / other non-file-action transactions to follow conflict detection. But as a side effect, Streaming Updates (which have SetTransaction action) also started following this flow and started failing.
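The quoted match expression can be modeled as a small standalone function to make the behavior concrete. The types and parameter names below are illustrative assumptions, not Delta Lake's internal API:

```scala
// Simplified model of ConflictChecker's file-selection logic; the sealed
// trait and the function are assumptions made for the example.
sealed trait Level
case object Serializable extends Level
case object WriteSerializable extends Level
case object SnapshotIsolation extends Level

def filesToCheckForConflicts(level: Level, metadataChanged: Boolean,
                             changedDataFiles: Seq[String],
                             blindAppendFiles: Seq[String]): Seq[String] =
  level match {
    case WriteSerializable if !metadataChanged =>
      changedDataFiles // don't conflict with blind appends
    case Serializable | WriteSerializable =>
      changedDataFiles ++ blindAppendFiles
    case SnapshotIsolation =>
      Seq.empty
  }
```

The model shows the asymmetry: WriteSerializable only brings the blind appends into the conflict check when the pending transaction touched the metadata, while Serializable always checks them and SnapshotIsolation checks nothing.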

Downgrading isolation level

But the isolation level resolution is not that simple. Delta Lake can decide at runtime to downgrade the isolation level! The magic happens in OptimisticTransactionImpl#getIsolationLevelToUse:

protected def getIsolationLevelToUse(preparedActions: Seq[Action], op: DeltaOperations.Operation): IsolationLevel = {
  val isolationLevelToUse = if (canDowngradeToSnapshotIsolation(preparedActions, op)) {
    SnapshotIsolation
  } else {
    getDefaultIsolationLevel()
  }
  isolationLevelToUse
}

Under some conditions the initial isolation level can be downgraded to SnapshotIsolation. When? If the data files modified within the transaction don't contain any data changes, plus a few other conditions verified in canDowngradeToSnapshotIsolation.

An isolation level downgraded to the snapshot isolation skips some of the conflict detection methods, as for example detection of the concurrently written files:

protected def checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(): Unit = {
  recordTime("checked-appends") {
    // Fail if new files have been added that the txn should have read.
    val addedFilesToCheckForConflicts = isolationLevel match {
      case WriteSerializable if !currentTransactionInfo.metadataChanged =>
        winningCommitSummary.changedDataAddedFiles // don't conflict with blind appends
      case Serializable | WriteSerializable =>
        winningCommitSummary.changedDataAddedFiles ++ winningCommitSummary.blindAppendAddedFiles
      case SnapshotIsolation =>
        Seq.empty
    }
    // ...
  }
}
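To make the downgrade rule tangible, here is a hedged sketch of the decision: if every file action in the commit carries dataChange = false (as for the rewrites produced by OPTIMIZE), Snapshot Isolation is enough because the transaction doesn't logically change the data. The case class and function names are mine, not Delta Lake's:

```scala
// Illustrative sketch of the downgrade decision; FileAction and
// isolationLevelToUse are assumptions made for the example.
case class FileAction(path: String, dataChange: Boolean)

def isolationLevelToUse(actions: Seq[FileAction], tableLevel: String): String =
  if (actions.nonEmpty && actions.forall(!_.dataChange)) "SnapshotIsolation"
  else tableLevel // keep the level configured on the table
```

An OPTIMIZE-like commit (only dataChange = false rewrites) gets downgraded, while any commit that really changes the data keeps the table's configured level.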

Examples

The examples for the isolation levels overlap with the ones from the commits. Therefore, this time you'll find 3 short demos. The first two cover the isolation levels accepted at table creation. The first example should create the table correctly...

sparkSession.sql(
  s"""
     |CREATE TABLE ${tableName} (id bigint) USING delta
     | TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
     |""".stripMargin)

(0 to 100).toDF("id").writeTo(tableName).using("delta").createOrReplace()

...while the latter should fail:

sparkSession.sql(
  s"""
     |CREATE TABLE ${tableName} (id bigint) USING delta
     | TBLPROPERTIES ('delta.isolationLevel' = 'WriteSerializable')
     |""".stripMargin)

(0 to 100).toDF("id").writeTo(tableName).using("delta").createOrReplace()

The last code snippet demonstrates the isolation level downgrade. Here, the process executing the OPTIMIZE operation changes the level to snapshot isolation, as I explained in the previous section:

sparkSession.sql(
  s"""
    |CREATE TABLE ${tableName} (id bigint) USING delta
    | TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
    |""".stripMargin)

(0 to 100).toDF("id").writeTo(tableName).using("delta").createOrReplace()
val deltaTable = DeltaTable.forName(tableName)

val barrier = new CountDownLatch(2)
new Thread(() => {
  try {
    println("Starting the 'OPTIMIZE'")
    deltaTable.optimize().executeCompaction()
  } finally {
    barrier.countDown()
  }
}).start()
new Thread(() => {
  try {
    while (barrier.getCount != 1) {}
    // Soon after the OPTIMIZE, let's start the writing
    println("Starting the 'update 100'")
    (0 to 100).toDF("id").write.format("delta").insertInto(tableName)
  } finally {
    barrier.countDown()
  }
}).start()
barrier.await()

I'm closing the transactional chapter of the Delta Lake exploration, at least for now. Initially I wanted to cover the checkpoint too, but it's undergoing a serious refactoring for the upcoming release. So I prefer to keep that topic for Delta Lake 3.0.0!

