What's new in Apache Spark 3.4.0 - Async progress tracking for Structured Streaming

Versions: Apache Spark 3.4.0

Finally, the time has come to start the analysis of the new features in Apache Spark. The first of them that grabbed my attention was the Async progress tracking from Structured Streaming.

Apache Spark micro-batch model is well known for not being the low-latency engine ([1]) and that's one of the reasons for proposing an alternative processing approach with the continuous processing. The difference with the classical micro-batch is the non-blocking character of each task. As a result, if one partition is slower to process, the others don't need to wait for it before taking new data. However, the continuous mode is still marked as an experimental feature and is not often used by the users who need lower latency than the ones provided in the micro-batch mode.

Optimized WAL commits

Boyang Jerry Peng who is the author of the feature described here, investigated the impact of the metadata operations on the micro-batch latency. He found out that committing offsets in the beginning and end of each micro-batch is an important overhead of ~320 to ~440 ms for the stateless pipelines. Still according to Boyang's analysis, optimizing this part can reduce the micro-batch latency by x10, from 337 ms to 31 ms which is huge!

But this gain doesn't come for free. I like to repeat this, there are trade-offs! Here we can get a better latency but on the other side, we take some semantics risks, such as:

AsyncProgressTrackingMicroBatchExecution

How is this shiny new feature implemented? If you analyze the source code, you'll notice a new condition in the StreamingQueryManager:

