Arbitrary stateful processing: update and put dependency

Versions: Apache Spark 3.1.1

At first glance, the update operation in an arbitrary stateful application looks like just another map put. However, it affects what happens later in the state store. In this blog post, you will see an example that can help you reduce the I/O pressure caused by updates.

This blog post explains the relationship between the update operation and the state store. I'm pretty sure that after reading the first part, you will immediately see the link. If not, the second part will shed some light on it!

State store and update

An arbitrary stateful processing application uses a custom stateful function passed to mapGroupsWithState or flatMapGroupsWithState. This function has the following signature:

// mapGroupsWithState
func: (K, Iterator[V], GroupState[S]) => U

// flatMapGroupsWithState
func: (K, Iterator[V], GroupState[S]) => Iterator[U]

As you can see, it takes 3 parameters. The first one is the key of the state group, i.e. the value produced by the groupByKey call that precedes the stateful mapping. The second parameter contains all input rows of this group in the current micro-batch. The last parameter is the GroupState, i.e. the object you use to interact with the state store. GroupState exposes all the CRUD methods required to get, remove, or update the custom state object (of type S):

trait GroupState[S] extends LogicalGroupState[S] {
  // ...
  def get: S
  def getOption: Option[S]

  def update(newState: S): Unit

  def remove(): Unit
  // ...
}
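To make the role of each parameter concrete, here is a minimal sketch that runs without Spark. `LocalGroupState` and `runMicroBatch` are hypothetical names invented for illustration. The first stands in for GroupState, limited to the get/getOption/update/remove subset plus change-tracking flags. The second mimics groupByKey followed by one call of the stateful function per group. This is an assumption-based model, not Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's GroupState (NOT the real API): only the
// methods discussed in this post, plus flags recording what the function did.
class LocalGroupState[S](initial: Option[S]) {
  private var value: Option[S] = initial
  private var updated = false
  private var removed = false

  def exists: Boolean = value.isDefined
  def get: S = value.getOrElse(throw new NoSuchElementException("State is not defined"))
  def getOption: Option[S] = value
  def update(newState: S): Unit = { value = Some(newState); updated = true; removed = false }
  def remove(): Unit = { value = None; removed = true; updated = false }

  def hasUpdated: Boolean = updated
  def hasRemoved: Boolean = removed
}

object LocalMicroBatch {
  // Hypothetical helper: groups one micro-batch by key (like groupByKey) and
  // calls the stateful function once per group with (key, values, state).
  def runMicroBatch[K, V, S, U](
      batch: Seq[V],
      keyFn: V => K,
      store: mutable.Map[K, S],
      func: (K, Iterator[V], LocalGroupState[S]) => U): Seq[U] =
    batch.groupBy(keyFn).toSeq.map { case (key, values) =>
      val state = new LocalGroupState[S](store.get(key))
      val output = func(key, values.iterator, state)
      // Only explicit changes reach the store; untouched groups are skipped
      if (state.hasRemoved) store.remove(key)
      else if (state.hasUpdated) store.put(key, state.get)
      output
    }
}
```

In this model, a group whose function never calls update or remove leaves the store untouched. That is the behavior the rest of this post builds on.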

The state object (S) is kept internally by the default GroupState implementation, GroupStateImpl. This implementation also exposes methods prefixed with "has", such as hasUpdated and hasRemoved. The physical execution node, FlatMapGroupsWithStateExec, uses them to find out whether the state changed during the processed micro-batch:

    // When the iterator is consumed, then write changes to state
    def onIteratorCompletion: Unit = {
      if (groupState.hasRemoved && groupState.getTimeoutTimestamp == NO_TIMESTAMP) {
        stateManager.removeState(store, stateData.keyRow)
        numUpdatedStateRows += 1
      } else {
        val currentTimeoutTimestamp = groupState.getTimeoutTimestamp
        val hasTimeoutChanged = currentTimeoutTimestamp != stateData.timeoutTimestamp
        val shouldWriteState = groupState.hasUpdated || groupState.hasRemoved || hasTimeoutChanged
        if (shouldWriteState) {
          val updatedStateObj = if (groupState.exists) groupState.get else null
          stateManager.putState(store, stateData.keyRow, updatedStateObj, currentTimeoutTimestamp)
          numUpdatedStateRows += 1
        }
      }
    }
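The branching in this excerpt can be restated as a small pure function, which makes the three triggers of a write easy to see. This is a hypothetical model. `WriteDecision`, `StateAction` and its cases are illustrative names, and `NoTimestamp = -1L` is an assumed stand-in for Spark's NO_TIMESTAMP constant.

```scala
// Hypothetical model of the write decision; these are NOT Spark classes.
sealed trait StateAction
case object RemoveState extends StateAction
case object PutState extends StateAction
case object SkipWrite extends StateAction

object WriteDecision {
  // Assumed sentinel meaning "no timeout set" (plays the role of NO_TIMESTAMP)
  val NoTimestamp: Long = -1L

  def decide(
      hasUpdated: Boolean,
      hasRemoved: Boolean,
      currentTimeoutTimestamp: Long,
      previousTimeoutTimestamp: Long): StateAction =
    if (hasRemoved && currentTimeoutTimestamp == NoTimestamp) RemoveState
    else {
      val hasTimeoutChanged = currentTimeoutTimestamp != previousTimeoutTimestamp
      // A write happens on update, on removal with a pending timeout,
      // or when only the timeout moved
      val shouldWriteState = hasUpdated || hasRemoved || hasTimeoutChanged
      if (shouldWriteState) PutState else SkipWrite
    }
}
```

The case that matters for this post is `decide(false, false, t, t)`. It returns `SkipWrite`: if the function neither updated nor removed the state and the timeout did not move, nothing is written.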

As you can see, whenever the state is updated, removed, or gets a new timeout, the shouldWriteState flag is set to true and FlatMapGroupsWithStateExec's InputProcessor passes the state to the state store manager. In turn, the manager calls the StateStore's put method:

    override def putState(store: StateStore, key: UnsafeRow, state: Any, timestamp: Long): Unit = {
      val stateRow = getStateRow(state)
      setTimestamp(stateRow, timestamp)
      store.put(key, stateRow)
    }

We're almost there. When you call the put method of the default StateStore implementation (HDFSBackedStateStore), it does 2 things. First, it updates the in-memory map holding all state groups. Second, it writes the update to the delta file, which becomes part of the checkpoint once the store commits:

    override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
      verify(state == UPDATING, "Cannot put after already committed or aborted")
      val keyCopy = key.copy()
      val valueCopy = value.copy()
      mapToUpdate.put(keyCopy, valueCopy)
      writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy)
    }
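A toy model of this put method shows the consequence. Every call appends a record to the delta log, even when the key already holds the very same value. `ToyStateStore` is a hypothetical class, and its `deltaLog` buffer only stands in for the compressed delta file. Nothing here is Spark code.

```scala
import scala.collection.mutable

// Hypothetical toy store, loosely modeled on the put() excerpt:
// every put updates the in-memory map AND appends to the delta log,
// whether or not the value actually changed.
class ToyStateStore[K, V] {
  val mapToUpdate: mutable.Map[K, V] = mutable.Map.empty
  val deltaLog: mutable.ArrayBuffer[(K, V)] = mutable.ArrayBuffer.empty

  def put(key: K, value: V): Unit = {
    mapToUpdate.put(key, value)
    deltaLog += ((key, value)) // stands in for writeUpdateToDeltaFile
  }
}
```

Calling `put("user1", 100L)` three times leaves a single entry in the map but three records in the delta log.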

And that's the point I will develop in the next section!

Is the update really needed?

As in every consultant's favorite answer, it depends. If your mapping function modifies the state in almost every call, then calling update every time makes sense. One example is when you accumulate data before generating the output at state expiration.

On the other hand, if only a small fraction of the states change, calling update every time adds unnecessary I/O that can make state store checkpointing less efficient. Remember, if you don't call update on the GroupState (and neither remove the state nor change its timeout), the state store manager won't call the state store's put method. Hence, nothing will be written to the state store checkpoint file for that group.
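A back-of-the-envelope simulation illustrates the difference for a state that rarely changes. It assumes a single user and a hypothetical `countWrites` function. Each number in the input sequence is the earliest event time of one micro-batch, and a write is counted each time update would be called.

```scala
object WriteCounting {
  final case class FirstVisit(firstEventTime: Long)

  // Hypothetical comparison: returns how many update calls (hence store writes)
  // happen for one user, with and without the "did it change?" check.
  def countWrites(batchMinEventTimes: Seq[Long], alwaysUpdate: Boolean): Int = {
    var state: Option[FirstVisit] = None
    var writes = 0
    batchMinEventTimes.foreach { batchMin =>
      val current = state.getOrElse(FirstVisit(Long.MaxValue))
      val changed = batchMin < current.firstEventTime
      if (alwaysUpdate || changed) {
        state = Some(FirstVisit(math.min(batchMin, current.firstEventTime)))
        writes += 1
      }
    }
    writes
  }
}
```

With the batches 100, 150, 120, 90, 200, the unconditional version writes 5 times while the conditional one writes twice, because only 100 and 90 lower the first visit time.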

How can you avoid calling update? For a simple state, you can compare the properties that may change. Below you can find an example of a state storing the time of a user's first visit. Unless late data arrives, this state will rarely change, so to update it conditionally, we could write something like:

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.GroupState

case class UserFirstVisit(userId: Int, firstEventTime: Long)

def mapStreamingLogsToSessionDuration(timeoutDurationMs: Long)(key: Int, logs: Iterator[Row],
                                                               currentState: GroupState[UserFirstVisit]): Option[UserFirstVisit] = {
  if (currentState.hasTimedOut) {
    // ... (timeout handling elided)
  } else {
    val currentStateFromParams = currentState.getOption.getOrElse(UserFirstVisit(key, Long.MaxValue))
    val newFirstVisit = logs.map(log => {
      log.getAs[Timestamp]("event_time").getTime
    }).min
    // Call update (and thus write to the state store) only if an earlier event arrived
    if (newFirstVisit < currentStateFromParams.firstEventTime) {
      currentState.update(currentStateFromParams.copy(firstEventTime = newFirstVisit))
    }
    // Assumed: nothing is emitted until the state expires
    None
  }
}
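The comparison can also be extracted from the mapping function into a plain function that is easy to unit test. `FirstVisitUpdate.nextState` is a hypothetical helper. It returns the new state only when the batch contains an earlier event, and None when update should not be called.

```scala
final case class UserFirstVisit(userId: Int, firstEventTime: Long)

object FirstVisitUpdate {
  // Hypothetical helper: Some(newState) means "call update", None means "skip it"
  def nextState(userId: Int, current: Option[UserFirstVisit], eventTimes: Iterator[Long]): Option[UserFirstVisit] = {
    val currentState = current.getOrElse(UserFirstVisit(userId, Long.MaxValue))
    val times = eventTimes.toList
    if (times.isEmpty) None
    else {
      val newFirstVisit = times.min
      // Compare the only mutable property instead of building and comparing objects
      if (newFirstVisit < currentState.firstEventTime) Some(currentState.copy(firstEventTime = newFirstVisit))
      else None
    }
  }
}
```

Inside the mapping function, the result could then be applied with `nextState(key, currentState.getOption, eventTimes).foreach(currentState.update)`, where `eventTimes` would be the event times extracted from `logs`.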

Is there a better way, especially for more complex mutable state entities? Initially, I thought of creating the new state every time with the case class's copy method and comparing the instances. After all (that was my thought a few minutes before writing this paragraph), copying an unchanged object shouldn't create a new instance. Wrong! A quick look at the bytecode of copy shows why:

javap -v  classes/com/waitingforcode/UserFirstVisit.class

  public com.waitingforcode.UserFirstVisit copy(int, long);
    descriptor: (IJ)Lcom/waitingforcode/UserFirstVisit;
    flags: ACC_PUBLIC
    Code:
      stack=5, locals=4, args_size=3
         0: new           #2                  // class com/waitingforcode/UserFirstVisit
         3: dup
         4: iload_1
         5: lload_2
         6: invokespecial #54                 // Method "<init>":(IJ)V
         9: areturn
      LineNumberTable:
        line 109: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      10     0  this   Lcom/waitingforcode/UserFirstVisit;
            0      10     1 userId   I
            0      10     2 firstEventTime   J
    MethodParameters:
      Name                           Flags
      userId                         final
      firstEventTime                 final

As you can see, it starts by allocating a new UserFirstVisit object, then loads the copied parameters and calls the constructor. There is no check of whether an identical instance already exists. So comparing the properties seems to be the only way to avoid creating new objects, even though it adds some CPU work.
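The same behavior can be checked from Scala itself. The sketch below uses a hypothetical `CopySemantics` object. It shows that copy() without any changed field still returns a different reference. Case class equality stays structural, though, so == still compares field values. Comparing copies therefore costs one allocation per call, which the property check avoids.

```scala
final case class UserFirstVisit(userId: Int, firstEventTime: Long)

object CopySemantics {
  val original = UserFirstVisit(1, 1000L)
  // copy() with no changed field still allocates a new instance...
  val copied = original.copy()
  val sameReference: Boolean = copied eq original // false
  // ...but case class equality is structural, so == compares the fields
  val sameValue: Boolean = copied == original // true

  // Property-based check: no candidate state object is allocated
  def needsUpdate(current: UserFirstVisit, newFirstEventTime: Long): Boolean =
    newFirstEventTime < current.firstEventTime
}
```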

But remember to keep it simple. Even though skipping the update call can save some I/O, you shouldn't expect any benefit if the state changes very often. Moreover, the overhead of comparing the attributes, in CPU time or in code readability, may outweigh the cost of simply writing all states to the state store checkpoint location. Anyway, don't take it for granted; evaluate it against your own workload.
