File source and its internals

Versions: Apache Spark 3.0.0

A few months ago, before the Apache Spark 3.0 features series, you probably noticed a short series about file processing in Structured Streaming. If you enjoyed it, here is a complementary note presenting the file data source :)

In the first part of the post, you will see a very simple example of the data source in Structured Streaming. After that, you will discover some internal details of this data source.

File source examples

Let's start by writing the test case that I will use in the next section. As you can see, I'm creating 5 files here, each with a single entry equal to the file name without its extension. After that, in the processing logic, I'm only selecting the value column and writing it to my local key-value data store mock. At the end of the snippet, I'm ensuring that all entries were read:

  "all files" should "be processed by file source" in {
    val dataDir = s"${streamDir}/data"
    new Thread(new Runnable {
      override def run(): Unit = {
        var nr = 1
        while (nr <= 5) {
          val fileName = s"${dataDir}/${nr}.txt"
          val jsonContent = nr.toString
          FileUtils.writeStringToFile(new File(fileName), jsonContent)
          nr += 1
          Thread.sleep(5000L)
        }
      }
    }).start()

    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Spark Structured Streaming file source")
      .master("local[*]").getOrCreate()

    val inputData = sparkSession.readStream.textFile(dataDir)
      .select("value")

    val outputData = inputData.writeStream
      .option("checkpointLocation", "/tmp/file_source/checkpoint")
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, epochId: Long): Boolean = true
        override def process(value: Row): Unit = {
          InMemoryKeyedStore.addValue(value.getAs[String]("value"), "ok")
        }
        override def close(errorOrNull: Throwable): Unit = {}
      })

    outputData.start().awaitTermination(60000)

    InMemoryKeyedStore.allValues should have size 5
    InMemoryKeyedStore.allValues.keys should contain allOf("1", "2", "3", "4", "5")
  }

As expected, all files were correctly read. So far, nothing complicated, but the real magic starts in the next part, where you can learn more about the checkpoint location and its relationship to the file streaming source.

Processed files tracking

If you check your checkpoint location, you will notice that some new files were created inside the sources directory:

ls file_source/checkpoint/sources/0
0    1    2    3    4

Every created file stores the information about the files read in a given micro-batch:

v1
{"path":"file:///tmp/file_source/data/2.txt","timestamp":1591072222000,"batchId":1}

As you can see, Apache Spark stores the information about the files processed in a given micro-batch. It's a mechanism very similar to the one used by the file sink, where a _spark_metadata directory is created to list all files generated in a given micro-batch. If you remember, the file sink also raised a question about the retention period. Does it apply to the source too? How long will Apache Spark keep the metadata about the processed files?

The answer is yes, because spark.sql.streaming.minBatchesToRetain also applies to the files retained in the checkpoint location of the file source. Moreover, FileStreamSourceLog shares the same parent class as the file sink representation (FileStreamSinkLog), and can therefore use the same delete mechanism. The only small difference is the configuration used to disable the removal, which is spark.sql.streaming.fileSink.log.deletion for the sink and spark.sql.streaming.fileSource.log.deletion for the source. By default, the deletes are turned on but you can disable them if you want:

// FileStreamSourceLog
protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion

// CompactibleFileStreamLog, the parent class shared with the sink
  override def add(batchId: Long, logs: Array[T]): Boolean = {
    val batchAdded =
      if (isCompactionBatch(batchId, compactInterval)) {
        compact(batchId, logs)
      } else {
        super.add(batchId, logs)
      }
    if (batchAdded && isDeletingExpiredLog) {
      deleteExpiredLog(batchId)
    }
    batchAdded
  }
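
To make these settings concrete, here is a minimal sketch of a session that retains fewer batches and disables the cleanup of the source log. The application name and the values are arbitrary assumptions chosen for illustration; only the two configuration keys come from the explanation above, and the snippet needs Apache Spark on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only; adapt them to your own retention needs.
val sparkSession: SparkSession = SparkSession.builder()
  .appName("File source retention sketch") // hypothetical name
  .master("local[*]")
  // number of most recent micro-batches whose metadata must stay recoverable
  .config("spark.sql.streaming.minBatchesToRetain", "10")
  // "false" disables the removal of expired entries from sources/0
  .config("spark.sql.streaming.fileSource.log.deletion", "false")
  .getOrCreate()
```

Keep in mind that disabling the deletion means the sources directory will only grow over time.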

Processing internals

My next question was about the resolution of the files to process. How does Apache Spark know whether a file with the same name as an already processed file should be taken into account? Internally, it keeps the already seen files in an in-memory map for the duration defined in the maxFileAge option. However, this option is ignored when you set latestFirst to true and define the maxFilesPerTrigger property on the source.
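
The interaction between these three options can be summarized with a tiny function. It's only a sketch of the rule as I understand it from the FileStreamSource code, not the code itself: the object and method names are hypothetical, and the use of Long.MaxValue to express "never expire" is what I recall from the Spark 3.0 sources, so treat it as an assumption:

```scala
// Hypothetical helper modelling how the max age of a tracked file is resolved.
object MaxFileAgeSketch {
  def effectiveMaxFileAgeMs(
      latestFirst: Boolean,
      maxFilesPerTrigger: Option[Int],
      maxFileAgeMs: Long): Long = {
    // With latestFirst + maxFilesPerTrigger, old files may still wait for their
    // turn, so the age limit is effectively switched off (assumption, see above).
    if (latestFirst && maxFilesPerTrigger.isDefined) Long.MaxValue
    else maxFileAgeMs
  }

  def main(args: Array[String]): Unit = {
    println(effectiveMaxFileAgeMs(latestFirst = true, Some(10), 5000L))
    println(effectiveMaxFileAgeMs(latestFirst = false, Some(10), 5000L))
  }
}
```

In other words, the age limit is respected in every combination except the one where both latestFirst and maxFilesPerTrigger are set.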

Another tricky part of the file tracking is the file name. The tracking map is built as shown below, with a property (fileNameOnly) saying whether only the names or the full paths are tracked:

// FileStreamSource.scala
  /** A mapping from a file that we have processed to some timestamp it was last modified. */
  // Visible for testing and debugging in production.
  val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

Later on, the check happens this way:

    def isNewFile(path: String, timestamp: Timestamp): Boolean = { 
      timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
    }

From this we can deduce that recreating the file with the same name won't work, i.e. Spark will simply ignore it. Well, that's not completely true, keep reading to see why. As you can see in the snippet, another condition is put on the timestamp. Every time a file is listed as a candidate for processing, Apache Spark retrieves its last modification time. If the modification happened at or after this time:

lastPurgeTimestamp = latestTimestamp - maxAgeMs

It's considered young enough to be included in the pipeline. The point is that when lastPurgeTimestamp is computed, all files seen so far that are older than this value are removed from the map of already seen files:

      val iter = map.entrySet().iterator()
      var count = 0
      while (iter.hasNext) {
        val entry = iter.next()
        if (entry.getValue < lastPurgeTimestamp) {
          count += 1
          iter.remove()
        }
      }

And this purge logic is called every time Apache Spark adds new files to process. Let's see how it works in a simple test case where I repeatedly recreate a file called 1.txt with an auto-incremented number inside:

  new Thread(new Runnable {
    override def run(): Unit = {
      val fileName = s"${dataDir}/1.txt"
      FileUtils.deleteQuietly(new File(fileName))
      var nr = 0
      while (true) {
        val jsonContent = nr.toString
        FileUtils.deleteQuietly(new File(fileName))
        FileUtils.writeStringToFile(new File(fileName), jsonContent)
        while (!(new File(s"/${checkpointDir}/sources/0/0").exists())) {
        }
        Thread.sleep(3000)
        nr += 1
      }
    }
  }).start()
  val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark Structured Streaming file source")
    .master("local[*]").getOrCreate()

  val inputData = sparkSession.readStream.option("maxFileAge", "5s").textFile(dataDir)
    .select("value")


  val outputData = inputData.writeStream.option("checkpointLocation", checkpointDir)
    .format("console")

In the video of this test you can see that the file 1.txt is not reprocessed as long as there is no other file (with a different name) to process. That's expected, since the old entries are removed from the in-memory tracking map only when there is something new to process.
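
If you prefer code to videos, the same behavior can be reproduced with a simplified model of the tracking map. This is a sketch written for this explanation: SeenFilesSketch and SeenFilesDemo are hypothetical names, the timestamps are arbitrary, and the "file name" is assumed to be whatever follows the last slash of the path. Only the isNewFile condition and the purge formula are taken from the snippets above:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the tracking map; not Spark's own class.
final class SeenFilesSketch(maxAgeMs: Long, fileNameOnly: Boolean) {
  private val seen = mutable.Map.empty[String, Long]
  private var latestTimestamp = 0L
  private var lastPurgeTimestamp = 0L

  // Assumption: the name is the part of the path after the last '/'.
  private def key(path: String): String =
    if (fileNameOnly) path.substring(path.lastIndexOf('/') + 1) else path

  def add(path: String, timestamp: Long): Unit = {
    seen.update(key(path), timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  // Same condition as in the isNewFile snippet above.
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !seen.contains(key(path))

  // Recomputes the threshold and drops the entries older than it.
  def purge(): Int = {
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val expired = seen.collect { case (name, ts) if ts < lastPurgeTimestamp => name }.toList
    seen --= expired
    expired.size
  }
}

object SeenFilesDemo {
  def main(args: Array[String]): Unit = {
    val files = new SeenFilesSketch(maxAgeMs = 5000L, fileNameOnly = false)
    files.add("/data/1.txt", 1000L)
    files.purge()
    // 1.txt is recreated later but nothing else arrived: it's still "seen"
    println(files.isNewFile("/data/1.txt", 20000L)) // false
    // a different file moves the latest timestamp forward, the purge evicts 1.txt
    files.add("/data/2.txt", 20000L)
    files.purge()
    println(files.isNewFile("/data/1.txt", 21000L)) // true
  }
}
```

The important detail is that the latest timestamp only moves forward when a file is added to the map, so a recreated 1.txt alone can never trigger its own eviction.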

Compaction

If you read my blog post "File sink and Out-Of-Memory risk", you certainly remember the risk of an Out Of Memory error caused by the compaction process in the file sink. Does the same risk exist in the file source? Let's check!

Unfortunately, there is still a risk because the compaction logic is shared between the sink and the source:

// CompactibleFileStreamLog
// shared class by the sink and the source
  override def add(batchId: Long, logs: Array[T]): Boolean = {
    val batchAdded =
      if (isCompactionBatch(batchId, compactInterval)) {
        compact(batchId, logs)
      } else {
        super.add(batchId, logs)
      }
    if (batchAdded && isDeletingExpiredLog) {
      deleteExpiredLog(batchId)
    }
    batchAdded
  }

// FileStreamSourceLog
// source 
  override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
    if (super.add(batchId, logs)) {
      if (isCompactionBatch(batchId, compactInterval)) {
        fileEntryCache.put(batchId, logs)
      }
      true
    } else {
      false
    }
  }

What happens here? First, if the currently processed batch is a compaction batch (you can find the formula in "file sink and manifest compaction"), Apache Spark will compact the files created since the last compaction. But every compaction takes all new files plus the previously compacted ones:

  /**
   * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
   * need to do a new compaction.
   *
   * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
   * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
   */
  def getValidBatchesBeforeCompactionBatch(
      compactionBatchId: Long,
      compactInterval: Int): Seq[Long] = {
    assert(isCompactionBatch(compactionBatchId, compactInterval),
      s"$compactionBatchId is not a compaction batch")
    (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
  }

  private def compact(batchId: Long, logs: Array[T]): Boolean = {
    val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
      val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
      validBatches.flatMap { id =>
        super.get(id).getOrElse {
          throw new IllegalStateException(
            s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
              s"(compactInterval: $compactInterval)")
        }
      } ++ logs
    }
    val compactedLogs = compactLogs(allLogs)
    // remaining part of the method omitted
  }

  // FileStreamSourceLog
  def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
    logs
  }

And obviously, a continuously growing list, combined with its eager materialization, will provoke memory issues at some point. Let me prove that to you in the video.
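
The growth is also easy to observe without Spark. Below is a toy simulation of the compaction, assuming that a compaction batch is a batch where (batchId + 1) % compactInterval == 0, which is consistent with the example from the Scaladoc above (interval 3, compaction batches 2 and 5). CompactionSketch, simulate and the generated file names are hypothetical and exist only for this illustration:

```scala
import scala.collection.mutable

// Toy model of the metadata log compaction; not Spark's implementation.
object CompactionSketch {
  // Assumed formula, consistent with the Scaladoc example quoted above.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  def validBatchesBefore(compactionBatchId: Long, compactInterval: Int): Seq[Long] =
    math.max(0L, compactionBatchId - compactInterval) until compactionBatchId

  /** Returns the number of entries stored in the log file of every batch. */
  def simulate(batches: Int, compactInterval: Int, filesPerBatch: Int): Map[Long, Int] = {
    val log = mutable.Map.empty[Long, Vector[String]]
    for (batchId <- 0L until batches.toLong) {
      val newEntries = Vector.tabulate(filesPerBatch)(i => s"file-$batchId-$i")
      val stored =
        if (isCompactionBatch(batchId, compactInterval)) {
          // the source keeps every entry (compactLogs returns its input as is)
          validBatchesBefore(batchId, compactInterval)
            .flatMap(id => log(id)).toVector ++ newEntries
        } else {
          newEntries
        }
      log(batchId) = stored
    }
    log.map { case (id, entries) => id -> entries.size }.toMap
  }

  def main(args: Array[String]): Unit = {
    val sizes = simulate(batches = 9, compactInterval = 3, filesPerBatch = 2)
    // compaction batches 2, 5 and 8 hold 6, 12 and 18 entries
    sizes.toSeq.sortBy(_._1).foreach { case (id, size) => println(s"batch $id: $size entries") }
  }
}
```

Every compaction batch ends up holding all the files processed since the beginning of the query, so the amount of data loaded by the driver grows linearly with the number of processed files.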

Hopefully, this problem should be solved in Apache Spark 3.1.0 thanks to the fix for the issue "Structured Streaming _spark_metadata fills up Spark Driver memory when having lots of objects" 🤞.

As you can see, the file source is another component that combines the checkpoint location with an in-memory map to store some state and access it quickly. Another example I covered on this blog is the state store. I'm really curious if you know other places based on this principle 🤔
