At first glance, the update operation in an arbitrary stateful application looks just like another map's put function. However, it has an impact on what happens later with the state store. In this blog post, you will see an example that can eventually help you to reduce an I/O pressure of the updates.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I'm currently writing
one on that topic and the first chapters are already available in π
Early Release on the O'Reilly platform
I also help solve your data engineering problems π contact@waitingforcode.com π©
The blog post will explain the relationship between the update operation and the state store. I'm pretty sure that after reading the first part of it, you will immediately find the link. If not, there is a second part of the blog post that will shed some light on it!
State store and update
An arbitrary stateful processing application uses the custom stateful transformation defined in mapGroupsWithState or flatMapGroupsWithState. This custom function has the following signature:
// mapGroupsWithState func: (K, Iterator[V], GroupState[S]) => U) // flatMapGroupsWithState func: (K, Iterator[V], GroupState[S]) => Iterator[U]
As you can see, it takes 3 parameters. The first of them is the key of the state group, so the expression you used in the groupByKey call preceding the state-based mapping. The second parameter contains all input rows to process for this specific group from the given micro-batch. The last parameter represents the GroupState, so the object that you will use to interact with the state store. GroupState exposes all CRUD methods required to get, remove or update the custom state object (S type):
trait GroupState[S] extends LogicalGroupState[S] { // ... def get: S def getOption: Option[S] def update(newState: S): Unit def remove(): Unit
The state store object (S) is internally kept in the default GroupState implementation (GroupStateImpl). This default implementation also exposes the functions prefixed with has*. The physical execution node (FlatMapGroupsWithStateExec) uses them to find out whether the state changed in the processed micro-batch:
if (groupState.hasRemoved && groupState.getTimeoutTimestamp == NO_TIMESTAMP) { stateManager.removeState(store, stateData.keyRow) numUpdatedStateRows += 1 } else { val currentTimeoutTimestamp = groupState.getTimeoutTimestamp val hasTimeoutChanged = currentTimeoutTimestamp != stateData.timeoutTimestamp val shouldWriteState = groupState.hasUpdated || groupState.hasRemoved || hasTimeoutChanged if (shouldWriteState) { val updatedStateObj = if (groupState.exists) groupState.get else null stateManager.putState(store, stateData.keyRow, updatedStateObj, currentTimeoutTimestamp) numUpdatedStateRows += 1 } } }
As you can notice, whenever the state changes, the shouldWriteState flag is set to true and the FlatMapGroupsWithStateExec's InputProcessor passes the modified state to the state store manager. The manager, in its turn, calls StateStore's put method:
override def putState(store: StateStore, key: UnsafeRow, state: Any, timestamp: Long): Unit = { val stateRow = getStateRow(state) setTimestamp(stateRow, timestamp) store.put(key, stateRow) }
We're almost there. When you call the put method of the default StateStore implementation, you do 2 things. The first one is the update of the in-memory map with all state groups. The second one is the update of the checkpoint file:
override def put(key: UnsafeRow, value: UnsafeRow): Unit = { verify(state == UPDATING, "Cannot put after already committed or aborted") val keyCopy = key.copy() val valueCopy = value.copy() mapToUpdate.put(keyCopy, valueCopy) writeUpdateToDeltaFile(compressedStream, keyCopy, valueCopy) }
And that's the point I will develop in the next section!
Is the update really needed?
Exactly like in this favorite consultant reaction, the answer here is "it depends". If your mapping function always ends up with the modified state, then calling update every time makes sense. For example, when you do some data cumulation before generating the output at the state expiration.
On the other hand, if the ratio of modified states is low, calling update every time will add some unnecessary I/O that could make the state store checkpointing less efficient. Remember, if you don't call the update on the GroupState, the state store manager won't call the state store's put method. Hence, it won't write anything to the state store checkpoint file.
How to avoid calling the update? For a simple state, you can compare the mutable properties. Below you can find an example of a state storing the time of the first visit of the user. Unless receiving some late data, this state will change very rarely and to update it conditionally, we could use something like:
case class UserFirstVisit(userId: Int, firstEventTime: Long) def mapStreamingLogsToSessionDuration(timeoutDurationMs: Long)(key: Int, logs: Iterator[Row], currentState: GroupState[UserFirstVisit]): Option[UserFirstVisit] = { if (currentState.hasTimedOut) { // ... } else { val currentStateFromParams = currentState.getOption.getOrElse(UserFirstVisit(key, Long.MaxValue)) val newFirstVisit = logs.map(log => { log.getAs[Timestamp]("event_time").getTime }).min if (newFirstVisit < currentStateFromParams.firstEventTime) { currentState.update(currentStateFromParams.copy(firstEventTime = newFirstVisit)) }
Is it a better way, especially for a more complex mutable state entities? Initially, I thought of creating the new state every time with the help of case class' copy method and comparing these instances. After all - that's my thought a few minutes before writing this paragraph - if we copy the same object, we will not create a new instance. Wrong! A quick look at the copy's bytecode shows why:
javap -v classes/com/waitingforcode/UserFirstVisit.class public com.waitingforcode.UserFirstVisit copy(int, long); descriptor: (IJ)Lcom/waitingforcode/UserFirstVisit; flags: ACC_PUBLIC Code: stack=5, locals=4, args_size=3 0: new #2 // class com/waitingforcode/UserFirstVisit 3: dup 4: iload_1 5: lload_2 6: invokespecial #54 // Method "<init>":(IJ)V 9: areturn LineNumberTable: line 109: 0 LocalVariableTable: Start Length Slot Name Signature 0 10 0 this Lcom/waitingforcode/UserFirstVisit; 0 10 1 userId I 0 10 2 firstEventTime J MethodParameters: Name Flags userId final firstEventTime final
As you can see, it starts by creating the new object for the UserFirstVisit class, reading the copied parameters, and calling the constructor. There is no verification to know whether the object already exists or not. So comparing properties seems to be a single valid solution to not create new objects - even though it puts some work on the CPU side.
But remember to stay simple. Even though avoiding the update call can save some I/O, you shouldn't see any benefits if the state changes very often. Moreover, maybe the overhead of comparing the attributes either from the CPU or code readability perspective will have a worse effect than simply writing all states to the state store checkpoint location? Anyway, don't take it for granted but try to adapt to your situation.