File sink and Out-Of-Memory risk

Versions: Apache Spark 3.0.0

A few weeks ago I wrote 3 posts about file sink in Structured Streaming. At this time I wasn't aware of one potential issue, namely an Out-Of-Memory problem that at some point will happen.

In the first part of the blog post, I will show you the snippets and explain how this OOM can happen. In the second section, I will try to provide some workarounds, staying with Structured Streaming. Finally, I will describe the community ongoing effort that tends to overcome the issue.

OOM?

Yes, I couldn't believe either when I saw the issues on the mailing list. Initially, I was thinking that user's problem was related to some hidden, memory-intensive business logic but only after analyzing the file sink metadata management calmly, I understood that indeed, it hides the Out-Of-Memory risk.

The OOM risk comes from the compaction mechanism of the metadata. To recall, for every micro-batch, Apache Spark Structured Streaming file sink writes the data files and a metadata file in _spark_metadata directory. Every x (configurable), a metadata file is written. The framework takes then the content of these files and puts it into a special kind of file called a compaction file. The raw files are later removed after reaching another threshold, configured by spark.sql.streaming.minBatchesToRetain. The methods responsible for that are these ones:

private def deleteExpiredLog(currentBatchId: Long): Unit = {
// ...
     fileManager.list(metadataPath, (path: Path) => {
        try {
          val batchId = getBatchIdFromFileName(path.getName)
          batchId < minCompactionBatchId
        } catch {
          case _: NumberFormatException =>
            false
        }
      }).foreach { f =>
        if (f.getModificationTime <= expiredTime) {
          fileManager.delete(f.getPath)
        }
      }
// ...

  /**
   * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
   * need to do a new compaction.
   *
   * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
   * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
   */
  def getValidBatchesBeforeCompactionBatch(
      compactionBatchId: Long,
      compactInterval: Int): Seq[Long] = {
    assert(isCompactionBatch(compactionBatchId, compactInterval),
      s"$compactionBatchId is not a compaction batch")
    (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
  }

Two things. First, the delete method keeps the oldest compacted file, so that the method retrieving the files to compact can read the content of it and include it in the new compacted file. That way the content of the compacted file is continuously growing, until reaching the point where all these lines are too big to be kept in the driver's memory. And it obviously can lead to OOM problems. I illustrated it, without producing the OOM though, in the following video:

Workarounds

The main purpose of _spark_metadata is to ensure the exactly-once guarantee, ie. even when you replay given micro-batch and doesn't change the sink metadata, Apache Spark will skip the writing. Hence, providing the workaround is not so obvious. That's why, let's start with the situation when you don't expect the exactly-once delivery. For that case, you can use the foreachBatch method and batch writer that doesn't generate the metadata files:

  val query = stream.writeStream
    .foreachBatch((dataset: DataFrame, batchId: Long) => {
      dataset.write.mode(SaveMode.Append).json("/tmp/spark/file-sink/workaround-1/")
    })
    .start()

ACID-compliant file formats

I didn't try it but one of possible solutions to achieve exactly once at the moment of writing could one of the new ACID-compliant (if you have a better name, please comment, I'm still looking for this one :P) formats like: Apache Hudi, Apache Iceberg or DetaLake. On the other hand, I'm wondering if they could help to avoid issues when you have to reprocess the data and remove some of already created files, which is possible with the file sink raw metadata files (compacted metadata loses the batch number information). Once again, feel free to suggest the solution if you have any 🙏

On the other hand, keeping the exactly-once will be more challenging but it can also be solved with foreachBatch. The idea is to simulate the metadata file and control its size by putting X batch entries in every file:

  val query = stream.writeStream.option("checkpointLocation", s"${baseDir}/checkpoint${System.currentTimeMillis()}")
    .foreachBatch((dataset: DataFrame, batchId: Long) => {
      writeData(dataset, batchId)
    })
    .start()

  def writeData(dataset: Dataset[Row], batchId: Long): Unit = {
    val stagingDir = s"${baseDir}/${batchId}"
    dataset.write.mode(SaveMode.Overwrite).json(stagingDir)
 
    val metadataDir = s"${baseDir}/metadata"
    val outputDir = s"${baseDir}/output"
    new ExactlyOnceWriter(batchId, metadataDir, outputDir, stagingDir)
      .cleanPreviousBatchExecution
      .promoteStagingFiles
  }
class ExactlyOnceWriter(batchId: Long, metadataDir: String, outputDir: String, stagingDir: String) {

  type StagingFileWithNewName = (File, String)

  lazy val stagingFiles = FileUtils.listFiles(new File(stagingDir), Array("json"), true)

  lazy val stagingFilesWithNewNames: Iterable[StagingFileWithNewName] = stagingFiles.asScala.zipWithIndex
    .map(stagingFileWithIndex => {
      val (stagingFile, index) = stagingFileWithIndex
      (stagingFile, s"${outputDir}/batch-${batchId}-${index}.json")
    })

  private val metadataFileNumber = {
    val entriesPerFile = 100
    batchId / 100
  }
  private val metadataFile = new File(s"${metadataDir}/${metadataFileNumber}.meta")
  lazy val metadataFileEntity = {
    if (metadataFile.exists()) {
      ExactlyOnceWriter.JacksonMapper.readValue(metadataFile, classOf[MetadataFile])
    } else {
      MetadataFile()
    }
  }

  def cleanPreviousBatchExecution: this.type = {
    metadataFileEntity.files.get(batchId.toString).map(oldFiles => {
      oldFiles.foreach(oldFile => new File(oldFile).delete())
    })
    this
  }

  def promoteStagingFiles: this.type = {
    stagingFilesWithNewNames.foreach {
      case (stagingFile, newName) => {
        FileUtils.moveFile(stagingFile, new File(newName))
      }
    }
    val newMetadataFiles = metadataFileEntity.files + (batchId.toString -> stagingFilesWithNewNames.map(_._2))
    println(newMetadataFiles)
    FileUtils.writeStringToFile(metadataFile,
      ExactlyOnceWriter.JacksonMapper.writeValueAsString(metadataFileEntity.copy(files = newMetadataFiles))
    )
    this
  }

}

Apart from these 2, I found other proposed workarounds, like the one coming from SPARK-24295 where an extra Spark job compacts the metadata files. Another solution from this task is to stop the streaming job every 6 hours and reduce the size of the checkpointed files (offsets and _spark_metadata). But you will see in the next section that there is an ongoing community effort to overcome the OOM risk. And if you are in similar trouble, do not hesitate to follow the JIRAs, PRs and share your insights!

Community effort

The efforts started more than one year ago, in February with the proposal to retain only the info for the last batch in the metadata but it would break the exactly-once semantic of the sink.

Another proposal consisted on introducing V2 of the _spark_metadata format, LZ4-compressed and using UnsafeRow, exactly as the state store storage (SPARK-30946). Finally, the idea of using UnsafeRow was not assessed as the best one because of a potential compatibility risk between Spark versions ("One lesson I learned from the past is UnsafeRow is not designed to be persisted across Spark versions. I'd avoid to introduce another place that persists data in the UnsafeRow format" by Shixiong Zhu (SPARK-30946 PR's comment).

But the PR is still open and after a few iterations, a proposal using DataInputStream / DataOutputStream to serialize/deserialize Java's object entries seems to be retained (as of 29.06.2020). If it's implemented in one of the next releases, the compaction work should perform approximately ~10 times faster than the current one. And it seems to be the first step to do to finally get rid of this OOM issue of the file sink _spark_metadata file:

Even with this patch & retention, memory issue may still exist, as this patch doesn't help reducing memory usage on compact. Current file stream source/sink requires to materialize all entries into memory during compact, maybe also during get. The problem is more specific to file stream sink, as the size of entry are much bigger, and even with retention a compact batch would have bunch of entries. Addressing issue on get is hard (because it's also coupled with callers' logic), but addressing issue on compact would be relatively easier, and helps file stream sink to avoid OOME during compact phase. Next item to do.

The "Next item to do" is being reviewed in [SPARK-30462][SS] Streamline the logic on file stream source and sink to avoid memory PR. The idea is to use a read-and-process instead of read-materialize-and-process approach, so to compact every entry as soon as it's read instead of putting them to the main memory first.

Apache Spark 3.1.0

The memory issue can be fixed for the pipelines using Apache Spark 3.1.0 or later. In this version Jungtaek Lim added a retention configuration to filter out outdated entries in the compacting process.

As you can see then, the issue is still alive and you will probably have to do some manual work to handle it. Hopefully, the community effort should soon solve it and make you use the file sink simply, once again 🤞