https://github.com/bartosz25/spark-...main/scala/com/waitingforcode/batch
To close the topic of the new arbitrary stateful processing API in Apache Spark Structured Streaming, let's focus on its... batch counterpart!
I know, it may sound surprising that the API added primarily for Structured Streaming can be used in batch pipelines, but that's how it is! For that reason, it's good to see some potential use cases of this batch-based transformWithState:
- Feature parity between the batch and streaming APIs. This argument was often mentioned in favor of adding a batch version of transformWithState.
- Data backfilling, where you use the batch version to process records from your data-at-rest location. The drawback here is the potential lack of state synchronization, so you should only use it when the state doesn't need to be refreshed. This could be solved if Apache Spark Structured Streaming supports a state writer in the future, the same way it has supported a state reader since the most recent release.
- An API that natively splits the processing code into an initialization and data processing part.
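The split between the initialization and data processing parts mentioned in the last point can be sketched as follows. This is a minimal example, assuming the Spark 4.0 Scala API (StatefulProcessor, getValueState with an explicit encoder, TimeMode.None(), OutputMode.Append()). The Visit and VisitCount classes, the VisitCounter processor, and the countVisits helper are hypothetical names that do not come from the demo repository.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, TimeMode, TimerValues, TTLConfig, ValueState}

// Hypothetical input and output types, used only for this illustration
case class Visit(userId: String, page: String)
case class VisitCount(userId: String, visits: Long)

class VisitCounter extends StatefulProcessor[String, Visit, VisitCount] {

  @transient private var counter: ValueState[Long] = _

  // Initialization part: declares the state variables
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    counter = getHandle.getValueState[Long]("counter", Encoders.scalaLong, TTLConfig.NONE)
  }

  // Data processing part: called per key with the rows grouped under it
  override def handleInputRows(key: String, inputRows: Iterator[Visit],
      timerValues: TimerValues): Iterator[VisitCount] = {
    val previousCount = if (counter.exists()) counter.get() else 0L
    val newCount = previousCount + inputRows.size
    // In the batch version this update lives only for the duration of the job
    counter.update(newCount)
    Iterator.single(VisitCount(key, newCount))
  }
}

object BatchTransformWithStateSketch {

  // Same call as in a streaming query, but applied to a static Dataset
  def countVisits(visits: Dataset[Visit]): Dataset[VisitCount] = {
    val spark = visits.sparkSession
    import spark.implicits._
    visits.groupByKey(_.userId)
      .transformWithState(new VisitCounter(), TimeMode.None(), OutputMode.Append())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("batch-tws-sketch").getOrCreate()
    import spark.implicits._
    val visits = Seq(Visit("u1", "/home"), Visit("u1", "/cart"), Visit("u2", "/home")).toDS()
    countVisits(visits).show()
    spark.stop()
  }
}
```

The only batch-specific element is the input: the Dataset comes from a static source instead of readStream, while the processor class could be reused in a streaming query.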
Differences
The crucial difference between the streaming and batch versions is the isStreaming flag present on the TransformWithStateExec physical operator. It's set to true by default but overridden to false when Apache Spark plans a query with transformWithState for the batch API.

Besides this flag that conditions the internal behavior, the batch operator has other specificities:
- Uses the number of shuffle partitions from SparkSession on every run, while the streaming operator takes it from SparkSession only the first time, when the checkpoint is not available yet.
- Ignores all watermarks passed to the physical node. Since the query runs only once, the watermark information is not available.
However, the biggest difference comes from this isStreaming flag that conditions the execution inside the physical operator. This conditioning means:
- Initialization of a new but temporary state store location for each shuffle partition.
- Aborting updates made to the state store during the processing. Consequently, all state changes made inside the batch won't be available for the subsequent executions:
// TransformWithStateExec#processDataWithPartition
commitTimeMs += timeTakenMs {
  if (isStreaming) {
    store.commit()
  } else {
    store.abort()
  }
}
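To make the consequence of the abort more concrete, here is a toy model in plain Scala. It is not Spark's StateStore: ToyStateStore, runOnce, and the "user1" key are hypothetical, and the store only mimics the idea that pending updates reach the committed data on commit() and are dropped on abort().

```scala
import scala.collection.mutable

// Toy in-memory store, NOT Spark's StateStore; it only mimics commit vs. abort
final class ToyStateStore(committed: mutable.Map[String, Long]) {
  private val pending = mutable.Map.empty[String, Long]

  // Reads see the pending updates first, then the committed data
  def get(key: String): Option[Long] = pending.get(key).orElse(committed.get(key))
  def put(key: String, value: Long): Unit = pending.update(key, value)
  // Makes the pending updates visible to the next execution
  def commit(): Unit = { committed ++= pending; pending.clear() }
  // Drops the pending updates, leaving the committed data untouched
  def abort(): Unit = pending.clear()
}

object CommitOrAbortSketch {

  // Increments a counter and ends the run like the snippet above does
  def runOnce(committed: mutable.Map[String, Long], isStreaming: Boolean): Long = {
    val store = new ToyStateStore(committed)
    val updated = store.get("user1").getOrElse(0L) + 1L
    store.put("user1", updated)
    if (isStreaming) store.commit() else store.abort()
    updated
  }

  def main(args: Array[String]): Unit = {
    val streamingState = mutable.Map.empty[String, Long]
    val batchState = mutable.Map.empty[String, Long]
    // The streaming-like runs accumulate, the batch-like runs always restart
    println(Seq(runOnce(streamingState, isStreaming = true), runOnce(streamingState, isStreaming = true)))
    println(Seq(runOnce(batchState, isStreaming = false), runOnce(batchState, isStreaming = false)))
  }
}
```

Each batch-like run sees the state it builds during the execution, but nothing survives for the next one.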
Despite these differences, the batch version shares a few features with its streaming counterpart, such as timers processing and state initialization.
Incremental processing
Even though the batch version doesn't support state persistence, there is a way to simulate it by saving the state to a dedicated storage location. It follows the principles of the Incremental Sessionizer pattern I explained in Chapter 5 of my Data Engineering Design Patterns book. Let me adapt it to incremental processing with the batch version of transformWithState.

In case the idea is not clear yet, incremental processing with the batch transformWithState consists of:
- A batch job generating two outputs: one for the consumers and one for the next batch execution. The consumers' output is the result of the data transformation in the code while the batch's output is the state updated inside the transformWithState. Consequently, the job using this transformation has to generate two columns in the handleInputRows function.
- The transformWithState uses the previously saved state as the initial state parameter. I explained this more in depth last week in What's new in Apache Spark 4.0.0 - Arbitrary state API v2 - internals.
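The two points above can be sketched as follows. It's a minimal example and not the code from the demo. It assumes the Spark 4.0 Scala API (StatefulProcessorWithInitialState with a handleInitialState(key, initialState, timerValues) method, and the transformWithState overload accepting an initial state). All class and function names (Visit, SavedState, RunResult, IncrementalVisitCounter, runOnce) are hypothetical. The sketch also assumes that keys present only in the initial state don't reach handleInputRows, hence the explicit carry-over of their state.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessorWithInitialState, TimeMode, TimerValues, TTLConfig, ValueState}

// Hypothetical types for this illustration
case class Visit(userId: String, page: String)
case class SavedState(userId: String, totalVisits: Long)
// One row carries both the consumers' value and the state to save
case class RunResult(userId: String, visitsInRun: Long, totalVisits: Long)

class IncrementalVisitCounter
    extends StatefulProcessorWithInitialState[String, Visit, RunResult, SavedState] {

  @transient private var total: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    total = getHandle.getValueState[Long]("total", Encoders.scalaLong, TTLConfig.NONE)
  }

  // Restores the state saved by the previous batch execution
  override def handleInitialState(key: String, initialState: SavedState,
      timerValues: TimerValues): Unit = {
    total.update(initialState.totalVisits)
  }

  override def handleInputRows(key: String, inputRows: Iterator[Visit],
      timerValues: TimerValues): Iterator[RunResult] = {
    val visitsInRun = inputRows.size.toLong
    val newTotal = (if (total.exists()) total.get() else 0L) + visitsInRun
    total.update(newTotal)
    Iterator.single(RunResult(key, visitsInRun, newTotal))
  }
}

object IncrementalBatchSketch {

  // Returns the consumers' output and the state for the next execution
  def runOnce(visits: Dataset[Visit], previousState: Dataset[SavedState])
      : (Dataset[RunResult], Dataset[SavedState]) = {
    val spark = visits.sparkSession
    import spark.implicits._
    val results = visits.groupByKey(_.userId).transformWithState(
      new IncrementalVisitCounter(), TimeMode.None(), OutputMode.Append(),
      previousState.groupByKey(_.userId))
    val updatedState = results.map(r => SavedState(r.userId, r.totalVisits))
    // Assumption: keys without new rows are not emitted, so carry them over
    val untouchedState = previousState
      .join(updatedState, Seq("userId"), "left_anti").as[SavedState]
    (results, updatedState.union(untouchedState))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("incremental-tws-sketch").getOrCreate()
    import spark.implicits._
    // The first run starts from an empty state
    val (firstOutput, firstState) = runOnce(
      Seq(Visit("u1", "/home"), Visit("u1", "/cart"), Visit("u2", "/home")).toDS(),
      spark.emptyDataset[SavedState])
    firstOutput.show()
    // The second run reuses the state produced by the first one
    val (secondOutput, secondState) = runOnce(Seq(Visit("u1", "/pay")).toDS(), firstState)
    secondOutput.show()
    secondState.show()
    spark.stop()
  }
}
```

In a real pipeline, each execution would write the state Dataset to the dedicated storage location and the next execution would read it back, instead of passing it in memory as this sketch does.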
Demo
I recorded a short demo to show batch code for transformWithState.
With this demo, I'm closing the part of the Apache Spark 4 series dedicated to the new arbitrary stateful API. Now, it's time to move on and discover other new features from the most recent release!