File sink and manifest compaction

Versions: Apache Spark 2.4.5

In my previous post I introduced the file sink in Apache Spark Structured Streaming. Today it's time to focus on an important concept of this output format which is the manifest file lifecycle.

In the first part of this post I will focus on the abstract class shared by the source and sink in Structured Streaming. In the next parts, I will explain different points related to the compaction.

CompactibleFileStreamLog

CompactibleFileStreamLog is the class responsible for the storage of files metadata in Apache Spark Structured Streaming. It's an abstract class and its 2 public implementations are FileStreamSource and FileStreamSinkLog. You can then deduce that both, file source and file sink in Structured Streaming will use a kind of "manifest" file to keep track of the read or written files in given micro-batch.

However, the content of these 2 "manifests" won't be the same. The sink will store serialized SinkFileStatus whereas the source the serialized FIleEntry:

case class SinkFileStatus(path: String, size: Long, isDir: Boolean, modificationTime: Long, blockReplication: Int, blockSize: Long, action: String)
case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

The content of "manifests", stored inside ${checkpointLocation}/sources/0/${batchNumber} contains the entries like:

v1
{"path":"file:///tmp/files_sink_manifest/input/1587218148238.json","timestamp":1587218148000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587218148249.json","timestamp":1587218148000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587218148246.json","timestamp":1587218148000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587218150249.json","timestamp":1587218150000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587218152249.json","timestamp":1587218152000,"batchId":0} 

The content for write "manifest", stored inside ${outputPath}/_spark_metadata/${batchNumber}, looks like:

v1
{"path":"file:///tmp/files_sink_manifest/output/part-00000-9840becd-4704-4632-8ec5-26396f05994f-c000.json","size":78,"isDir":false,"modificationTime":1587218155000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00001-2632b234-d298-4303-830b-31c0703b6c6e-c000.json","size":52,"isDir":false,"modificationTime":1587218155000,"blockReplication":1,"blockSize":33554432,"action":"add"}

Small files problem

One of the first things you find when analyzing the code source of CompactibleFileStreamLog is this comment:

 * As reading from many small files is usually pretty slow, also too many
 * small files in one folder will mess the FS, [[CompactibleFileStreamLog]] will
 * compact log files every 10 batches by default into a big file. When
 * doing a compaction, it will read all old log files and merge them with the new batch.

As you can see, we retrieve here the similar technique than in the state store management consisting of flattening multiple files into a single one. If you are curious about the state store's version, I explained it in Delta and snapshot state store formats article.

Regarding the implementation for the files part, the compaction is performed for both source and sinks but there are some subtle differences. First, the source. Internally it's represented by FileStreamSourceLog class and the compaction happens every spark.sql.streaming.fileSource.log.compactInterval log files (default to 10). The triggering of compaction happens in add method adding new files to the "manifest":

  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
    if (super.add(batchId, logs)) {
      if (isCompactionBatch(batchId, compactInterval)) {
        fileEntryCache.put(batchId, logs)
      }
      true
    } else {
      false
    }
  }

Where the isCompactionBatch is a simple modulo division of the batch number by the batch interval:

  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
    (batchId + 1) % compactInterval == 0
  }

Regarding the sink "manifest", it's represented by FileStreamSinkLog class. Its compaction frequency is controlled by spark.sql.streaming.fileSink.log.compactInterval property (default to 10) and it uses the same isCompactionBatch method to detect the operation requiring the compaction.

The result of compaction for both, source and sink cases, are the files called with the number of batch and compact extension (9.compact, 19.compact, ...).

The content of the compacted files look like in the following snippet, for the source:

v1
{"path":"file:///tmp/files_sink_manifest/input/1587220536861.json","timestamp":1587220536000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587220536868.json","timestamp":1587220536000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587220536853.json","timestamp":1587220536000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587220538869.json","timestamp":1587220538000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587220540869.json","timestamp":1587220540000,"batchId":0}
{"path":"file:///tmp/files_sink_manifest/input/1587220542869.json","timestamp":1587220542000,"batchId":1}
{"path":"file:///tmp/files_sink_manifest/input/1587220544870.json","timestamp":1587220544000,"batchId":2}
{"path":"file:///tmp/files_sink_manifest/input/1587220546870.json","timestamp":1587220546000,"batchId":3}
{"path":"file:///tmp/files_sink_manifest/input/1587220548871.json","timestamp":1587220548000,"batchId":4}
{"path":"file:///tmp/files_sink_manifest/input/1587220550871.json","timestamp":1587220550000,"batchId":5}
{"path":"file:///tmp/files_sink_manifest/input/1587220552871.json","timestamp":1587220552000,"batchId":6}
{"path":"file:///tmp/files_sink_manifest/input/1587220554871.json","timestamp":1587220554000,"batchId":7}
{"path":"file:///tmp/files_sink_manifest/input/1587220556872.json","timestamp":1587220556000,"batchId":8}
{"path":"file:///tmp/files_sink_manifest/input/1587220558872.json","timestamp":1587220558000,"batchId":9}

And the sink:

