Arbitrary stateful processing has evolved a lot in Apache Spark. The initial version, updateStateByKey, was followed by mapWithState in Apache Spark 1.6. When Structured Streaming was released, the framework got mapGroupsWithState and flatMapGroupsWithState. Now, Apache Spark 4 introduces a completely new way to interact with the arbitrary stateful processing logic: the Arbitrary State API v2!
Problems for *withState operators
The author of the new API, Anish Shrigondekar, identified several problems with the current *withState operators, and listed them in the initial SPIP:
- Limited typing. Accessing complex types such as lists always requires wrapping them with a custom state instance. Additionally, the *withState state class doesn't easily support multiple logical and independent instances of the same state group.
- State expiration flexibility. The SPIP introduces the concept of timers that act as the state expiration controller. The *withState operators support only one state expiration method, based on either the event time or the processing time. Besides, this state expiration method is not flexible, i.e. it cannot be paused or deleted.
- Lack of support for co-process functions. The *withState API only accepts one iterator with the input rows, which makes co-processing two streams impossible without wrapping the streams in a kind of StreamAOrStreamBObjectInstance and writing complex data handling logic in the mapping function.
- No state schema evolution support.
- Difficult state initialization.
- Lack of side outputs. Because the *withState functions return a single object, it's not possible to implement side outputs to, for example, write erroneous records to a dedicated storage (cf. Dead-Letter pattern).
- Lack of stateful operations chaining. Apache Spark 3 didn't support defining other stateful operations after the arbitrary stateful functions.
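To make the co-processing limitation from the list above more concrete, here is a minimal plain-Python sketch of the wrapper workaround. It is not Spark code: Click, Purchase, ClickOrPurchase, and handle_group are hypothetical names invented for the illustration, and the single function merely plays the role of a mapping function that receives one iterator of input rows.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class Click:
    user: str
    page: str


@dataclass
class Purchase:
    user: str
    amount: float


@dataclass
class ClickOrPurchase:
    """Hypothetical wrapper: exactly one of the two fields is expected to be set."""
    click: Optional[Click] = None
    purchase: Optional[Purchase] = None


def handle_group(user: str, rows: Iterable[ClickOrPurchase]) -> dict:
    """Single-iterator function that has to untangle both streams on its own."""
    clicks, revenue = 0, 0.0
    for row in rows:
        if row.click is not None:
            clicks += 1
        elif row.purchase is not None:
            revenue += row.purchase.amount
    return {"user": user, "clicks": clicks, "revenue": revenue}


# Both "streams" are merged into one sequence of wrapper objects.
rows = [
    ClickOrPurchase(click=Click("u1", "/home")),
    ClickOrPurchase(purchase=Purchase("u1", 20.0)),
    ClickOrPurchase(click=Click("u1", "/cart")),
]
summary = handle_group("u1", rows)
print(summary)
```

Every new input type adds a field to the wrapper and a branch to the function, which is the kind of data handling complexity the SPIP points at.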
That's all for the issues. Let's see now how they were addressed!
Classes and interactions
To get a better picture of the implementation of this new state API, we need to start by analyzing the added classes and the interactions between them. The next picture shows the main building blocks of the new arbitrary stateful processing:

The public transformWithState API hides a physical node with a similar name, the TransformWithStateExec. The node depends on three other components. The TimeMode describes the state expiration approach, which follows the processing time, the event time (therefore, the watermark), or nothing (no state expiration). Next comes the OutputMode. If you have already worked with stateful processing, this class won't surprise you as it represents the semantics of your stateful operation. For example, the append mode stands for the stateful operator emitting only final (expired state) results. But at this moment, the output mode is purely informational, i.e. transformWithState doesn't enforce it in any way, and it's your responsibility to make your business logic comply with the expected semantics.
Now the crucial part, your business logic. You define the logic as a class extending the StatefulProcessor interface, or StatefulProcessorWithInitialState. More precisely, you should focus your effort on:
- handleInputRows, which processes new rows coming for the given grouping key
- handleExpiredTimer, which triggers when one of the timers associated with the grouping key expires
On the state side there is also some novelty. You can still define your custom state structures but you can also leverage composite types such as lists (ListState) and maps (MapState). Each of the state classes can be created with an optional TTL configuration that will perform the cleaning action after the configured period.
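As a rough illustration of what a list-shaped state with a TTL could behave like, here is a toy model in plain Python. It is not the Spark ListState class: ToyListState, its methods, and the per-element expiry rule are assumptions made for the sketch, and the clock is passed in explicitly to keep the example deterministic.

```python
from typing import Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")


class ToyListState(Generic[T]):
    """Toy list state; each element remembers when it was written (assumption)."""

    def __init__(self, ttl_ms: Optional[int] = None) -> None:
        self._ttl_ms = ttl_ms
        self._items: List[Tuple[T, int]] = []  # (value, written_at_ms)

    def append(self, value: T, now_ms: int) -> None:
        self._items.append((value, now_ms))

    def get(self, now_ms: int) -> List[T]:
        # Drop the elements older than the TTL before returning the rest.
        if self._ttl_ms is not None:
            self._items = [
                (value, written_at)
                for value, written_at in self._items
                if now_ms - written_at < self._ttl_ms
            ]
        return [value for value, _ in self._items]


pages = ToyListState[str](ttl_ms=1_000)
pages.append("/home", now_ms=0)
pages.append("/cart", now_ms=800)

at_900 = pages.get(now_ms=900)     # both elements are still fresh
at_1200 = pages.get(now_ms=1_200)  # "/home" is older than the TTL
at_1800 = pages.get(now_ms=1_800)  # "/cart" is older than the TTL too
print(at_900, at_1200, at_1800)
```

The point of the sketch is only that the cleaning is declared once, at state creation time, instead of being coded by hand in the processing logic.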
Example
This week I'll stop here with the internals exploration. Instead, and hopefully to keep you hungry until next week, I'll share an example of transformWithState. Two years ago I wrote arbitrary stateful processing logic for a sessionization pipeline to illustrate my Data+AI Summit talk. The code is still on GitHub, so I will move directly to the transformWithState translation. To understand it better, let's identify four main interactions with state in a stateful pipeline: state retrieval, state update, state expiration, and expiration time management. All of them are depicted in the code below:

As you can see from the annotated code snippet, the main difference between the mapGroupsWithState and the transformWithState code is the separation of concerns. Previously you had to deal with the expired state in the same method that processes input rows. Depending on the iterator emptiness, you had to apply the logic to manage the expired state (empty iterator), or an active state. With the new API these actions are separated. If you need to do some special treatment on top of the expired state, you simply define your custom logic in the handleExpiredTimer handler.
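The separation of concerns can be sketched with a toy model in plain Python. This is not the Spark API and not the sessionization code from the talk: session_old_style, SessionProcessor, and the dictionary used as a state store are hypothetical, and the handler names only mirror handleInputRows and handleExpiredTimer.

```python
from typing import Dict, List


def session_old_style(key: str, rows: List[str], store: Dict[str, List[str]]) -> List[str]:
    """One function for both cases: an empty input means the state expired."""
    if not rows:
        pages = store.pop(key, [])
        return [f"{key}: session closed with {len(pages)} pages"]
    store.setdefault(key, []).extend(rows)
    return []


class SessionProcessor:
    """Toy processor with one handler per concern."""

    def __init__(self) -> None:
        self.store: Dict[str, List[str]] = {}

    def handle_input_rows(self, key: str, rows: List[str]) -> List[str]:
        # Only accumulates the active state; no expiration branch here.
        self.store.setdefault(key, []).extend(rows)
        return []

    def handle_expired_timer(self, key: str) -> List[str]:
        # Only deals with the expired state.
        pages = self.store.pop(key, [])
        return [f"{key}: session closed with {len(pages)} pages"]


old_store: Dict[str, List[str]] = {}
session_old_style("u1", ["/home", "/cart"], old_store)
old_output = session_old_style("u1", [], old_store)

processor = SessionProcessor()
processor.handle_input_rows("u1", ["/home", "/cart"])
new_output = processor.handle_expired_timer("u1")
print(old_output, new_output)
```

Both variants produce the same result; the difference is that the second one no longer needs the emptiness check to decide which code path applies.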
Timers, by the way, are the new way to get your state to expire. Of course, there is still the watermark-based configuration (number 2 on the screenshot) to handle, but the underlying expiration mechanism is different. In the transformWithState API you can see there can be multiple timers (getHandle.listTimers()), while in the mapGroupsWithState API there is only one expiration logic for the given grouping key.
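The idea of several timers per grouping key can be modeled with a small registry in plain Python. It is a toy sketch, not the Spark implementation: TimerRegistry, pop_expired, and the millisecond timestamps are assumptions made for the illustration, and list_timers only echoes the getHandle.listTimers() call mentioned above.

```python
from collections import defaultdict
from typing import Dict, List, Set


class TimerRegistry:
    """Toy model: every grouping key owns a set of expiry timestamps (ms)."""

    def __init__(self) -> None:
        self._timers: Dict[str, Set[int]] = defaultdict(set)

    def register_timer(self, key: str, expiry_ms: int) -> None:
        self._timers[key].add(expiry_ms)

    def delete_timer(self, key: str, expiry_ms: int) -> None:
        self._timers[key].discard(expiry_ms)

    def list_timers(self, key: str) -> List[int]:
        return sorted(self._timers[key])

    def pop_expired(self, key: str, now_ms: int) -> List[int]:
        """Return and remove the timers of the key that are due at now_ms."""
        expired = [t for t in self.list_timers(key) if t <= now_ms]
        self._timers[key] -= set(expired)
        return expired


timers = TimerRegistry()
timers.register_timer("u1", 1_000)
timers.register_timer("u1", 5_000)
timers.register_timer("u1", 9_000)
timers.delete_timer("u1", 9_000)  # a timer can be removed before it fires

fired = timers.pop_expired("u1", now_ms=2_000)
remaining = timers.list_timers("u1")
print(fired, remaining)
```

With a single expiration per key, the registry would collapse to one timestamp per key, and registering a new one would overwrite the previous one.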
But timers are already one of the advanced concepts that I would like to show you next week. So, let's stop here and see you back then!