The _spark_metadata issue in Apache Spark Structured Streaming is no more!

Thanks to table file formats, fewer people probably work with flat files in Structured Streaming today than 5 years ago. However, if you are in this group and still generate CSV or JSON files with the streaming file sink, brace yourself: memory problems are coming if you don't take action!

Bartosz Konieczny

Below is a very simple job based on the micro-batch rate streaming source (rate-micro-batch). I'm using it to generate a lot of data quickly and show you why this blog post focuses on _spark_metadata:

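import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("spark_metadata issue")
  .getOrCreate()
// Any start time works here; this one is only an example
val startTimestamp = Timestamp.valueOf("2023-07-13 00:00:00")

// The rate-micro-batch source is available since Apache Spark 3.3.0
val inputStream = sparkSession.readStream.format("rate-micro-batch")
  // To better simulate the issue, let's define a lot of partitions
  .option("numPartitions", 15)
  .option("rowsPerBatch", 30)
  .option("advanceMillisPerBatch", 60000)
  .option("startTimestamp", startTimestamp.getTime)
  .load()

val writeQuery = inputStream
  .writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/spark_metadata_issue/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_issue/data")
  .start()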

Now look at what's inside /tmp/wfc/spark_metadata_issue/data/_spark_metadata:

bartosz@bartosz:~$ ls -Gsh /tmp/wfc/spark_metadata_issue/data/_spark_metadata/ --sort size  | grep .compact
3.5M 1059.compact
3.4M 1049.compact
3.4M 1039.compact
3.4M 1029.compact
3.4M 1019.compact
3.3M 1009.compact
3.3M 999.compact
3.3M 989.compact
3.2M 979.compact
3.2M 969.compact
3.2M 959.compact
3.1M 949.compact
3.1M 939.compact
3.1M 929.compact

Each compacted metadata file lists the files written by all the micro-batches so far, not only by the most recent ones:

bartosz@bartosz:~$ head /tmp/wfc/spark_metadata_issue/data/_spark_metadata/909.compact
v1
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00000-587972e3-d4f0-4e74-bd23-a4111f0d95a3-c000.json","size":103,"isDir":false,"modificationTime":1689217722670,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00001-a54bf895-3ab1-4bdb-9b4a-3269cf2ff697-c000.json","size":103,"isDir":false,"modificationTime":1689217722670,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00002-bdc1adb5-9228-4c6d-9b8f-6299447f7683-c000.json","size":103,"isDir":false,"modificationTime":1689217722758,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00003-8ed08c99-1797-4009-bd5a-09c2886820a9-c000.json","size":103,"isDir":false,"modificationTime":1689217722762,"blockReplication":1,"blockSize":33554432,"action":"add"}

Unfortunately, this logic is broken and may lead to an OOM error. But before showing you why, let's go back in time and understand the purpose of _spark_metadata.

Exactly-once

The feature was added 7 years ago by Michael Armbrust as part of SPARK-14078. The goal was to provide an index of committed files so that consumers don't read data from incomplete or failed micro-batches. Sounds familiar? Yes, you'll find the same principle in the metadata layer of table file formats!

As for _spark_metadata itself, the class responsible for managing it is FileStreamSinkLog. Apache Spark creates it and passes it to an instance of ManifestFileCommitProtocol attached to the file sink in each micro-batch. The lifecycle of this commit protocol follows the stages of a micro-batch: it has a method to handle a successful task execution, another to handle a successful micro-batch execution, and yet another to manage a micro-batch failure. These event-based listeners guarantee exactly-once on the reader side because the files written by the tasks are added to _spark_metadata only when the whole micro-batch succeeds, and readers only consider the files listed there.
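To make this flow more concrete, here is a toy model of the commit stages in plain Scala. ToyManifestLog and ToyCommitProtocol are hypothetical names and this is not Spark's ManifestFileCommitProtocol: the method names mirror the micro-batch stages, but the signatures are simplified. It shows that files written by successful tasks stay invisible until their micro-batch commits, and that a failed micro-batch never reaches the log.

```scala
import scala.collection.mutable

// Toy stand-in for _spark_metadata: an append-only map of batch id -> committed files
final class ToyManifestLog {
  private val committed = mutable.LinkedHashMap.empty[Long, Seq[String]]

  // Returns false if the batch was already committed, refusing a duplicate entry
  def add(batchId: Long, files: Seq[String]): Boolean =
    if (committed.contains(batchId)) false
    else {
      committed(batchId) = files
      true
    }

  // What a reader relying on the manifest can see
  def visibleFiles: Seq[String] = committed.values.flatten.toSeq
}

// Toy commit protocol for one micro-batch (hypothetical, simplified signatures)
final class ToyCommitProtocol(manifestLog: ToyManifestLog, batchId: Long) {
  // A successful task only reports the files it wrote; nothing is visible yet
  def commitTask(writtenFiles: Seq[String]): Seq[String] = writtenFiles

  // A successful micro-batch publishes the files of all its tasks in a single log entry
  def commitJob(taskReports: Seq[Seq[String]]): Unit =
    if (!manifestLog.add(batchId, taskReports.flatten)) {
      throw new IllegalStateException(s"Batch $batchId was already committed")
    }

  // A failed micro-batch never reaches the log, so its files stay invisible to readers
  def abortJob(): Unit = ()
}

// Usage: batch 0 succeeds, batch 1 fails after one of its tasks succeeded
val manifestLog = new ToyManifestLog
val batch0 = new ToyCommitProtocol(manifestLog, batchId = 0L)
batch0.commitJob(Seq(batch0.commitTask(Seq("part-00000.json")), batch0.commitTask(Seq("part-00001.json"))))
val batch1 = new ToyCommitProtocol(manifestLog, batchId = 1L)
batch1.commitTask(Seq("part-00002.json"))
batch1.abortJob()
```

After these calls, only part-00000.json and part-00001.json are visible; part-00002.json may exist on disk, but no reader relying on the log will ever pick it up.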

This apparently inoffensive logic can be a problem for a long-running application. Each micro-batch adds new entries, and every 10 micro-batches by default (spark.sql.streaming.fileSink.log.compactInterval), Spark compacts them into a .compact file that also includes all the entries of the previous compacted file. The problem is that the file sink doesn't remove any entries from this compacted file, at least not by default. As the file only grows, it'll lead to OOM at some point.
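The growth can be illustrated with a simplified model of the compaction without retention. It assumes that every micro-batch writes the same number of files and that a compaction runs every 10 micro-batches, Spark's default interval. The Entry type and both functions are hypothetical; Spark's CompactibleFileStreamLog works with real files and serialized SinkFileStatus entries.

```scala
// Hypothetical, simplified _spark_metadata entry: only the path matters here
final case class Entry(path: String)

// The compacted files are named after the batch that triggers the compaction:
// with an interval of 10, these are batches 9, 19, 29, ...
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

// Returns the number of entries stored in each .compact file, keyed by batch id
def compactedSizes(lastBatchId: Long, filesPerBatch: Int, compactInterval: Int): Map[Long, Int] = {
  var compacted = Vector.empty[Entry] // content of the latest .compact file
  var deltas = Vector.empty[Entry]    // entries committed since the last compaction
  val sizes = Map.newBuilder[Long, Int]
  for (batchId <- 0L to lastBatchId) {
    // Each micro-batch commits one entry per written file
    deltas = deltas ++ (0 until filesPerBatch).map(p => Entry(s"part-$p-batch-$batchId.json"))
    if (isCompactionBatch(batchId, compactInterval)) {
      // No retention: new compacted file = previous compacted file + all the deltas
      compacted = compacted ++ deltas
      deltas = Vector.empty
      sizes += (batchId -> compacted.size)
    }
  }
  sizes.result()
}

val entriesPerCompactFile = compactedSizes(lastBatchId = 1059L, filesPerBatch = 15, compactInterval = 10)
```

Assuming 15 files per micro-batch, as in the rate-micro-batch job with 15 partitions, the model puts 150 entries in 9.compact and 15,900 entries in 1059.compact, and each new compaction has to rewrite all of them plus the new ones.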

SPARK-27188

You can avoid this OOM risk by configuring the retention option on your file sink:

val writeQuery = inputStream
  .writeStream
  .format("json")
  .option("retention", "50s")
  .option("checkpointLocation",  "/tmp/spark_metadata_retention/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_retention/data")
  .start()

When you set the retention, the _spark_metadata log writer compares the age of each file (the current time minus the file's last modification time) against the retention and excludes the expired files from the compacted file:

class FileStreamSinkLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String,
    _retentionMs: Option[Long] = None)
  extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
  // ...
  // Simplified: without the retention option, the retention is Long.MaxValue
  val retentionMs: Long = _retentionMs.getOrElse(Long.MaxValue)
  // ...
  override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = {
    if (retentionMs < Long.MaxValue) {
      if (currentTime - log.modificationTime > retentionMs) {
        logDebug(s"${log.path} excluded by retention - current time: $currentTime / " +
          s"modification time: ${log.modificationTime} / retention: $retentionMs ms.")
        false
      } else {
        true
      }
    } else {
      true
    }
  }
}
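The effect of the retention on a compaction can be sketched in plain Scala. SinkEntry, shouldRetain, and compact below are hypothetical simplifications of the excerpt above, not Spark classes; None stands for a missing retention option, which behaves like Long.MaxValue. The timestamps come from the log messages of this post.

```scala
// Hypothetical, simplified version of a _spark_metadata entry
final case class SinkEntry(path: String, modificationTime: Long)

// Same decision as shouldRetain: without retention (Long.MaxValue) every entry stays,
// otherwise only the entries that are not older than the retention stay
def shouldRetain(entry: SinkEntry, currentTime: Long, retentionMs: Long): Boolean =
  retentionMs == Long.MaxValue || currentTime - entry.modificationTime <= retentionMs

// A compaction keeps only the entries passing the retention check
def compact(entries: Seq[SinkEntry], currentTime: Long, retentionMs: Option[Long]): Seq[SinkEntry] = {
  val retention = retentionMs.getOrElse(Long.MaxValue)
  entries.filter(entry => shouldRetain(entry, currentTime, retention))
}

val expired = SinkEntry("part-00000.json", modificationTime = 1689221699688L) // 50154 ms old
val recent = SinkEntry("part-00042.json", modificationTime = 1689221740000L)  // 9842 ms old
compact(Seq(expired, recent), currentTime = 1689221749842L, retentionMs = Some(50000L)) // keeps only recent
```

Keep in mind that the retention only removes the entries from _spark_metadata, not the data files. Queries reading the sink's output directory through the metadata log may not process the expired files anymore.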

In the logs, you should see messages like these (the exclusion messages are logged at the DEBUG level of FileStreamSinkLog):

Writing atomically to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact using temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
file:///tmp/wfc/spark_metadata_retention/data/part-00000-925972c0-8f92-4aaf-8aaf-cf82b6eb4755-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms. (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00001-a1678f29-a190-4298-ade0-baf7bf8126bb-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms.(org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00002-5ba40f74-ec63-41cf-b5c0-e971533d9130-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699760 / retention: 50000 ms.(org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)

Renamed temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
Compacting took 84 ms for compact batch 249 (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)

Jungtaek Lim added this fix in SPARK-27188, released in Apache Spark 3.1.0. If you encounter memory issues caused by _spark_metadata in earlier versions, you can try to overcome them with the foreachBatch sink and the batch writer API, which doesn't generate the _spark_metadata layer.
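A minimal sketch of this workaround, assuming Apache Spark 2.4+ (where foreachBatch is available) and a hypothetical output layout with one directory per micro-batch. The paths and the writeMicroBatch name are illustrative. foreachBatch provides at-least-once guarantees by default, so the sketch overwrites a directory named after the batch id; assuming a replayed micro-batch has the same content, a replay simply replaces the previous output.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("foreachBatch without _spark_metadata")
  .getOrCreate()

// Hypothetical output location; each micro-batch gets its own subdirectory
val outputPath = "/tmp/wfc/spark_metadata_foreach_batch/data"

// An explicitly typed function value can avoid the overload ambiguity with the
// Java-friendly foreachBatch variant under Scala 2.12
val writeMicroBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  // Batch writer API: no _spark_metadata directory is created.
  // The overwrite mode makes a replayed micro-batch replace its previous output.
  batch.write.mode("overwrite").json(s"$outputPath/batch_id=$batchId")
}

// The plain rate source exists in older Spark versions, unlike rate-micro-batch
val inputStream = sparkSession.readStream.format("rate").option("rowsPerSecond", 10).load()

val writeQuery = inputStream
  .writeStream
  .option("checkpointLocation", "/tmp/wfc/spark_metadata_foreach_batch/checkpoint")
  .foreachBatch(writeMicroBatch)
  .start()
```

The batch_id=... directory names let a batch reader discover batch_id as a partition column when it reads outputPath. The trade-off is that readers don't get the committed-files index anymore, so the exactly-once semantics on the reader side is no longer guaranteed by Spark.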
