https://github.com/bartosz25/acid-file-formats/tree/main/005_compaction/delta_lake
Small files are a well-known problem in data systems. Fortunately, modern table file formats have built-in features to address it. In this series we'll see how.
Rewriting
Delta Lake implements the file compaction process with two features. The first of them is the explicit files rewrite:
sparkSession.read.format("delta").load(outputDir)
  .repartition(numberOfFiles)
  .write.option("dataChange", "false")
  .format("delta").mode("overwrite").save(outputDir)
It looks like any other writing process, but as you can see, the writer sets the option("dataChange", "false"). This option informs the downstream consumers that the writing operation only rearranges the data. Thanks to that notification, the consumers can ignore this new event in the transaction log.
Under the hood, this option is used by the WriteIntoDelta#write method and implies the following:
- Semantic verification. A rewrite cannot change the table or partition schema, nor create a new table.
- The dataChange attribute. The attribute of the AddFile and RemoveFile entries from the transaction log is set to false:
# ...
{"add":{"path":"part-00000-fb2e75aa-b811-480d-8254-afa77a9eeaa3-c000.snappy.parquet","partitionValues":{},"size":825,"modificationTime":1672717157593,"dataChange":false, ...}
{"remove":{"path":"part-00001-010a8686-b150-43d0-91bc-b17ad56b87b0-c000.snappy.parquet","deletionTimestamp":1672717158529,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":737}}
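If you want to verify the flag on an existing table, one quick way is to read the commit files of the _delta_log directory directly. The snippet below is only a minimal verification sketch, not part of Delta Lake's API; it assumes outputDir points at the table root and simply projects the dataChange attribute of the add entries:

// Minimal verification sketch: reads the commit JSON files and shows the
// dataChange flag of the added files. outputDir is assumed to be the table root.
val commits = sparkSession.read.json(s"$outputDir/_delta_log/*.json")
commits
  .where("add IS NOT NULL")
  .select("add.path", "add.dataChange")
  .show(truncate = false)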
Besides these two aspects, the writing operation works normally, i.e. it identifies the files to overwrite, marks them as "removed" and writes new "added" files instead. It's worth noting that the workflow supports overwriting particular partitions:
val partitionToRewrite = "partition_number = 0"
val numberOfFiles = 1
sparkSession.read
  .format("delta")
  .load(outputDir)
  .where(partitionToRewrite)
  .repartition(numberOfFiles)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partitionToRewrite)
  .save(outputDir)
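To check the outcome of such a partition-scoped rewrite, you can count the physical files backing the partition. The snippet below is only an illustrative check, reusing the outputDir and partitionToRewrite values from the previous example:

import org.apache.spark.sql.functions.input_file_name

// Counts the distinct Parquet files backing the rewritten partition; with
// numberOfFiles = 1, the expected result after the rewrite is 1
val filesInPartition = sparkSession.read.format("delta").load(outputDir)
  .where(partitionToRewrite)
  .select(input_file_name())
  .distinct()
  .count()
println(s"Files backing the partition: $filesInPartition")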
OPTIMIZE
If you don't want to use the API for compaction, you can use SQL and the OPTIMIZE command:
OPTIMIZE my_table
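The command can also be limited to specific partitions with a WHERE clause, and recent Delta Lake releases expose the same compaction programmatically through the DeltaTable optimize() builder. Below is a short sketch assuming the partition_number column and outputDir from the previous examples:

import io.delta.tables.DeltaTable

// SQL version restricted to a single partition; only partition columns can be
// referenced in the predicate
sparkSession.sql(s"OPTIMIZE delta.`$outputDir` WHERE partition_number = 0")

// Programmatic equivalent of the OPTIMIZE command
DeltaTable.forPath(sparkSession, outputDir)
  .optimize()
  .where("partition_number = 0")
  .executeCompaction()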
The underlying implementation is similar to the rewriting one. It has a few subtleties, though:
- The threshold configuration. The OPTIMIZE command relies on the following configuration entries to mark a table or partition as "optimizable". The spark.databricks.delta.optimize.minFileSize is the minimal size of the files to optimize. The files smaller than this threshold will be grouped into bigger files (a tuning sketch for these properties follows this list):
class OptimizeExecutor
// ...
  def optimize(): Seq[Row] = {
    recordDeltaOperation(txn.deltaLog, "delta.optimize") {
      val minFileSize = sparkSession.sessionState.conf.getConf(
        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
      // ...
      val candidateFiles = txn.filterFiles(partitionPredicate)
      val partitionSchema = txn.metadata.partitionSchema

      // select all files in case of multi-dimensional clustering
      val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
      val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
- Target size of the bigger files. The size of these optimized, bigger files is defined in another property, spark.databricks.delta.optimize.maxFileSize. The value is used in the groupFilesIntoBins method where, for each partition, Delta Lake sorts the files by size in ascending order and packs them together while respecting the maxFileSize:
private def groupFilesIntoBins(
    partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])],
    maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = {
  partitionsToCompact.flatMap { case (partition, files) =>
    val bins = new ArrayBuffer[Seq[AddFile]]()

    val currentBin = new ArrayBuffer[AddFile]()
    var currentBinSize = 0L

    files.sortBy(_.size).foreach { file =>
      if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) {
        bins += currentBin.toVector
        currentBin.clear()
        currentBin += file
        currentBinSize = file.size
      } else {
        // ...
      }
    }

    if (currentBin.nonEmpty) {
      bins += currentBin.toVector
    }
// ...
- Optimization job creation. The result of the groupFilesIntoBins method is later converted into optimize bin jobs:
class OptimizeExecutor
// ...
  def optimize(): Seq[Row] = {
// ...
      val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)

      val maxThreads =
        sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
      val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup =>
        runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize)
      }.flatten
If spark.databricks.delta.optimize.maxThreads is greater than 1, the jobs can run in parallel, but still within the same transaction. As a result, any partial success won't be visible to the readers.
- The compaction method. The optimize job relies on Apache Spark's coalesce or repartition; the choice is controlled by the spark.databricks.delta.optimize.repartition.enabled property (defaults to false).
- The final verification. In the end, the operation verifies whether the compacted files are still part of the table (they may have been removed in the meantime by a faster concurrent delete transaction). If that's the case, it generates the corresponding stats:
val optimizeStats = OptimizeStats()
optimizeStats.addedFilesSizeStats.merge(addedFiles)
optimizeStats.removedFilesSizeStats.merge(removedFiles)
optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size
optimizeStats.numBatches = jobs.size
optimizeStats.totalConsideredFiles = candidateFiles.size
optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size
optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism
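The thresholds mentioned in this list can be tuned at the session level before running the command. The snippet below is only a sketch with arbitrary example values, not recommendations; the property names are the ones read by OptimizeExecutor through DeltaSQLConf:

// Arbitrary example values, only to show where the thresholds are configured
sparkSession.conf.set("spark.databricks.delta.optimize.minFileSize", 32L * 1024 * 1024)  // files below 32MB become compaction candidates
sparkSession.conf.set("spark.databricks.delta.optimize.maxFileSize", 256L * 1024 * 1024) // target size of the compacted files
sparkSession.conf.set("spark.databricks.delta.optimize.maxThreads", 4L)                  // parallel bin jobs within one transaction
sparkSession.sql(s"OPTIMIZE delta.`$outputDir`")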
The list above doesn't cover the compaction of Z-Ordered tables. Due to the logic of this storage layout, the OPTIMIZE command behaves differently for bin creation and stats generation, but that's a topic for another blog post.
Rewriting and OPTIMIZE are two ways to compact smaller files into bigger ones in Delta Lake and improve the reading I/O. Even though they have subtle differences, they both mark the rewritten files as rearranged-only with the dataChange flag set to false. Good news, we don't stop here with compaction: next week we're going to see how it works in Apache Iceberg!