File sink in Apache Spark Structured Streaming

Versions: Apache Spark 2.4.5

One of the homework tasks of my Become a Data Engineer course is about synchronizing streaming data with a file system storage. When I was trying to implement this part, I found a manifest-based file sink that I will explore in this and the next blog posts.

In this post I will first focus on the usage of this sink, checking the different options and their outcome. In the next 2 parts I will analyze how this writer works internally. I will deliberately omit the compression aspect, which I will cover in the next post.

File sink example

In simple terms, a file sink can be thought of as an iterative execution of the batch file sink. It supports multiple formats (JSON, Avro, Parquet), as well as options like partitioning. Let's see that in the following example:

  "file sink" should "write files and the manifest" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark Structured Streaming file sink example")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    val inputStream = new MemoryStream[(Int, String)](1, sparkSession.sqlContext)
    inputStream.addData((1, "abc"), (2, "def"), (3, "ghi"))

    val writeQuery = inputStream.toDS().toDF("nr", "letters").writeStream.trigger(Trigger.Once())
      .format("json")
      .partitionBy("nr")
      .option("path", outputPath)
      .option("checkpointLocation", "/tmp/file_sink_checkpoint")

    writeQuery.start().awaitTermination()

    import scala.collection.JavaConverters._
    val writtenFilesAndDirs = FileUtils.listFilesAndDirs(new File(outputPath), TrueFileFilter.TRUE,
      TrueFileFilter.TRUE).asScala.map(file => file.getAbsolutePath).toSeq
    writtenFilesAndDirs should contain allOf(
      "/tmp/files_sink_test/nr=1",
      "/tmp/files_sink_test/nr=2",
      "/tmp/files_sink_test/nr=3",
      "/tmp/files_sink_test/_spark_metadata/0")
  }
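
Once the query terminates, the written data can be checked with a regular batch read. Below is a minimal sketch reusing the sparkSession and outputPath from the test above (writtenData is only an illustration name); since the output directory contains a _spark_metadata log, Spark should list the files from the manifests instead of scanning the whole directory tree:

    // Read the sink output back as a batch DataFrame; "nr" comes back as a
    // partition column resolved from the nr=... directories.
    val writtenData = sparkSession.read.json(outputPath)
    writtenData.orderBy("nr").show(false)

If everything went fine, the DataFrame should contain the 3 rows added to the MemoryStream.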

An important thing to notice is that the commit protocol used to generate the files is not the same as the one used in batch processing. The most visible difference, which you can see in the snippet above, is the manifest file created by the sink under the _spark_metadata directory. Let's see some implementation details in the next section.

Implementation details

The class responsible for writing streaming data as files is org.apache.spark.sql.execution.streaming.FileStreamSink. The writing logic is defined in its addBatch(batchId: Long, data: DataFrame) method which, by the way, is the method implemented by all available sinks. This method starts with a very interesting check:

    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")

As you can see here, Apache Spark retrieves the last committed batch number and, if the currently executed batch is smaller than or equal to it, the whole writing process is skipped. That's why, if you need to reprocess the data for the file sink, removing the offset and commit files from the checkpoint location won't be enough.
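
You can inspect the already committed batches directly in the sink's metadata log. Here is a minimal sketch, assuming the sparkSession and the output path from the first example, and relying on Spark's internal FileStreamSinkLog class (so it may change between versions):

    import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

    // Open the log stored under ${outputPath}/_spark_metadata and print the id of
    // the last committed batch together with the files it references.
    val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession,
      "/tmp/files_sink_test/_spark_metadata")
    sinkLog.getLatest().foreach { case (lastBatchId, files) =>
      println(s"last committed batch=$lastBatchId, files=${files.map(_.path).mkString(", ")}")
    }

As long as this log contains an entry for a given batch id, addBatch will skip the write, no matter what you remove from the checkpoint location.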

Later, the engine creates the commit protocol that will be used for the writes. It uses the class set in the spark.sql.streaming.commitProtocolClass property, which by default is ManifestFileCommitProtocol (you will find a configuration example a bit further below). After that, the commit protocol is initialized for the job and the partition columns are resolved. At the end, FileFormatWriter's write method is invoked with all previously created parameters:

      FileFormatWriter.write(
        sparkSession = sparkSession,
        plan = qe.executedPlan,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
        hadoopConf = hadoopConf,
        partitionColumns = partitionColumns,
        bucketSpec = None,
        statsTrackers = Seq(basicWriteJobStatsTracker),
        options = options)
    } // closes the "else" branch entered when the batch was not committed yet

As you can see, bucketing is not implemented for the streaming writer (bucketSpec is always None).
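
Before moving on, a word about the commit protocol configuration mentioned above. The spark.sql.streaming.commitProtocolClass property can be overridden when the session is built; the following sketch simply sets the default value explicitly (customizedSession is only an illustration name):

    import org.apache.spark.sql.SparkSession

    // By default spark.sql.streaming.commitProtocolClass resolves to
    // ManifestFileCommitProtocol; a custom FileCommitProtocol implementation
    // could be plugged in through the same property.
    val customizedSession = SparkSession.builder()
      .appName("Commit protocol configuration example")
      .master("local[2]")
      .config("spark.sql.streaming.commitProtocolClass",
        "org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
      .getOrCreate()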

ManifestFileCommitProtocol

That's the big picture, but let's focus now on the ManifestFileCommitProtocol. This class is responsible for storing the list of all files written in a given micro-batch. The magic happens inside its commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]) method.

Every task commit in ManifestFileCommitProtocol creates a bunch of instances of SinkFileStatus:

case class SinkFileStatus(
    path: String,
    size: Long,
    isDir: Boolean,
    modificationTime: Long,
    blockReplication: Int,
    blockSize: Long,
    action: String)

All SinkFileStatus instances are used by commitJob and written into the _spark_metadata/${batch_number} file. Below you can find an example of a written entry:

v1
{"path":"file:///tmp/files_sink_test/part-00000-9376d97c-c96c-4e5a-869c-bd3e0668afdf-c000.json","size":48,"isDir":false,"modificationTime":1587205041000,"blockReplication":1,"blockSize":33554432,"action":"add"}

Thanks to this manifest file, you can later apply a strategy that, in case of reprocessing, will delete the already created files and manifests. That will be the topic of my next post. But before it, I will write another one about the internals, covering the aspect of compression in the file sink.