ACID file formats - writing: Delta Lake

Versions: Delta Lake 1.2.1

It's time for the last data generation part of the ACID file formats series. This time we'll see how Delta Lake writes new files.

Operations and options

Among the DML operations, Delta Lake supports:

The writing process might look easy, but despite this appearance, Delta Lake has some specific writing configuration entries:

In addition to these writing parameters, you will find writing configuration entries in DeltaSQLConfBase:

Data skipping

The first intriguing feature is data skipping. As you already know from the previous section, Delta Lake collects several statistics about the generated files. They play an important role in the in-place operations. To understand why, let's see what happens if they're missing and we want to delete the rows matching a predicate:

An optimized version uses data skipping to avoid reading the data files not matching the delete condition. How? When the spark.databricks.delta.stats.collect flag is enabled, Delta Lake writers generate additional statistics for each data file by using a new DeltaJobStatisticsTracker instance. The tracker is a factory class exposing 2 methods:

class DeltaJobStatisticsTracker {
  // ...
  var recordedStats: Map[String, String] = _

  override def newTaskInstance(): WriteTaskStatsTracker = {
    val rootPath = new Path(rootUri)
    val hadoopConf = srlHadoopConf.value
    new DeltaTaskStatisticsTracker(dataCols, statsColExpr, rootPath, hadoopConf)
  }

  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
    recordedStats = stats.map(_.asInstanceOf[DeltaFileStatistics]).flatMap(_.stats).toMap
  }
}

The newTaskInstance method gets invoked when data writers write the input rows to the files. The returned DeltaTaskStatisticsTracker instance exposes 2 important stats tracking methods:

class DeltaTaskStatisticsTracker {
  // ...
  override def newFile(newFilePath: String): Unit
  override def newRow(filePath: String, currentRow: InternalRow): Unit
}

The newFile method creates a stats tracking buffer for the file generated in the writing task and passed as a parameter. Later on, whenever a row is written to this file, the data writer calls the newRow function, which loads the stats buffer and updates the statistic values for all the columns.

Once the file gets all the writes, the writer task calls the DeltaTaskStatisticsTracker#closeFile(filePath: String) function, which computes the final statistics and puts them, as JSON, into the results map (a collection.mutable.HashMap[String, String]). Once all tasks terminate, the transaction adds the statistics to the commit log.
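To make this lifecycle more tangible, here is a minimal, purely illustrative sketch of a per-file statistics tracker following the same newFile/newRow/closeFile contract. It tracks a single integer column only and is not the real DeltaTaskStatisticsTracker:

import scala.collection.mutable

// Illustrative only - tracks numRecords, min and max of one integer column per file
class SimpleFileStatsTracker {
  // one stats buffer per file written by the task: (numRecords, min, max)
  private val buffers = mutable.HashMap.empty[String, (Long, Option[Int], Option[Int])]
  // final per-file statistics serialized as JSON, like the results map described above
  val results = mutable.HashMap.empty[String, String]

  def newFile(filePath: String): Unit =
    buffers(filePath) = (0L, None, None)

  def newRow(filePath: String, value: Int): Unit = {
    val (count, min, max) = buffers(filePath)
    buffers(filePath) = (
      count + 1,
      Some(min.fold(value)(math.min(_, value))),
      Some(max.fold(value)(math.max(_, value)))
    )
  }

  def closeFile(filePath: String): Unit = buffers.remove(filePath).foreach {
    case (count, min, max) =>
      results(filePath) =
        s"""{"numRecords":$count,"minValues":{"value":${min.getOrElse("null")}},""" +
          s""""maxValues":{"value":${max.getOrElse("null")}}}"""
  }
}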

Now comes the reading part. The overall flow uses an extra step, summarized in the updated reading diagram:

Let's see this feature in action for a query replacing the files matching a predicate (the replaceWith action). When it happens, Delta Lake performs a DeleteCommand that calls OptimisticTransactionImpl#filterFiles(filters: Seq[Expression]) under the hood. This filterFiles method uses the DataSkippingReader#filesForScan method to get the files matching the predicate filter and put them into the reading index. During the execution of this delete step, the task knows which files to rewrite and can do so without processing the whole table. Since this part concerns the reading, I'll focus on it more next time.
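To illustrate, a delete going through this path could look like the sketch below; the table path reuses the location from the physical plan shown later in the post, and the SparkSession creation is omitted:

import io.delta.tables.DeltaTable

// stats must have been collected at write time for data skipping to help
spark.conf.set("spark.databricks.delta.stats.collect", "true")
// DeleteCommand -> filterFiles -> DataSkippingReader#filesForScan keeps only
// the files whose min/max statistics can match the id > 3 predicate
DeltaTable.forPath(spark, "/tmp/acid-file-formats/002_writing/delta_lake")
  .delete("id > 3")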

String statistics

In addition to the classic statistics for numerical values, Delta Lake also supports strings. The file format stores a string prefix in the data skipping index. The prefix length depends on the spark.databricks.io.skipping.stringPrefixLength configuration entry.

For example, the following dataset with stringPrefixLength set to 2 will store the max value with only a 2-character prefix:

import spark.implicits._

val inputData = (0 to 30).map(nr => (nr, nr * 2, (0 to nr).mkString("")))
  .toDF("id", "multiplication_result", "concat_numbers")
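A possible way to generate and inspect these statistics is sketched below; the output path is arbitrary, the repartition only keeps a single data file, and reading the 00000000000000000000.json commit file assumes a freshly created table:

// conf entry name taken from the text above; outputDir is an arbitrary location
spark.conf.set("spark.databricks.io.skipping.stringPrefixLength", "2")
val outputDir = "/tmp/acid-file-formats/002_writing/delta_lake_strings"

inputData.repartition(1).write.format("delta").mode("overwrite").save(outputDir)

// the per-file statistics live in the add actions of the commit log
spark.read.json(s"$outputDir/_delta_log/00000000000000000000.json")
  .where("add IS NOT NULL")
  .selectExpr("add.path", "add.stats")
  .show(false)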

And the stats:

+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|path                                                               |stats                                                                                                                                                                                                                          |
+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|part-00000-3dfc3c2d-fc13-4b64-92ce-c65486ea3a56-c000.snappy.parquet|{"numRecords":31,"minValues":{"id":0,"multiplication_result":0,"concat_numbers":"0"},"maxValues":{"id":30,"multiplication_result":60,"concat_numbers":"01�"},"nullCount":{"id":0,"multiplication_result":0,"concat_numbers":0}}|
+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

On the other hand, increasing the prefix length to 20 will keep fine-grained statistics for the single text field of the input:

+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|path                                                               |stats                                                                                                                                                                                                                                            |
+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|part-00000-fa3c14d4-3180-40cf-aabc-8b49c0ef616a-c000.snappy.parquet|{"numRecords":31,"minValues":{"id":0,"multiplication_result":0,"concat_numbers":"0"},"maxValues":{"id":30,"multiplication_result":60,"concat_numbers":"01234567891011121314�"},"nullCount":{"id":0,"multiplication_result":0,"concat_numbers":0}}|
+-------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Partition delete

Delta Lake also supports partition-based operations. It means that in case of a delete or update involving the whole partition, Delta Lake doesn't need to open each file and find the matching rows - even with the statistics enabled! Instead, it simply takes all the files from the partition and marks them as deleted in the commit log.
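As an illustration, assuming a table partitioned by the is_even column visible in the physical plan below, such a partition-wide delete could be expressed as:

import io.delta.tables.DeltaTable

// the predicate references only the partitioning column, so Delta Lake resolves
// the impacted files from the commit log metadata without scanning the data
DeltaTable.forPath(spark, "/tmp/acid-file-formats/002_writing/delta_lake")
  .delete("is_even = true")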

The key moment of this operation is the classification of the predicates as data or metadata conditions. The method involved is:

object DeltaTableUtils extends PredicateHelper with DeltaLogging {

  def isPredicateMetadataOnly(
      condition: Expression,
      partitionColumns: Seq[String],
      spark: SparkSession): Boolean = {
    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
      !containsSubquery(condition)
  }
}

Whenever the filter contains only the partitioning columns, it's classified as a metadata-only predicate. The DeltaLog#filterFileList method, also employed in the data skipping, can use this filter to extract only the files modified by the partition operation.

On the other hand, an operation with a data filter does modify the data and doesn't use data skipping. The physical plan for this scenario looks like this:

== Physical Plan ==
*(1) Filter UDF()
+- *(1) Filter NOT ((id#636 > 3) <=> true)
   +- *(1) ColumnarToRow
      +- FileScan parquet [id#636,multiplication_result#637,is_even#638] Batched: true, DataFilters: [NOT ((id#636 > 3) <=> true)], Format: Parquet, Location: TahoeBatchFileIndex(1 paths)[file:/tmp/acid-file-formats/002_writing/delta_lake], PartitionFilters: [], PushedFilters: [], ReadSchema: struct

In the end, the DataFrame is materialized to Parquet files, the new files are added to the commit log, and the old ones are marked as deleted.
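A quick way to verify this outcome is to look at the add and remove actions of the corresponding commit file; the version number below is an assumption and depends on the table history:

// assuming the delete produced commit version 1
spark.read.json("/tmp/acid-file-formats/002_writing/delta_lake/_delta_log/00000000000000000001.json")
  .selectExpr("add.path AS added_file", "remove.path AS removed_file")
  .show(false)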

dataChange

The last discussed option, dataChange, looks harmless at first glance. It's just a flag marking an operation as changing the underlying data. However, semantically speaking, it's not that simple.
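Before getting to the tricky case, here is the textbook use of the flag, a manual compaction sketch where the files change but the data doesn't; the table path is an assumption:

val tablePath = "/tmp/acid-file-formats/002_writing/delta_lake"

// the rows are only rewritten into fewer files, so the write declares
// dataChange = false and streaming readers can skip the new files
spark.read.format("delta").load(tablePath)
  .repartition(1)
  .write.format("delta")
  .option("dataChange", "false")
  .mode("overwrite")
  .save(tablePath)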

Let's take the example of the DELETE operation. As you saw in the previous section, it does change the data, so logically the dataChange flag should be set to true. But from the streaming standpoint, this change only involves deleting something already processed. So what should the value of dataChange be?

Tathagata Das sheds some light on a possible solution in the Should the AddFiles from DELETE have dataChange=true? issue. He recommends using Change Data Feed to catch only the changed rows without compromising the dataChange purpose.

And here we are! The writing part of the 3 ACID file formats series ends with this Delta Lake presentation. Next time you'll learn about the reading, and this time I will start in reverse lexicographical order, so with Delta Lake!

