Finally, the time has come to start the analysis of the new features in Apache Spark. The first of them that grabbed my attention was the Async progress tracking from Structured Streaming.
The Apache Spark micro-batch model is well known for not being a low-latency engine ([1]), and that's one of the reasons behind the alternative continuous processing mode. The difference with the classical micro-batch is the non-blocking character of each task. As a result, if one partition is slower to process, the others don't need to wait for it before taking new data. However, the continuous mode is still marked as experimental and is rarely adopted by users who need lower latency than the micro-batch mode provides.
Optimized WAL commits
Boyang Jerry Peng, the author of the feature described here, investigated the impact of the metadata operations on the micro-batch latency. He found out that committing offsets at the beginning and end of each micro-batch adds a significant overhead of ~320 to ~440 ms for stateless pipelines. Still according to Boyang's analysis, optimizing this part can reduce the micro-batch latency roughly 10x, from 337 ms to 31 ms, which is huge!
But this gain doesn't come for free. I like to repeat this, there are trade-offs! Here we get a better latency but, on the other hand, we take some semantic risks, such as:
- The end-to-end exactly-once guarantee for the sinks supporting it can be broken. Why? Typically, the semantic is guaranteed by the batch id. If the given id is recognized by the sink as fully written but a failure happens before the offsets are fully committed in the asynchronous mode, restarting the query won't generate any output (see the sketch after this list).
- Even worse! If the job restarts, it can detect new end offsets because some data might have been produced between the failure and restart times. As you can imagine, due to the exactly-once semantic relying on the batch id, this new data will simply be ignored [lost].
- Besides the impact on the sink, the asynchronous tracking also affects failure recovery. Committing the offsets of one micro-batch can be slower than for the others and, as a result, the volume of data processed twice can increase in case of a failure recovery. Additionally, since recovery can load different data, it breaks the assumption that for a micro-batch N we should find N-1 and N+1 in the commits. Due to the related recent changes in the watermark semantics (SPARK-40925), the asynchronous progress tracking will not work for stateful pipelines.
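To picture the first two risks, here is a minimal sketch, not Spark's sink API and with made-up names, of a sink deduplicating on the batch id, and of why a replayed batch id carrying new records would be silently dropped:

```scala
// Hypothetical idempotent sink, for illustration only: deduplication is keyed on the batch id.
class BatchIdDeduplicatingSink {
  private var lastCommittedBatchId: Long = -1L

  def addBatch(batchId: Long, rows: Seq[String]): Unit = {
    if (batchId <= lastCommittedBatchId) {
      // The sink already considers this batch id as fully written, so it skips the data.
      // With async progress tracking, the offsets of this batch may never have been
      // checkpointed; after a restart Spark can replan the same batch id with new
      // records, and this guard silently drops them.
      return
    }
    rows.foreach(println) // write the records to the target system
    lastCommittedBatchId = batchId
  }
}
```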
AsyncProgressTrackingMicroBatchExecution
How is this shiny new feature implemented? If you analyze the source code, you'll notice a new condition in the StreamingQueryManager:
```scala
private def createQuery( // ...
    val microBatchExecution = if (useAsyncProgressTracking(extraOptions)) {
      new AsyncProgressTrackingMicroBatchExecution(
        sparkSession, trigger, triggerClock, extraOptions, analyzedStreamWritePlan)
    } else {
      new MicroBatchExecution(
        sparkSession, trigger, triggerClock, extraOptions, analyzedStreamWritePlan)
    }
    new StreamingQueryWrapper(microBatchExecution)
```
As you can see, if the query has the asyncProgressTrackingEnabled flag enabled, Apache Spark will run the micro-batch with AsyncProgressTrackingMicroBatchExecution. However, it doesn't mean the asynchronous progress tracking will automatically be used. It won't if the query meets one of the following criteria (restated as a sketch after the list):
- the query is a stateful query
- the query runs with the Once or AvailableNow trigger
- the sink is not a no-op, console, memory, or Kafka writing operation
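The eligibility check can be summarized with the following illustrative snippet; the names below are purely hypothetical, only the criteria come from the list above:

```scala
// Illustrative only: a query is eligible for async progress tracking when it's stateless,
// uses a regular processing-time trigger, and writes to one of the supported sinks.
case class QueryProfile(isStateful: Boolean, trigger: String, sinkFormat: String)

def supportsAsyncProgressTracking(query: QueryProfile): Boolean = {
  val supportedSinks = Set("noop", "console", "memory", "kafka")
  !query.isStateful &&
    !Set("Once", "AvailableNow").contains(query.trigger) &&
    supportedSinks.contains(query.sinkFormat.toLowerCase)
}
```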
If the query is valid from the asynchronous progress tracking standpoint, this specific micro-batch execution interacts with 2 dedicated classes representing the offsets and commit logs, respectively the AsyncOffsetSeqLog and AsyncCommitLog.
What's the difference with the usual offsets and commit log classes? The main point is the usage of Future objects representing the checkpoint data materialization. To recall, a future represents an action that may complete in the future, but it can also fail.
Another point related to the Futures is their execution context. To run without blocking the main thread, they need a separate one. AsyncProgressTrackingMicroBatchExecution creates it here:
```scala
class AsyncProgressTrackingMicroBatchExecution( // ...

  protected val asyncWritesExecutorService: ThreadPoolExecutor =
    ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
      threadName = "async-log-write",
      taskQueueCapacity = 2, // one for offset commit and one for completion commit
```
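To make the mechanics more tangible, here is a standalone sketch, nothing Spark-specific and with a made-up helper, of a CompletableFuture submitted to a single-thread executor, completing asynchronously and triggering a callback once the write is done:

```scala
import java.util.concurrent.{CompletableFuture, Executors}

object AsyncWriteDemo extends App {
  // A single-thread executor keeps the asynchronous writes in submission order,
  // the same property the async progress tracking relies on
  val executor = Executors.newSingleThreadExecutor()

  // Hypothetical helper simulating an asynchronous offsets file write
  def writeOffsetsAsync(batchId: Long): CompletableFuture[String] =
    CompletableFuture.supplyAsync[String](() => {
      println(s"writing offsets of batch $batchId on ${Thread.currentThread().getName}")
      s"offsets/$batchId"
    }, executor)

  // The callback fires only after the write completes; a failure would propagate
  // through the future instead of blocking the main (query) thread
  writeOffsetsAsync(0).thenAccept((file: String) => println(s"$file persisted"))
  writeOffsetsAsync(1).thenAccept((file: String) => println(s"$file persisted"))

  executor.shutdown()
}
```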
Offsets and commits checkpointing
The checkpoint semantic relies on AsyncOffsetSeqLog and AsyncCommitLog. When a micro-batch starts, it asks the AsyncOffsetSeqLog to write its offsets asynchronously. Once the offsets get written, the callback marks the previously persisted offsets as ready to be committed back to the source:
```scala
override def markMicroBatchStart(): Unit = {
  // Because we are using a thread pool with only one thread, async writes to the offset log
  // are still written in a serial / in order fashion
  offsetLog
    .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
    // ...
    .thenAccept(tuple => {
      if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
        // sanity check to make sure batch ids are monotonically increasing
        assert(lastBatchPersistedToDurableStorage.get < batchId)
        val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
        if (prevBatchOff.isDefined) {
          // Offset is ready to be committed by the source. Add to queue
          sourceCommitQueue.add(prevBatchOff.get)
        } else {
          // ...
```
Also, directly after submitting the offsets for checkpointing, the same method drains this queue and commits the pending offsets to the sources:
```scala
override def markMicroBatchStart(): Unit = {
  // Because we are using a thread pool with only one thread, async writes to the offset log
  // are still written in a serial / in order fashion
  offsetLog
  // ...
  // check if there are offsets that are ready to be committed by the source
  var offset = sourceCommitQueue.poll()
  while (offset != null) {
    commitSources(offset)
    offset = sourceCommitQueue.poll()
  }
```
When the micro-batch ends, the commits are only written to the checkpoint location if the corresponding offsets are:
```scala
override def markMicroBatchEnd(): Unit = {
  // ...
  if (offsetLog.getAsyncOffsetWrite(currentBatchId).nonEmpty || isFirstBatch) {
    isFirstBatch = false

    commitLog
      .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
      // ...
```
Pretty confusing, isn't it? Why would a micro-batch miss its offsets file in the first place? Because some of the micro-batches can simply be skipped in the checkpoint location. Besides the flag enabling the feature, there is another option, asyncProgressTrackingCheckpointIntervalMs. It defaults to 1 second and defines how frequently the job will checkpoint the offsets. It's declared at the AsyncProgressTrackingMicroBatchExecution level...
```scala
class AsyncProgressTrackingMicroBatchExecution( // ...

  protected val asyncProgressTrackingCheckpointingIntervalMs: Long =
    getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)

  override val offsetLog = new AsyncOffsetSeqLog(
    sparkSession,
    checkpointFile("offsets"),
    asyncWritesExecutorService,
    asyncProgressTrackingCheckpointingIntervalMs,
    clock = triggerClock
  )
  // ...

object AsyncProgressTrackingMicroBatchExecution {
  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"

  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = "asyncProgressTrackingCheckpointIntervalMs"
```
...and used in the aforementioned addAsync method of AsyncOffsetSeqLog:
```scala
def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, Boolean)] = {
  // ...
  val lastIssuedTs = lastCommitIssuedTimestampMs.get()
  val future: CompletableFuture[(Long, Boolean)] = {
    if (offsetCommitIntervalMs > 0) {
      if ((lastIssuedTs == -1) // haven't started any commits yet
        || (lastIssuedTs + offsetCommitIntervalMs) <= clock.getTimeMillis()) {
        issueAsyncWrite(batchId).thenApply((batchId: Long) => {
          (batchId, true)
        })
      } else {
        // just return completed future because we are not persisting this offset
        CompletableFuture.completedFuture((batchId, false))
      }
    } else {
      // offset commit interval is not enabled
      issueAsyncWrite(batchId).thenApply((batchId: Long) => {
        (batchId, true)
      })
    }
  }
  // ...
```
As you can see in the snippet, if enough time has elapsed since the previous offsets checkpoint, the log will trigger another materialization. If not, the given micro-batch will simply be skipped. Let me show you this in action.
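To get a feeling for the effect, here is a back-of-the-envelope simulation of that condition; the constants are arbitrary, the check mirrors the addAsync snippet above:

```scala
// Simulates which micro-batches would get their offsets persisted with a 1000 ms
// checkpoint interval and a micro-batch completing every 150 ms
object CheckpointIntervalSimulation extends App {
  val offsetCommitIntervalMs = 1000L
  val microBatchDurationMs = 150L
  var lastIssuedTs = -1L // no offsets written yet

  (0L until 15L).foreach { batchId =>
    val now = batchId * microBatchDurationMs
    val persist = lastIssuedTs == -1 || lastIssuedTs + offsetCommitIntervalMs <= now
    if (persist) lastIssuedTs = now
    println(s"batch $batchId at $now ms => ${if (persist) "offsets written" else "skipped"}")
  }
}
```

With these numbers, only batches 0, 7 and 14 would get their offsets checkpointed; the others complete without leaving any offsets file behind.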
Checkpoint interval in action
The demo code uses a rate-micro-batch source generating 5 rows each time. The sink is configured as follows:
```scala
val consoleSink = rateMicroBatchSource
  .select($"timestamp", $"value", functions.spark_partition_id())
  .writeStream.format("console")
  .option("checkpointLocation", checkpointLocation)
  .option("asyncProgressTrackingEnabled", true)
  .option("asyncProgressTrackingCheckpointIntervalMs", TimeUnit.SECONDS.toMillis(5))
```
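For completeness, here is a sketch of what could precede and follow the snippet above; the rate-micro-batch format and its rowsPerBatch option exist in Apache Spark, but the session setup, paths, and run duration are my assumptions:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.{SparkSession, functions}

val sparkSession = SparkSession.builder()
  .appName("async-progress-tracking-demo")
  .master("local[*]")
  .getOrCreate()
import sparkSession.implicits._ // enables the $"column" syntax used in the select above

val checkpointLocation = "/tmp/wfc/spark/3.4.0/async_progress_tracking/checkpoint"

val rateMicroBatchSource = sparkSession.readStream
  .format("rate-micro-batch")
  .option("rowsPerBatch", 5) // 5 new rows generated for every micro-batch
  .load()

// ... consoleSink definition from the snippet above ...

// run the demo for one minute before stopping
consoleSink.start().awaitTermination(TimeUnit.MINUTES.toMillis(1))
```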
If you run the job for 1 minute, you should see something similar to this in the checkpoint location:
```
bartosz@bartosz:~$ ls /tmp/wfc/spark/3.4.0/async_progress_tracking/checkpoint/offsets/
0  130  20  216  311  421  541  61  658
bartosz@bartosz:~$ ls /tmp/wfc/spark/3.4.0/async_progress_tracking/checkpoint/commits/
0  130  20  216  311  421  541  61  658
```
As a result, while recovering from a failure, the job will need to make more effort to find the previous fully committed execution. However, as the benchmark results mentioned earlier show, this little extra effort may be a small price to pay for the performance gain!