https://github.com/bartosz25/spark-playground/tree/master/structured-streaming-spark-metadata
There are probably fewer people working with flat files in Structured Streaming today than 5 years ago, thanks to the table file formats. However, if you are in this group and still generate CSVs or JSONs with the streaming file sink, brace yourself: the memory problems are coming if you don't take action!
Below is a very simple example based on the rate-micro-batch streaming source. I'm using it to generate a lot of data quickly and show you why I'm focusing on the _spark_metadata in this blog post:
val inputStream = sparkSession.readStream.format("rate-micro-batch")
  // To better simulate the issue, let's define a lot of partitions
  .option("numPartitions", 15)
  .option("rowsPerBatch", 30)
  .option("advanceMillisPerBatch", 60000)
  .option("startTimestamp", startTimestamp.getTime)
  .load()

val writeQuery = inputStream
  .writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/spark_metadata_issue/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_issue/data")
  .start()
Now look at what's inside /tmp/wfc/spark_metadata_issue/data/_spark_metadata:
bartosz@bartosz:~$ ls -Gsh /tmp/wfc/spark_metadata_issue/data/_spark_metadata/ --sort size | grep .compact
3.5M 1059.compact
3.4M 1049.compact
3.4M 1039.compact
3.4M 1029.compact
3.4M 1019.compact
3.3M 1009.compact
3.3M 999.compact
3.3M 989.compact
3.2M 979.compact
3.2M 969.compact
3.2M 959.compact
3.1M 949.compact
3.1M 939.compact
3.1M 929.compact
These compacted metadata files contain the files written in each micro-batch:
bartosz@bartosz:~$ head /tmp/wfc/spark_metadata_issue/data/_spark_metadata/909.compact
v1
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00000-587972e3-d4f0-4e74-bd23-a4111f0d95a3-c000.json","size":103,"isDir":false,"modificationTime":1689217722670,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00001-a54bf895-3ab1-4bdb-9b4a-3269cf2ff697-c000.json","size":103,"isDir":false,"modificationTime":1689217722670,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00002-bdc1adb5-9228-4c6d-9b8f-6299447f7683-c000.json","size":103,"isDir":false,"modificationTime":1689217722758,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/wfc/spark_metadata_issue/data/part-00003-8ed08c99-1797-4009-bd5a-09c2886820a9-c000.json","size":103,"isDir":false,"modificationTime":1689217722762,"blockReplication":1,"blockSize":33554432,"action":"add"}
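If you want to explore these entries yourself, here is a quick sketch, assuming the 909.compact file shown above; the "v1" version header line has to be skipped before parsing the JSON entries:

// A sketch for inspecting a compact file: the first line is the "v1" version header,
// every following line is a JSON-serialized SinkFileStatus entry.
val rawCompactLines = sparkSession.read
  .textFile("/tmp/wfc/spark_metadata_issue/data/_spark_metadata/909.compact")
val compactEntries = sparkSession.read.json(rawCompactLines.filter(_.startsWith("{")))
compactEntries.select("path", "size", "modificationTime", "action").show(5, truncate = false)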
Unfortunately, this logic is broken and may lead to an OOM. But before showing you why it's broken, let's go back in time and understand the purpose of this _spark_metadata.
Exactly-once
The feature was added 7 years ago by Michael Armbrust as part of SPARK-14078. The goal was to provide an index of committed files so that consumers don't read data from incomplete or failed micro-batches. Sound familiar? Yes, you'll find the same principle in the metadata layer of the table file formats!
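You can see this guarantee from the consumer side with a minimal sketch, assuming the output path from the example above: when the batch reader points at the sink's root directory, Spark detects the _spark_metadata folder and lists only the files recorded there, so files left by failed or in-flight micro-batches stay invisible.

// Batch read of the streaming sink's output directory; only the files committed in
// _spark_metadata are listed, not every part file present on disk.
val committedFilesOnly = sparkSession.read
  .json("/tmp/wfc/spark_metadata_issue/data")
committedFilesOnly.show(truncate = false)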
When it comes to our _spark_metadata, the class responsible for managing it is FileStreamSinkLog. Apache Spark creates it and assigns it to an instance of ManifestFileCommitProtocol attached to the file sink in the micro-batch. The lifecycle of this commit protocol follows the stages of a micro-batch, i.e. it has a method to handle a successful task execution, another one to handle a successful micro-batch execution, and yet another to manage a micro-batch failure. These event-based callbacks guarantee the exactly-once semantics for the readers because:
- commitTask(taskContext: TaskAttemptContext): when a task is committed, i.e. when it succeeded, the protocol adds all files the task has written to an in-memory buffer (var addedFiles: ArrayBuffer[String] = _). It's a local, executor-based operation.
- commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): when the job is committed, the driver gathers all addedFiles from the executors and adds them to the _spark_metadata:
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
  require(fileLog != null, "setupManifestOptions must be called before this function")
  val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
  // ...
  if (fileLog.add(batchId, fileStatuses)) {
    logInfo(s"Committed batch $batchId")
  } else {
    throw new IllegalStateException(s"Race while writing batch $batchId")
  }
}
This apparently harmless code can become a problem for a long-running application. New entries are added with each micro-batch and compacted every x micro-batches (the spark.sql.streaming.fileSink.log.compactInterval property, 10 by default). The problem is that the entries in the compact file never get cleared by the file sink, at least not by default. Since the compact file only grows and its entries are loaded into memory at each compaction, it will lead to an OOM at some point.
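You can observe the growth yourself with a few lines of plain JVM code; a sketch assuming the example paths from the beginning of the post. Each new *.compact file re-lists every file committed so far, so its size keeps increasing:

import java.io.File

// List the compact files in batch order and print their sizes; they should grow
// monotonically as the query keeps running.
val metadataDir = new File("/tmp/wfc/spark_metadata_issue/data/_spark_metadata")
metadataDir.listFiles()
  .filter(_.getName.endsWith(".compact"))
  .sortBy(_.getName.stripSuffix(".compact").toLong)
  .foreach(compactFile => println(s"${compactFile.getName} => ${compactFile.length()} bytes"))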
SPARK-27188
This OOM risk doesn't exist if you configure a retention attribute on your file sink:
val writeQuery = inputStream
  .writeStream
  .format("json")
  .option("retention", "50s")
  .option("checkpointLocation", "/tmp/spark_metadata_retention/checkpoint")
  .option("path", "/tmp/wfc/spark_metadata_retention/data")
  .start()
When you set the retention, the _spark_metadata log writer compares the last modification time of each file against the retention configuration and discards the expired files from the compacted entries:
class FileStreamSinkLog(
    metadataLogVersion: Int,
    sparkSession: SparkSession,
    path: String,
    _retentionMs: Option[Long] = None) {
  // ...
  override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = {
    if (retentionMs < Long.MaxValue) {
      if (currentTime - log.modificationTime > retentionMs) {
        logDebug(s"${log.path} excluded by retention - current time: $currentTime / " +
          s"modification time: ${log.modificationTime} / retention: $retentionMs ms.")
        false
      } else {
        true
      }
    } else {
      true
    }
  }
}
In the logs, you should see messages like:
Writing atomically to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact using temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
file:///tmp/wfc/spark_metadata_retention/data/part-00000-925972c0-8f92-4aaf-8aaf-cf82b6eb4755-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms. (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00001-a1678f29-a190-4298-ade0-baf7bf8126bb-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699688 / retention: 50000 ms. (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
file:///tmp/wfc/spark_metadata_retention/data/part-00002-5ba40f74-ec63-41cf-b5c0-e971533d9130-c000.json excluded by retention - current time: 1689221749842 / modification time: 1689221699760 / retention: 50000 ms. (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
Renamed temp file /tmp/wfc/spark_metadata_retention/data/_spark_metadata/.249.compact.526c50cc-1b2c-44c2-a5cf-acc28afc87d7.tmp to /tmp/wfc/spark_metadata_retention/data/_spark_metadata/249.compact (org.apache.spark.sql.execution.streaming.CheckpointFileManager:60)
Compacting took 84 ms for compact batch 249 (org.apache.spark.sql.execution.streaming.FileStreamSinkLog:64)
This fix was added by Jungtaek Lim in SPARK-27188 and released in Apache Spark 3.1.0. But if you encounter memory issues due to this _spark_metadata in prior versions, you can try to overcome them with the foreachBatch sink, which relies on the batch writer API and doesn't generate the _spark_metadata layer.
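As an illustration, a minimal sketch of that workaround could look like the snippet below; the output and checkpoint paths are hypothetical. Keep in mind that without the _spark_metadata log you also lose the exactly-once guarantee it provides to downstream readers, since a failed micro-batch may leave partial files behind.

import org.apache.spark.sql.DataFrame

// The foreachBatch sink delegates the write to the batch API, which doesn't maintain
// a _spark_metadata log in the output directory.
val writeBatchAsJson: (DataFrame, Long) => Unit = (batchDataset, batchId) => {
  batchDataset.write
    .mode("append")
    .json("/tmp/wfc/spark_metadata_workaround/data") // hypothetical output path
}

val workaroundQuery = inputStream
  .writeStream
  .foreachBatch(writeBatchAsJson)
  .option("checkpointLocation", "/tmp/spark_metadata_workaround/checkpoint") // hypothetical path
  .start()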