v1
{"path":"file:///tmp/files_sink_manifest/output/part-00000-3346647b-f488-4860-bc44-41a57e42802c-c000.json","size":78,"isDir":false,"modificationTime":1587220544000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00001-4475d324-4546-4597-beca-5a58235c78de-c000.json","size":52,"isDir":false,"modificationTime":1587220544000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-53fe200a-c426-4353-a53b-1d320f20e9e4-c000.json","size":26,"isDir":false,"modificationTime":1587220545000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-03b3dd32-8dad-481a-a7f8-80d42daa83af-c000.json","size":26,"isDir":false,"modificationTime":1587220545000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-807839f7-d111-466a-b9d0-81905c2022a6-c000.json","size":26,"isDir":false,"modificationTime":1587220547000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-2848cdcb-5f9a-445f-bef9-e917bab22dd9-c000.json","size":26,"isDir":false,"modificationTime":1587220549000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-df434fc2-fba3-4676-ab7c-e6a88a8b1d30-c000.json","size":26,"isDir":false,"modificationTime":1587220551000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-d23d614e-48d1-41fc-9e91-c3f9182962e5-c000.json","size":26,"isDir":false,"modificationTime":1587220553000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-03f6d6ea-0dff-4b73-aa45-dbf55467928d-c000.json","size":26,"isDir":false,"modificationTime":1587220555000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-5437bc21-e1cf-4d58-8478-285f135f802e-c000.json","size":26,"isDir":false,"modificationTime":1587220557000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/files_sink_manifest/output/part-00000-8b676783-c175-4c4d-9d57-7c3fe841f3a0-c000.json","size":26,"isDir":false,"modificationTime":1587220559000,"blockReplication":1,"blockSize":33554432,"action":"add"}

As you can see, the source contains batch id but it's not related to the compaction process. It's because of the FileEntry serialized inside the "manifest" files, so automatically copied to the compacted versions. SinkFileStatus used in the sink doesn't have the batch number information and therefore it's not in the compacted files.

Let's see now what happens with the compacted batch files.

Cleanup policy

How Apache Spark knows how many batch files it should keep? To figure the things out, it uses the spark.sql.streaming.minBatchesToRetain property - the same property as for figuring out the number of checkpoints and state store versions to keep. You can learn about it in Checkpoint storage in Structured Streaming or State store 101 posts.

Turning off the delete

Even though the deletes are enabled by default, you can disable them with spark.sql.streaming.fileSink.log.deletion property. In that situation, the logs still will be compacted but they won't be removed after the expiration time.

Please notice that the cleanup policy doesn't concern the written entries but only the files. Unfortunately, at least up to Apache Spark 3.0, all written entries are retained in the compacted file and it can lead to Out Of Memory problems described in File sink and Out-Of-Memory risk

For manifest files, the past versions are deleted in deleteExpiredLog(currentBatchId: Long) method and few more configuration properties are involved, like the cleanup delay represented by spark.sql.streaming.fileSource.log.cleanupDelay property for the source and spark.sql.streaming.fileSink.log.cleanupDelay for the sink. In addition to that, the clean up behavior can be disabled with these properties, spark.sql.streaming.fileSource.log.deletion for the source and spark.sql.streaming.fileSink.log.deletion for the sink.

Let's check now how it works together. deleteExpiredLog method first checks if there are some too old batches to remove. Let's take an example with the minBatchesToRetain equal to 5 and the compaction interval set to 10, and apply it to these lines:

    if (compactInterval <= currentBatchId + 1 - minBatchesToRetain) {
      // Find the first compaction batch id that maintains minBatchesToRetain
      val minBatchId = currentBatchId + 1 - minBatchesToRetain
      val minCompactionBatchId = minBatchId - (minBatchId % compactInterval) - 1

So, for our case we can suppose that we're dealing with the batch number 21, which gives:

if (10 < 21 + 1 - 5) {
  val minBatchId = 21 + 1 - 5 = 17
  val minCompactionBatchId = 17 - (17 % 10) - 1 = 17 - 7 - 1 = 9

Later, the method retrieves all not compacted files created before the batch number 9, and deletes them physically:

val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
      fileManager.list(metadataPath, new PathFilter {
        override def accept(path: Path): Boolean = {
          try {
            val batchId = getBatchIdFromFileName(path.getName)
            batchId < minCompactionBatchId
          } catch {
            case _: NumberFormatException =>
              false
          }
        }
      }).foreach { f =>
        if (f.getModificationTime <= expiredTime) {
          fileManager.delete(f.getPath)
        }
      }

As you can see, we retrieve here our clean up delay parameter. It's used to address the eventual consistency issue of S3 that may hide the compaction file created meantime. Why compaction files? Simply because the getBatchIdFromFileName also takes compaction files into account:

  val COMPACT_FILE_SUFFIX = ".compact"
  def getBatchIdFromFileName(fileName: String): Long = {
    fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong
  }

This stripSuffix method works on compaction files and also on the raw files which don't have any extension, so that's the place where both types are removed.

Initially, instead of this post I wanted to present a use case of this manifest files in a data processing pipeline. However, when I saw that they can be compacted, I knew that I couldn't do that. The compaction has a big impact on the reprocessing and I will try to show you that in the next blog post.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!