private def createQuery(
// ...
    	val microBatchExecution = if (useAsyncProgressTracking(extraOptions)) {
      	new AsyncProgressTrackingMicroBatchExecution(
        	sparkSession,
        	trigger,
        	triggerClock,
        	extraOptions,
        	analyzedStreamWritePlan)
    	} else {
      	new MicroBatchExecution(
        	sparkSession,
        	trigger,
        	triggerClock,
        	extraOptions,
        	analyzedStreamWritePlan)
    	}
    	new StreamingQueryWrapper(microBatchExecution)

As you can see, if the query has the asyncProgressTrackingEnabled flag enabled, Apache Spark will run the micro-batch with AsyncProgressTrackingMicroBatchExecution. However, it doesn't mean the asynchronous progress tracking will automatically be used. It won't if the processing logic meets one of the following criteria:

If the query is valid from the asynchronous progress tracking standpoint, this specific micro-batch execution interacts with 2 dedicated classes representing the offsets and commit logs, respectively the AsyncOffsetSeqLog and AsyncCommitLog.

What's the difference with the usual offsets and commit log classes? The main point is the usage of Future objects representing the checkpoint data materialization. To recall, a future represents an action that may complete in the future, but it can also fail.

Another point related to the Futures is their execution context. To run and not block the main thread, they need another thread. AsyncProgressTrackingMicroBatchExecution creates one here:

class AsyncProgressTrackingMicroBatchExecution(
// ...
  protected val asyncWritesExecutorService: ThreadPoolExecutor
  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
	threadName="async-log-write",
	taskQueueCapacity=2, // one for offset commit and one for completion commit

Offsets and commits checkpointing

The checkpoint semantic relies on AsyncOffsetSeqLog and AsyncCommitLog. When a micro-batch starts, it gives an order to complete checkpoint generation for it to the AsyncOffsetSeqLog future. Once the offsets gets written, the callback triggers writing the corresponding commit files:

  override def markMicroBatchStart(): Unit = {
	// Because we are using a thread pool with only one thread, async writes to the offset log
	// are still written in a serial / in order fashion
	offsetLog
  	.addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
// ...
  	.thenAccept(tuple => {
      	if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
        	// sanity check to make sure batch ids are monotonically increasing
        	assert(lastBatchPersistedToDurableStorage.get < batchId)
        	val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
        	if (prevBatchOff.isDefined) {
          	// Offset is ready to be committed by the source. Add to queue
          	sourceCommitQueue.add(prevBatchOff.get)
        	} else {
// ...

Also, directly after submitting the offsets for checkpointing there is a chance to write pending commits:

override def markMicroBatchStart(): Unit = {
  // Because we are using a thread pool with only one thread, async writes to the offset log
  // are still written in a serial / in order fashion
  offsetLog
  // ...
  // check if there are offsets that are ready to be committed by the source
  var offset = sourceCommitQueue.poll()
  while (offset != null) {
    commitSources(offset)
    offset = sourceCommitQueue.poll()
}

When the micro-batch ends, the commits are only written to the checkpoint location if the corresponding offsets are:

  override def markMicroBatchEnd(): Unit = {
// ...
  	if (offsetLog.getAsyncOffsetWrite(currentBatchId).nonEmpty
    	|| isFirstBatch) {
    	isFirstBatch = false

    	commitLog
      	.addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
// ...

Pretty confusing, isn't it? How could we end up with commits without offset files? Some of the micro-batches can be skipped in the checkpoint location. Besides the flag enabling the feature there is another one, the asyncProgressTrackingCheckpointIntervalMs. It defaults to 1 second and defines how frequently the job will checkpoint the offsets. It's declared at the AsyncProgressTrackingMicroBatchExecution level...

class AsyncProgressTrackingMicroBatchExecution(
// ...
  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
  = getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)

 override val offsetLog = new AsyncOffsetSeqLog(
    sparkSession,
    checkpointFile("offsets"),
    asyncWritesExecutorService,
    asyncProgressTrackingCheckpointingIntervalMs,
    clock = triggerClock
  )
// ...

object AsyncProgressTrackingMicroBatchExecution {
  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
	"asyncProgressTrackingCheckpointIntervalMs"


...and used in the AsyncOffsetSeqLog aforementioned addAsync method:

  def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, Boolean)] = {
// ...
val lastIssuedTs = lastCommitIssuedTimestampMs.get()
val future: CompletableFuture[(Long, Boolean)] = {
  if (offsetCommitIntervalMs > 0) {
    if ((lastIssuedTs == -1) // haven't started any commits yet
      || (lastIssuedTs + offsetCommitIntervalMs) <= clock.getTimeMillis()) {
      issueAsyncWrite(batchId).thenApply((batchId: Long) => {
        (batchId, true)
      })
    } else {
      // just return completed future because we are not persisting this offset
      CompletableFuture.completedFuture((batchId, false))
    }
  } else {
    // offset commit interval is not enabled
    issueAsyncWrite(batchId).thenApply((batchId: Long) => {
      (batchId, true)
    })
  }
}
// ...

As you can see in the highlighted snippet, if there is enough time elapsed from the previous offsets checkpointing, the log will trigger another materialization. If not, the given micro-batch will simply be skipped. Let me show you this in action.

Checkpoint interval in action

The demo code uses a rate-micro-batch source generating 5 rows each time. The sink is configured as follows:

   val consoleSink = rateMicroBatchSource
      .select($"timestamp", $"value", functions.spark_partition_id())
      .writeStream.format("console")
      .option("checkpointLocation", checkpointLocation)
      .option("asyncProgressTrackingEnabled", true)
      .option("asyncProgressTrackingCheckpointIntervalMs", TimeUnit.SECONDS.toMillis(5))

If you run the job for 1 minute, you should see something similar to me in the checkpoint location:

bartosz@bartosz:~$ ls /tmp/wfc/spark/3.4.0/async_progress_tracking/checkpoint/offsets/
0  130  20  216  311  421  541  61  658
bartosz@bartosz:~$ ls /tmp/wfc/spark/3.4.0/async_progress_tracking/checkpoint/commits/
0  130  20  216  311  421  541  61  658

As a result, while recovering from failure, the job will need to make more effort to find the previous fully committed execution. However, as you saw in the mentioned benchmark results, this little extra effort may be nothing bad compared to the performance gain!