_spark_metadata in Apache Spark Structured Streaming issue is no more!

Versions: Apache Spark 3.4.0 https://github.com/bartosz25/spark-playground/tree/master/structured-streaming-spark-metadata

There are probably not that many people working today on the flat files with Structured Streaming than 5 years ago thanks to the table file formats. However, if you are in this group and are still generating CSVs or JSONs with the streaming sink, brace yourself, the memory problems are coming if you don't take action!

Looking for a better data engineering position and skills?

You have been working as a data engineer but feel stuck? You don't have any new challenges and are still writing the same jobs all over again? You have now different options. You can try to look for a new job, now or later, or learn from the others! "Become a Better Data Engineer" initiative is one of these places where you can find online learning resources where the theory meets the practice. They will help you prepare maybe for the next job, or at least, improve your current skillset without looking for something else.

👉 I'm interested in improving my data engineering skillset

See you there, Bartosz

Below is a very simple code based on a micro-batch rate streaming source. I'm using this one to generate a lot of data quickly and show you why I'm focusing on the _spark_metadata in this blog post:

val inputStream = sparkSession.readStream.format("rate-micro-batch")
  // To better simulate the issue, let's define a lot of partitions
  .option("numPartitions", 15)
  .option("rowsPerBatch", 30)
  .option("advanceMillisPerBatch", 60000)
  .option("startTimestamp", startTimestamp.getTime)

val writeQuery = inputStream
  .option("checkpointLocation",  "/tmp/spark_metadata_issue/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_issue/data")

Look now what's inside the /tmp/wfc/spark_metadata_issue/data/_spark_metadata:

bartosz@bartosz:~$ ls -Gsh /tmp/wfc/spark_metadata_issue/data/_spark_metadata/ --sort size  | grep .compact
3.5M 1059.compact
3.4M 1049.compact
3.4M 1039.compact
3.4M 1029.compact
3.4M 1019.compact
3.3M 1009.compact
3.3M 999.compact
3.3M 989.compact
3.2M 979.compact
3.2M 969.compact
3.2M 959.compact
3.1M 949.compact
3.1M 939.compact
3.1M 929.compact

These compacted metadata files contain the files written in each micro-batch:

bartosz@bartosz:~$ head /tmp/wfc/spark_metadata_issue/data/_spark_metadata/909.compact

Unfortunately, this logic is broken and may lead to the OOM. But before showing you why it's broken, let's go back in time and understand the purpose of this _spark_metadata.


The feature was added 7 years ago by Michael Armbrust as part of SPARK-14078. The goal was to provide the index with committed files to avoid consumers reading data from uncompleted or failed micro-batches. Sounds familiar? Yes, you'll find the same principle for the table file formats metadata layer!

When it comes to our _spark_metadata. The class responsible for managing it is FileStreamSinkLog. Apache Spark creates it and assigns to an instance of ManifestFileCommitProtocol attached to the file sink in the micro-batch. The lifecycle of this commit protocol follows the stages of a micro-batch, i.e. it has a method to handle a successful task execution, another one to handle the micro-batch successful execution, or yet another to manage a micro-batch failure. This event-based listeners guarantee this exactly-once on the readers because:

This apparently inoffensive code can be a problem for a long-running application. The entries will be added with each job and compacted every x micro-batches. The problem is, the entries in this compaction file don't get cleared by the file sink process, at least not by default. As it's only growing, it'll lead to OOM at some point.


This OOM risk doesn't exist if you configure a retention attribute on your file sink:

val writeQuery = inputStream
  .option("retention", "50s")
  .option("checkpointLocation",  "/tmp/spark_metadata_retention/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_retention/data")

When you set the retention, the _spark_metadata log writer will compare the last modification time of each file against the retention configuration and discard it from the compacted entry:

class FileStreamSinkLog(metadataLogVersion: Int, sparkSession: SparkSession, path: String, _retentionMs: Option[Long] = None) {
// ...
override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = {
  if (retentionMs < Long.MaxValue) {
    if (currentTime - log.modificationTime > retentionMs) {
      logDebug(s"${log.path} excluded by retention - current time: $currentTime / " +
        s"modification time: ${log.modificationTime} / retention: $retentionMs ms.")
    } else {
  } else {

In the logs, you should see the messages like:

Writing atomically to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact using temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
file:///tmp/wfc/spark_metadata_retention/data/part-00000-925972c0-8f92-4aaf-8aaf-cf82b6eb4755-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms. (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00001-a1678f29-a190-4298-ade0-baf7bf8126bb-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms.(org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00002-5ba40f74-ec63-41cf-b5c0-e971533d9130-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699760 / retention: 50000 ms.(org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)

Renamed temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
Compacting took 84 ms for compact batch 249 (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)

This fix was added by Jungtaek Lim in SPARK-27188 released in the Apache Spark 3.1.0. But if you encounter the memory issues due to this _spark_metadata in prior versions, you can try to overcome them with foreachBatch sink and the batch sink API that doesn't generate this _spark_metadata layer.

If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!