Table file formats - streaming reader: Delta Lake

Versions: Delta Lake 3.0.0

Even though I'm into streaming these days, I haven't really covered streaming in Delta Lake yet. I only briefly blogged about Change Data Feed and completely missed the fundamentals. Hopefully, this and the next blog posts will change that!

Streaming a Delta Lake table is relatively easy from the end user's perspective. All you have to do is use the streaming API, as in the example below:

// Batch
sparkSession.table(tableName).show(truncate = false)

// Streaming
sparkSession.readStream.format("delta").table(tableName)

Although the difference looks small at this high level, many interesting things happen under the hood. Let's see some of them.

DeltaSource

When you issue a readStream.format("delta"), you indirectly call the streaming functions exposed by the DeltaSource class:

The first two calls are part of the standard execution path of Structured Streaming data sources. The Delta-specific logic starts in the createSource method, where two actions important for the rest of the processing happen:

Planning the offsets

The first step for the streaming reader is to get the starting position. If the query has already run, this is easy because the position comes from the checkpoint files. Otherwise, there are two alternatives. The first is time travel, if you set the startingVersion or startingTimestamp option. The second is the default behavior if you set neither: the reader first processes the latest table snapshot and then the commits that follow it.
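The decision order above can be modeled in a few lines. This is a minimal sketch, not Delta Lake code. StartingPosition, its subclasses and resolveStart are hypothetical names. The sketch also assumes that setting both time-travel options is an error.

```scala
// Hypothetical model of how a Delta streaming reader picks its starting position.
// None of these names are Delta Lake classes.
sealed trait StartingPosition
final case class FromCheckpoint(version: Long, index: Long) extends StartingPosition
final case class FromVersion(version: Long) extends StartingPosition
final case class FromTimestamp(timestamp: String) extends StartingPosition
case object FromLatestSnapshot extends StartingPosition

def resolveStart(
    checkpointed: Option[(Long, Long)],
    startingVersion: Option[Long],
    startingTimestamp: Option[String]): StartingPosition =
  checkpointed match {
    // A query that already ran resumes from its checkpointed offset.
    case Some((version, index)) => FromCheckpoint(version, index)
    case None =>
      (startingVersion, startingTimestamp) match {
        // Assumption of this sketch: both time-travel options at once are rejected.
        case (Some(_), Some(_)) =>
          throw new IllegalArgumentException("set either startingVersion or startingTimestamp")
        case (Some(v), None)  => FromVersion(v)
        case (None, Some(ts)) => FromTimestamp(ts)
        // No option: latest snapshot first, then the new commits.
        case (None, None)     => FromLatestSnapshot
      }
  }
```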

We could stop here, but Delta Lake has other options that affect this very first stage: the rate limits. You can limit the data size, the number of files, or both per micro-batch with maxBytesPerTrigger and maxFilesPerTrigger, respectively. If you set neither, Delta Lake limits your reader to 1000 files per micro-batch by default. But beware, these "files" are data files, not commits! Moreover, the bytes limit is approximate. The reader compares it against the size of each whole file recorded in the commit log, and it admits a file as long as some capacity remains. As a result, a micro-batch can exceed maxBytesPerTrigger:

def admit(fileAction: Option[FileAction]): Boolean = {
  // ...
  def getSize(action: FileAction): Long = {
    action match {
      case a: AddFile =>
        a.size
      case r: RemoveFile =>
        r.size.getOrElse(0L)
      case cdc: AddCDCFile =>
        cdc.size
    }
  }

  // the capacity is checked before taking the file...
  val shouldAdmit = hasCapacity

  if (fileAction.isEmpty) {
    return shouldAdmit
  }

  // ...and the whole file size is subtracted, even if it exceeds the remaining bytes
  take(files = 1, bytes = getSize(fileAction.get))
  // ...
}

protected def take(files: Int, bytes: Long): Unit = {
  filesToTake -= files
  bytesToTake -= bytes
}
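The soft byte limit can be modeled outside of Delta Lake. In this sketch, SoftLimits and planBatch are hypothetical names. It assumes that "capacity" means both counters are still positive, and that planning stops at the first file that isn't admitted.

```scala
// Hypothetical model of the admission logic: capacity is checked first,
// then the whole file size is subtracted, so the last admitted file
// can push the batch past the byte limit.
final class SoftLimits(maxFiles: Int, maxBytes: Long) {
  private var filesToTake: Int = maxFiles
  private var bytesToTake: Long = maxBytes

  private def hasCapacity: Boolean = filesToTake > 0 && bytesToTake > 0

  def admit(fileSize: Long): Boolean = {
    val shouldAdmit = hasCapacity
    filesToTake -= 1
    bytesToTake -= fileSize
    shouldAdmit
  }
}

// Returns the sizes of the files admitted into one micro-batch.
def planBatch(fileSizes: Seq[Long], maxFiles: Int, maxBytes: Long): Seq[Long] = {
  val limits = new SoftLimits(maxFiles, maxBytes)
  fileSizes.takeWhile(limits.admit)
}
```

With a 100-byte limit and three 60-byte files, the sketch admits two files (120 bytes). The second file still fits because 40 bytes of capacity remain when it's checked.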

Default limit

The Delta Lake source implements the SupportsAdmissionControl interface. The micro-batch executor uses this interface to get the default read limit, which Delta Lake defines as follows:

// DeltaSource
override def getDefaultReadLimit: ReadLimit = {
  AdmissionLimits().toReadLimit
}

// AdmissionLimits
// this one is called by the admit function referenced above
var filesToTake = maxFiles.getOrElse {
  if (options.maxBytesPerTrigger.isEmpty) {
    DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT
  } else {
    Int.MaxValue - 8 // - 8 to prevent JVM Array allocation OOM
  }
}

def toReadLimit: ReadLimit = {
  if (options.maxFilesPerTrigger.isDefined && options.maxBytesPerTrigger.isDefined) {
    CompositeLimit(
      ReadMaxBytes(options.maxBytesPerTrigger.get),
      ReadLimit.maxFiles(options.maxFilesPerTrigger.get).asInstanceOf[ReadMaxFiles])
  } else if (options.maxBytesPerTrigger.isDefined) {
    ReadMaxBytes(options.maxBytesPerTrigger.get)
  } else {
    ReadLimit.maxFiles(options.maxFilesPerTrigger.getOrElse(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT))
  }
}

// DeltaOptions
val MAX_FILES_PER_TRIGGER_OPTION_DEFAULT = 1000

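Consequently, if none of the throughput control options is present, Delta Lake reads at most 1000 files per micro-batch by default. Also, if you only set maxBytesPerTrigger, the file counter starts at Int.MaxValue - 8, so in practice only the byte limit applies.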
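The resolution rules above can be written as a small pure function. Limit, MaxFiles, MaxBytes, Composite, readLimit and initialFilesToTake are hypothetical stand-ins for Spark's ReadLimit types and Delta Lake's AdmissionLimits. This is a sketch of the branching only.

```scala
// Hypothetical mirror of toReadLimit and of the filesToTake initialization.
sealed trait Limit
final case class MaxFiles(files: Int) extends Limit
final case class MaxBytes(bytes: Long) extends Limit
final case class Composite(bytes: MaxBytes, files: MaxFiles) extends Limit

val DefaultMaxFiles = 1000

def readLimit(maxFilesPerTrigger: Option[Int], maxBytesPerTrigger: Option[Long]): Limit =
  (maxFilesPerTrigger, maxBytesPerTrigger) match {
    case (Some(files), Some(bytes)) => Composite(MaxBytes(bytes), MaxFiles(files))
    case (None, Some(bytes))        => MaxBytes(bytes)
    case (files, None)              => MaxFiles(files.getOrElse(DefaultMaxFiles))
  }

// With only a byte limit, the file counter is practically unbounded.
def initialFilesToTake(maxFilesPerTrigger: Option[Int], maxBytesPerTrigger: Option[Long]): Int =
  maxFilesPerTrigger.getOrElse {
    if (maxBytesPerTrigger.isEmpty) DefaultMaxFiles else Int.MaxValue - 8
  }
```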

Offsets, indexes, and reservoirs

Delta Lake offsets introduce the reservoir concept, which turns out to be a legacy name for a Delta table (Jacek Laskowski explains it well in his Delta Lake internals book). You can find it, along with a few other attributes, in the Delta-specific offset class:

case class DeltaSourceOffset private(
    sourceVersion: Long,
    reservoirId: String,
    reservoirVersion: Long,
    index: Long,
    isStartingVersion: Boolean
  ) extends Offset with Comparable[DeltaSourceOffset] {
  // ...
}

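Among the attributes you'll find:
- sourceVersion, the version of the offset format itself.
- reservoirId, the id of the streamed table.
- reservoirVersion, the table version (commit) being processed.
- index, the position of a file within that version, which lets a large commit be split across several micro-batches.
- isStartingVersion, which tells whether the offset belongs to the initial snapshot of a starting query rather than to the incremental changes.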
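Since the class is Comparable, offsets of the same table can be ordered. This sketch assumes they compare by (reservoirVersion, index). SourceOffset and offsetOrdering are hypothetical names, and the sketch omits the check that both offsets belong to the same reservoir.

```scala
// Hypothetical, trimmed-down offset model ordered by (reservoirVersion, index).
final case class SourceOffset(
    reservoirId: String,
    reservoirVersion: Long,
    index: Long,
    isStartingVersion: Boolean)

val offsetOrdering: Ordering[SourceOffset] =
  Ordering.by((o: SourceOffset) => (o.reservoirVersion, o.index))

val offsets = Seq(
  SourceOffset("t", 2L, 0L, isStartingVersion = false),
  SourceOffset("t", 1L, 5L, isStartingVersion = false),
  SourceOffset("t", 1L, 1L, isStartingVersion = true))

// A file position inside a commit matters only when versions are equal.
val ordered = offsets.sorted(offsetOrdering).map(o => (o.reservoirVersion, o.index))
```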

Getting the data

Next comes data retrieval from the selected files, hence the DataFrame creation.

Let's focus on the rightmost box of the diagram. The first step here is to get all files between the starting and ending offsets. The reader does this by calling getFileChanges, the same method it uses to apply the rate limits. The only difference is that for data retrieval the ending offset is always defined. For the rate limit it's empty, since the max options decide when to stop reading:

protected def getFileChangesAndCreateDataFrame
// ...
  val fileActionsIter = getFileChanges(
    startVersion, startIndex, isStartingVersion, endOffset = Some(endOffset)
  )
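The two modes of getFileChanges can be sketched as a range filter over (version, index) positions. IndexedFile and fileChanges here are hypothetical simplifications. The sketch assumes a file is in range if it comes strictly after the start offset and, when an end offset is given, not after it.

```scala
// Hypothetical model of selecting files between two offsets.
final case class IndexedFile(version: Long, index: Long, path: String)

def fileChanges(
    files: Iterator[IndexedFile],
    startVersion: Long,
    startIndex: Long,
    endOffset: Option[(Long, Long)]): Iterator[IndexedFile] = {
  val ord = Ordering[(Long, Long)]
  files
    .filter(f => ord.gt((f.version, f.index), (startVersion, startIndex)))
    // Without an end offset (rate-limit planning), nothing is cut here.
    .takeWhile(f => endOffset.forall(end => ord.lteq((f.version, f.index), end)))
}

val all = Seq(
  IndexedFile(1L, 0L, "a"), IndexedFile(1L, 1L, "b"),
  IndexedFile(2L, 0L, "c"), IndexedFile(2L, 1L, "d"))
```

For example, fileChanges(all.iterator, 1L, 0L, Some((2L, 0L))) keeps only "b" and "c". The same call with None also returns "d", leaving the stop decision to the admission limits.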

At this point, it's worth mentioning how the index is generated. If you look at the commit log's content, you won't find any index for the files. The index is a runtime property, generated as an auto-incremented number for each file of a commit. Here is the snippet:

private def filterAndGetIndexedFiles(
    iterator: Iterator[Action],
    version: Long,
    // ...
): Iterator[IndexedFile] = {
  // ...
  var index = -1L
  val indexedFiles = new Iterator[IndexedFile] {
    override def hasNext: Boolean = filteredIterator.hasNext
    override def next(): IndexedFile = {
      index += 1 // pre-increment the index (so it starts from 0)
      val add = filteredIterator.next().copy(stats = null)
      IndexedFile(version, index, add, isLast = !filteredIterator.hasNext)
    }
  }
  // ...
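A runnable simplification of the snippet shows that the index restarts for every commit version. Here, plain paths replace AddFile actions. IndexedFile, indexFiles and indexCommits are hypothetical names.

```scala
// Hypothetical model: indexes are assigned at read time, per commit version.
final case class IndexedFile(version: Long, index: Long, path: String, isLast: Boolean)

def indexFiles(version: Long, addedPaths: Iterator[String]): Iterator[IndexedFile] = {
  var index = -1L
  new Iterator[IndexedFile] {
    override def hasNext: Boolean = addedPaths.hasNext
    override def next(): IndexedFile = {
      index += 1 // pre-increment so the first file of the commit gets index 0
      val path = addedPaths.next()
      IndexedFile(version, index, path, isLast = !addedPaths.hasNext)
    }
  }
}

// Each commit gets its own counter, so indexes restart at 0 for every version.
def indexCommits(commits: Seq[(Long, Seq[String])]): Iterator[IndexedFile] =
  commits.iterator.flatMap { case (version, paths) => indexFiles(version, paths.iterator) }
```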

However, this operation only returns the files from the offsets range, wrapped as so-called IndexedFiles. The next method, createDataFrame(indexedFiles: Iterator[IndexedFile]), converts them into a DataFrame. It has to deal with two situations. The first is relatively simple: it takes all the files and delegates the DataFrame creation to DeltaLog#createDataFrame with the streaming flag (isStreaming) set to true. Under the hood, this function builds a file index over the files, wraps it in a relation, and exposes it as a DataFrame:

// DeltaLog#createDataFrame
val fileIndex = new TahoeBatchFileIndex(spark, actionType, addFiles, this, dataPath, snapshot)
val relation = buildHadoopFsRelationWithFileIndex(snapshot, fileIndex, bucketSpec = None)
Dataset.ofRows(spark, LogicalRelation(relation, isStreaming = isStreaming))

The second scenario is less simple. It happens when any of the files has a deletion vector. Deletion vectors aren't a streaming feature, but they affect reading. Without deletion vectors, Delta Lake rewrites every file that contains rows modified by in-place operations (UPDATE, DELETE). This is the Copy-on-Write strategy. With deletion vectors, the commit doesn't rewrite those files. It marks the invalidated rows in deletion vectors and leaves readers responsible for reconstructing the most recent version of the data. This is the Merge-on-Read strategy. For that reason, the streaming reader can't simply read all the indexed files in a single scan.

The special handling of deletion vectors comes from a technical limitation, which the code comment explains clearly:

if (hasDeletionVectors) {
  // Read AddFiles from different versions in different scans.
  // This avoids an issue where we might read the same file with different deletion vectors in
  // the same scan, which we cannot support as long we broadcast a map of DVs for lookup.
  // This code can be removed once we can pass the DVs into the scan directly together with the
  // AddFile/PartitionedFile entry.
  addFiles.groupBy(_.version).values
    .map { addFilesList =>
      deltaLog.createDataFrame(readSnapshotDescriptor,
        addFilesList.map(_.getFileAction.asInstanceOf[AddFile]),
        isStreaming = true)
    }
    .reduceOption(_ union _)
  // ...
}

In other words, when deletion vectors are present, it creates one DataFrame per commit version and unions them. Without deletion vectors, it creates a single DataFrame for all versions.
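The scan planning can be modeled as grouping file paths. IndexedAddFile and planScans are hypothetical names, and each inner Seq stands for one DataFrame that would be unioned with the others. The sort by version is only there to make this sketch deterministic. The original code takes the groupBy values as they come, since union order doesn't matter.

```scala
// Hypothetical model: with any deletion vector, one scan per commit version;
// otherwise, a single scan over all files.
final case class IndexedAddFile(version: Long, path: String, hasDeletionVector: Boolean)

def planScans(files: Seq[IndexedAddFile]): Seq[Seq[String]] =
  if (files.isEmpty) Seq.empty
  else if (files.exists(_.hasDeletionVector)) {
    files.groupBy(_.version).toSeq
      .sortBy(_._1)
      .map { case (_, sameVersion) => sameVersion.map(_.path) }
  } else {
    Seq(files.map(_.path))
  }

val files = Seq(
  IndexedAddFile(1L, "a", hasDeletionVector = false),
  IndexedAddFile(1L, "b", hasDeletionVector = false),
  IndexedAddFile(2L, "c", hasDeletionVector = true))
```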

Starting version

You'll often find references to a starting version in the code. The getBatch method resolves this flag in extractStartingState, alongside the start commit and the start index to read from. The flag is true only for the very first micro-batch of a query, and only if you don't specify any of the time-travel options.

The flag is later passed to getFileChanges via createDataFrameBetweenOffsets. There it decides whether the reader loads the whole snapshot at the starting version before reading the subsequent commits:

// DeltaSource#getFileChanges
var iter = if (isStartingVersion) {
  Iterator(1, 2).flatMapWithClose {  // so that the filterAndIndexDeltaLogs call is lazy
    case 1 => getSnapshotAt(fromVersion).toClosable
    case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
  }
} else {
  filterAndIndexDeltaLogs(fromVersion)
}
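The two branches can be sketched with a toy commit log that maps versions to the files they add. This model ignores removals. fileChanges, snapshotAt and changesFrom are hypothetical names. Iterator's ++ takes its right side by name, so the commits after the snapshot are only read when needed, similar to the lazy flatMapWithClose in the original code.

```scala
// Hypothetical model of the isStartingVersion branch of getFileChanges.
def fileChanges(
    commits: Map[Long, Seq[String]],
    fromVersion: Long,
    isStartingVersion: Boolean): Iterator[(Long, String)] = {
  // Snapshot at a version: every file added up to it, tagged with that version.
  def snapshotAt(version: Long): Iterator[(Long, String)] =
    commits.toSeq.filter(_._1 <= version).sortBy(_._1).iterator
      .flatMap { case (_, files) => files.map(f => (version, f)) }

  def changesFrom(version: Long): Iterator[(Long, String)] =
    commits.toSeq.filter(_._1 >= version).sortBy(_._1).iterator
      .flatMap { case (v, files) => files.map(f => (v, f)) }

  if (isStartingVersion) snapshotAt(fromVersion) ++ changesFrom(fromVersion + 1)
  else changesFrom(fromVersion)
}

val commits = Map(0L -> Seq("a"), 1L -> Seq("b"), 2L -> Seq("c"))
```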

Why this extra snapshot retrieval? Remember the first part of the blog post: the reader initially uses the most recent table snapshot and its schema, which might not correspond to the version read in the data retrieval part. Consequently, getting the snapshot at the starting version may result in a schema error. We'll come back to this schema part at the end of the blog post.

Configuration properties

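The time-travel and rate limit options are not the only configuration properties. Besides them, you can use:
- ignoreDeletes, to ignore commits that only delete data, for example at partition boundaries.
- ignoreChanges, to keep processing commits that rewrite files (UPDATE, MERGE, DELETE within partitions). The rewritten files are emitted again, so the downstream consumers may get duplicates of unchanged rows.
- skipChangeCommits, to ignore such file-changing commits entirely.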

These skip and ignore flags are key to understanding why getting the files for a micro-batch involves this function:

protected def validateCommitAndDecideSkipping(
    actions: Iterator[Action], version: Long,
    batchStartVersion: Long, batchEndVersionOpt: Option[Long] = None,
    verifyMetadataAction: Boolean = true): (Boolean, Option[Metadata], Option[Protocol]) = {
  // ...
  val shouldAllowChanges = options.ignoreChanges || ignoreFileDeletion || skipChangeCommits

  val shouldAllowDeletes = shouldAllowChanges || options.ignoreDeletes || ignoreFileDeletion
  // ...
  if (removeFileActionPath.isDefined) {
    if (seenFileAdd && !shouldAllowChanges) {
      throw DeltaErrors.deltaSourceIgnoreChangesError(version, removeFileActionPath.get,
        deltaLog.dataPath.toString
      )
    } else if (!seenFileAdd && !shouldAllowDeletes) {
      throw DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFileActionPath.get,
        deltaLog.dataPath.toString
      )
    }
  }
  // ...
}
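The allow rules above fit in a small decision function. ReaderFlags, CommitDecision and decide are hypothetical names. The two boolean expressions are copied from the snippet. The Skip outcome for skipChangeCommits is an assumption of this sketch: it treats the flag as dropping every commit that contains a RemoveFile.

```scala
// Hypothetical model of how a commit with RemoveFile actions is handled.
final case class ReaderFlags(
    ignoreDeletes: Boolean = false,
    ignoreChanges: Boolean = false,
    skipChangeCommits: Boolean = false,
    ignoreFileDeletion: Boolean = false)

sealed trait CommitDecision
case object Process extends CommitDecision
case object Skip extends CommitDecision
final case class Fail(reason: String) extends CommitDecision

def decide(hasAdds: Boolean, hasRemoves: Boolean, flags: ReaderFlags): CommitDecision = {
  val shouldAllowChanges = flags.ignoreChanges || flags.ignoreFileDeletion || flags.skipChangeCommits
  val shouldAllowDeletes = shouldAllowChanges || flags.ignoreDeletes || flags.ignoreFileDeletion
  if (!hasRemoves) Process                                   // append-only commit
  else if (hasAdds && !shouldAllowChanges) Fail("rewritten files")
  else if (!hasAdds && !shouldAllowDeletes) Fail("deleted files")
  else if (flags.skipChangeCommits) Skip                     // assumption of this sketch
  else Process
}
```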

Schema incompatibilities

When you run an Apache Spark Structured Streaming job on a Delta Lake table, you must be careful when the streamed table's schema evolves. You might think a new column isn't a problem, since you wrote the job for a specific data structure. However, that's not the case: when the table's schema changes, you'll encounter an error. The snippet below shows it for a table that renamed the "number" column to "id_number"...:

org.apache.spark.sql.streaming.StreamingQueryException: [DELTA_SCHEMA_CHANGED_WITH_VERSION] Detected schema change in version 3:
streaming source schema: root
-- number: integer (nullable = true)
-- letter: string (nullable = true)

data file schema: root
-- id_number: integer (nullable = true)
-- letter: string (nullable = true)

Please try restarting the query. If this issue repeats across query restarts without
making progress, you have made an incompatible schema change and need to start your
query from scratch using a new checkpoint directory.

...and here the error when a new column was added:

[DELTA_SCHEMA_CHANGED_WITH_VERSION] Detected schema change in version 1:
streaming source schema: root
-- number: integer (nullable = true)
-- letter: string (nullable = true)


data file schema: root
-- number: integer (nullable = true)
-- letter: string (nullable = true)
-- upper_letter: string (nullable = true)


Please try restarting the query. If this issue repeats across query restarts without
making progress, you have made an incompatible schema change and need to start your
query from scratch using a new checkpoint directory.

As the last error goes away after restarting the job, let's focus on the first one.

Here, the streaming job has been reading a table that went through the following operations: table creation => schema evolution by adding a new column => schema evolution by renaming an existing, streamed column. As you can deduce, the commit log records not only the data operations but also the metadata ones:

$ grep "metaData" numbers_with_letters/_delta_log/*
# numbers_with_letters/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1704618048658,"operation":"CREATE OR REPLACE TABLE AS SELECT","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{\"delta.enableChangeDataFeed\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"4","numOutputBytes":"705"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"0563cc31-789b-44f5-b48b-f7094f07daf2"}}
{"metaData":{"id":"d3219e3c-fce8-4f4e-9ca8-dc53e45d9890","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"number\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1704618045295}}

# numbers_with_letters/_delta_log/00000000000000000002.json
{"commitInfo":{"timestamp":1704618069524,"operation":"RENAME COLUMN","operationParameters":{"oldColumnPath":"number","newColumnPath":"id_number"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"044b6e69-317a-47fb-8a0f-d57962b4b2c7"}}
{"metaData":{"id":"d3219e3c-fce8-4f4e-9ca8-dc53e45d9890","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id_number\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"number\"}},{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"letter\"}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true","delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"},"createdTime":1704618045295}}

How does this impact the streaming reader? After all, it processes data and could ignore the metadata. Unfortunately, that's not true, because the streaming reader processes metadata actions too:

// DeltaSource#validateCommitAndDecideSkipping(actions: Iterator[Action], ...)
actions.foreach {
  case m: Metadata =>
    if (verifyMetadataAction) {
      checkReadIncompatibleSchemaChanges(m, version, batchStartVersion,
        batchEndVersionOpt)
    }
    assert(metadataAction.isEmpty,
      "Should not encounter two metadata actions in the same commit")
    metadataAction = Some(m)
  // ...

The checkReadIncompatibleSchemaChanges method is responsible for verifying schema compatibility:

// #checkReadIncompatibleSchemaChanges
// ...
if (!SchemaUtils.isReadCompatible(schemaChange, schema,
      forbidTightenNullability = shouldForbidTightenNullability,
      allowMissingColumns = isStreamingFromColumnMappingTable &&
         allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
         backfilling,
      newPartitionColumns = newMetadata.partitionColumns,
      oldPartitionColumns = oldMetadata.partitionColumns
)) {
    val retryable = !backfilling && SchemaUtils.isReadCompatible(
      schema, schemaChange, forbidTightenNullability = shouldForbidTightenNullability)
    throw DeltaErrors.schemaChangedException(
      schema,
      schemaChange,
      retryable = retryable,
      Some(version),
      includeStartingVersionOrTimestampMessage = options.containsStartingVersionOrTimestamp)
}

This method is quite popular, by the way. If you analyze the call tree, you'll find many places that rely on it to decide whether the stream can move on.
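The two isReadCompatible calls can be illustrated with a deliberately simplified rule. Data written with one schema can be read with another if every column of the first still exists, with the same type, in the second. This is a hypothetical model chosen to match the two errors shown earlier. Column, isReadCompatible and checkSchemaChange are illustrative names, and the model ignores nullability, nested types, case sensitivity, and the backfilling flag.

```scala
// Hypothetical, simplified model of the schema compatibility check.
final case class Column(name: String, dataType: String)

def isReadCompatible(existing: Seq[Column], read: Seq[Column]): Boolean = {
  val readTypes = read.map(c => c.name -> c.dataType).toMap
  existing.forall(c => readTypes.get(c.name).contains(c.dataType))
}

// Like the original call: the new table schema must be readable with the stream
// schema; the failure is retryable when the opposite direction works (additive change).
def checkSchemaChange(streamSchema: Seq[Column], newSchema: Seq[Column]): Either[String, Unit] =
  if (isReadCompatible(newSchema, streamSchema)) Right(())
  else if (isReadCompatible(streamSchema, newSchema)) Left("retryable")
  else Left("non-retryable")

val v0 = Seq(Column("number", "integer"), Column("letter", "string"))
val added = v0 :+ Column("upper_letter", "string")
val renamed = Seq(Column("id_number", "integer"), Column("letter", "string"))
```

In this model, adding upper_letter fails once but is retryable, which matches the error that disappears after a restart. Renaming number to id_number isn't retryable in either direction.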

Now the question is: will it also break with column mapping? After all, column mapping exists to enable schema evolution scenarios such as renaming or dropping columns.

Schema tracking

The answer is yes, but before ending this article, let's see why. To let streaming readers handle column mapping changes, you first need to enable column mapping on the table. Then you need to define a schemaTrackingLocation for each streamed table. An important requirement is that this location must be under the checkpoint location. Otherwise, you'll get this error:

Exception in thread "Thread-43" org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_STREAMING_SCHEMA_LOCATION_NOT_UNDER_CHECKPOINT] Schema location '/tmp/checkpoint1704602355716d/schema_tracking/_schema_log_89f5928a-83fc-48d4-a1c3-c55ab8a36043' must be placed under checkpoint location 'file:/tmp/checkpoint1704602355716'.
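In the error above, the schema location starts with the checkpoint path as a string, but it sits in a sibling directory with an extra "d" suffix. A component-wise path comparison shows why such a location is rejected. isUnderCheckpoint and toLocalPath are hypothetical helpers, not Delta Lake's implementation. They only handle local paths with an optional "file:" scheme.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helpers: compare path components, not raw strings.
def toLocalPath(location: String): Path =
  Paths.get(location.stripPrefix("file:")).normalize()

def isUnderCheckpoint(schemaLocation: String, checkpointLocation: String): Boolean =
  toLocalPath(schemaLocation).startsWith(toLocalPath(checkpointLocation))
```

In a real job, the accepted value is what you'd pass as the schemaTrackingLocation option of the streaming reader, for example a schema_tracking directory inside the checkpoint.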

The feature writes all recorded schema changes under the schema tracking location, in a _schema_log_<table id> subdirectory. Here, the location is a schema_tracking directory under the checkpoint. Below you can find the two files created after renaming the column from number to id_number:

# schema_tracking/_schema_log_e756583b-b765-4810-a191-d19893e54aa8/0
{"tableId":"e756583b-b765-4810-a191-d19893e54aa8","deltaCommitVersion":1,"dataSchemaJson":"{\"type\":\"struct\",\"fields\":[{\"name\":\"number\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"number\"}},{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"letter\"}}]}","partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}","sourceMetadataPath":"file:/tmp/checkpoint1/sources/0","tableConfigurations":{"delta.enableChangeDataFeed":"true","delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"},"protocolJson":"{\"protocol\":{\"minReaderVersion\":2,\"minWriterVersion\":5}}"}

# schema_tracking/_schema_log_e756583b-b765-4810-a191-d19893e54aa8/1
{"tableId":"e756583b-b765-4810-a191-d19893e54aa8","deltaCommitVersion":2,"dataSchemaJson":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id_number\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"number\"}},{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"letter\"}}]}","partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}","sourceMetadataPath":"file:/tmp/checkpoint1/sources/0","tableConfigurations":{"delta.enableChangeDataFeed":"true","delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"},"protocolJson":"{\"protocol\":{\"minReaderVersion\":2,\"minWriterVersion\":5}}"}

Even with this metadata in the checkpoint location, the restarted stream still fails. However, this time Delta Lake tells you how to continue, with this error message:

Caused by: org.apache.spark.sql.delta.DeltaRuntimeException: [DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION] We've detected one or more non-additive schema change(s) (RENAME COLUMN) between Delta version 1 and 2 in the Delta streaming source.
Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at 2.
Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set (one of) the following SQL configurations to unblock the non-additive schema change(s) and continue stream processing.
To unblock for this particular stream just for this series of schema change(s): set `spark.databricks.delta.streaming.allowSourceColumnRenameAndDrop.ckpt_-921269974 = 2`.
To unblock for this particular stream: set `spark.databricks.delta.streaming.allowSourceColumnRenameAndDrop.ckpt_-921269974 = always`
To unblock for all streams: set `spark.databricks.delta.streaming.allowSourceColumnRenameAndDrop = always`.
Alternatively if applicable, you may replace the `allowSourceColumnRenameAndDrop` with `allowSourceColumnRename` in the SQL conf to unblock stream for just this schema change type. 

Once you set one of these configurations, you'll be able to resume streaming:

val sparkSession = getDeltaLakeSparkSession(extraConfig = Map(
  "spark.databricks.delta.streaming.allowSourceColumnRenameAndDrop" -> "always"
))
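The unblocking options listed in the error message can be summarized as a lookup over the SQL configuration. This is a hypothetical model based only on that message. isUnblocked, Prefix and checkpointId are illustrative names; checkpointId stands for the ckpt_... suffix shown in the message. The model assumes a per-stream value equal to the schema change version unblocks only that change, and that the allowSourceColumnRename keys only cover renames.

```scala
// Hypothetical model of the unblocking rules from the error message.
val Prefix = "spark.databricks.delta.streaming."

def isUnblocked(
    conf: Map[String, String],
    checkpointId: String,
    schemaChangeVersion: Long,
    isRenameOnly: Boolean): Boolean = {
  val keyNames = "allowSourceColumnRenameAndDrop" +:
    (if (isRenameOnly) Seq("allowSourceColumnRename") else Seq.empty)
  keyNames.exists { name =>
    val perStream = conf.get(s"$Prefix$name.$checkpointId")
    val global = conf.get(s"$Prefix$name")
    perStream.contains("always") ||
      perStream.contains(schemaChangeVersion.toString) ||
      global.contains("always")
  }
}

val ckpt = "ckpt_-921269974"
```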

Why not allow this by default? The problem is that, from now on, the streaming job works on the renamed column (id_number), while the data sinks may still expect the old one (number). Independently of that, if you apply any custom transformation, you'll still need to adapt it to the evolved schema. But schema tracking at least gives you a chance to resume the stream. A streaming query without schema tracking keeps failing with this error message:

Caused by: org.apache.spark.sql.delta.DeltaStreamingColumnMappingSchemaIncompatibleException: [DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG] Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).
Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.
See https://docs.delta.io/latest/versioning.html#column-mapping for more details.
Read schema: {"type":"struct","fields":[{"name":"number","type":"integer","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"number"}},{"name":"letter","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"letter"}}]}. 
Incompatible data schema: {"type":"struct","fields":[{"name":"id_number","type":"integer","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"number"}},{"name":"letter","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"letter"}}]}.

If your table uses column mapping, you can work around this issue without schema tracking by enabling the spark.databricks.delta.streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled property. By the way, the "unsafe" in its name is there for a reason:

val DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES =
  buildConf("streaming.unsafeReadOnIncompatibleColumnMappingSchemaChanges.enabled")
    .doc(
      "Streaming read on Delta table with column mapping schema operations " +
        "(e.g. rename or drop column) is currently blocked due to potential data loss and " +
        "schema confusion. However, existing users may use this flag to force unblock " +
        "if they'd like to take the risk.")
    .internal()
    .booleanConf
    .createWithDefault(false)

As you can see, streaming Delta tables is not an easy task, despite their structured nature. Few tables never evolve, and schema evolution makes the whole operation trickier. Thankfully, the community has already provided the schema tracking capability, which can make your life easier.

