https://github.com/bartosz25/spark-...streaming-stateful-schema-evolution
Schema evolution is a widespread topic in data engineering. You'll face it in batch whenever you need to modify an output table. You'll face it in streaming whenever you need to change the structure of the records written to your Apache Kafka topic. Ultimately, you'll also face it in stateful processing whenever you need to change the schema of your state. This last aspect will be the topic of our blog post.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I wrote
one on that topic! You can read it online
on the O'Reilly platform,
or get a print copy on Amazon.
I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩
Arbitrary stateful processing in Apache Spark Structured Streaming supports two state formats, UnsafeRow (default) and Apache Avro. In a nutshell, the UnsafeRow format is faster but only Apache Avro supports schema evolution.
Problematic UnsafeRow
To better understand what happens when you change the UnsafeRow-backed state, let's take this example:
case class LetterStateV1(lowerLetters: Seq[String])
class LetterProcessorV1 extends StatefulProcessor[Int, Letter, String] {
@transient private var letterStateValue: ValueState[LetterStateV1] = _
// ...
override def handleInputRows(key: Int, inputRows: Iterator[Letter],
timerValues: TimerValues): Iterator[String] = {
val restoredState: Option[LetterStateV1] = Option(letterStateValue.get())
println(s"Restored state was ${restoredState}")
val newLowerLetters = inputRows.map(letter => letter.lowerCase).toSeq
val newState = restoredState.map(stateV1 => {
stateV1.copy(lowerLetters = stateV1.lowerLetters ++ newLowerLetters)
}).getOrElse(LetterStateV1(lowerLetters = newLowerLetters))
letterStateValue.update(newState)
Iterator(s"${key} = ${letterStateValue.get().toString}")
}
}
The job runs without any apparent issues, but imagine a scenario where your state schema needs to change. For instance, you want to add an additional timestamp column to capture the current processing time. Depending on how you implement this, you could potentially break state compatibility. Let's take a look at the modified code first:
case class LetterStateV2(processingTime: Long, lowerLetters: Seq[String])
class LetterProcessorV2 extends StatefulProcessor[Int, Letter, String] {
// ...
override def handleInputRows(key: Int, inputRows: Iterator[Letter],
timerValues: TimerValues): Iterator[String] = {
// ...
val processingTime = timerValues.getCurrentProcessingTimeInMs()
val newState = restoredState.map(stateV1 => {
stateV1.copy(processingTime = processingTime, lowerLetters = stateV1.lowerLetters ++ newLowerLetters)
}).getOrElse(LetterStateV2(processingTime = processingTime, lowerLetters = newLowerLetters))
letterStateValue.update(newState)
// ...
Once you execute the modified code, you'll notice some unexpected results:
Restored state was Some(LetterStateV2(68719476768,List())) Restored state was Some(LetterStateV2(68719476768,List())) ------------------------------------------- Batch: 1 ------------------------------------------- +----------------------------------------+ |value | +----------------------------------------+ |1 = LetterStateV2(1768196569401,List(a))| |2 = LetterStateV2(1768196569401,List(a))| +----------------------------------------+
The parameter accumulating all letters seen for each id now contains only a single letter. Moreover, the long value from the restored state doesn't look like a valid Unix timestamp. Why this inconsistency for a state value backed by a strongly typed case class? It's because the state mapping from your state store files to JVM objects with UnsafeRow is position-based.:
#StateSchemaCompatibilityChecker#validateAndMaybeEvolveStateSchema val newColFamilies = newStateSchema.map(_.colFamilyName).toSet val oldColFamilies = mostRecentColFamilies.toSet val colFamiliesAddedOrRemoved = newColFamilies != oldColFamilies val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved
And the whole validation flow looks like that:
Therefore, to avoid this issue you could add the new attribute at the end of the state class:
case class LetterStateV2(lowerLetters: Seq[String], processingTime: Long)
Now the code execution will return the valid state:
Restored state was Some(LetterStateV2(List(a),1)) Restored state was Some(LetterStateV2(List(a),1)) ------------------------------------------- Batch: 1 ------------------------------------------- +-------------------------------------------+ |value | +-------------------------------------------+ |1 = LetterStateV2(List(a, a),1768196838188)| |2 = LetterStateV2(List(a, a),1768196838188)| +-------------------------------------------+
Unfortunately, when you want to remove or change the field, you'll still be in trouble. Let's see another snippet where we need to remove the letters sequence:
case class LetterStateV3(processingTime: Long)
When you run this code you will see the state restoration doesn't work since it will resolve the sequence as the processing time:
Restored state was Some(LetterStateV3(103079215152)) Restored state was Some(LetterStateV3(103079215152))
Yet again, you can mitigate this issue:
- either by making the change by adding a new field first, and removing the previous one after some grace period (or max allowed state)
- or by performing state migration with the State Reader API
You agree with me, both are tedious tasks. To simplify this state evolution, Apache Spark has been supporting Apache Avro state since release 4.0.
State schema check
Before we delve into Apache Avro schema state resolution, I owe you some additional explanation. When you modify your UnsafeRow-backed schema and restart the job, Apache Spark will warn you about schema incompatibility with a StreamingQueryException like:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: [STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE] Provided value schema does not match existing state value schema. Please check number and type of fields. Existing value_schema=StructType(StructField(lowerLetters,ArrayType(StringType,true),true)) and new value_schema=StructType(StructField(processingTime,LongType,false),StructField(lowerLetters,ArrayType(StringType,true),true)). If you want to force running the query without schema validation, please set spark.sql.streaming.stateStore.stateSchemaCheck to false. However, please note that running the query with incompatible schema could cause non-deterministic behavior. SQLSTATE: XXKST
As you can see in the error message, Apache Spark also gives you a hint on how to fix the issue. If despite the detected incompatibility you still want to proceed with the update, you have to disable the schema check flag:
val sparkSession = SparkSession.builder().master("local[*]")
.config("spark.sql.streaming.stateStore.stateSchemaCheck", false)
.getOrCreate()
Under-the-hood the error skipping happens in the highlighted code from the next snippet:
# org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema
var evolvedSchema = false
val result = Try(
checker.validateAndMaybeEvolveStateSchema(newStateSchema,
ignoreValueSchema = !storeConf.formatValidationCheckValue,
stateSchemaVersion = stateSchemaVersion, schemaEvolutionEnabled)
).toEither.fold(Some(_),
hasEvolvedSchema => {
evolvedSchema = hasEvolvedSchema
None
})
if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
throw result.get
}
📌 State schema location
Besides pure metadata such as Apache Kafka offsets, Apache Spark Structured Streaming for stateful transformations also persists state schemas inside ${checkpoint directory}/${operator}/_stateSchema directory:
tree /tmp/wfc/state_evolution/checkpoint_apache_avro/state/0/_stateSchema/default -A /tmp/wfc/state_evolution/checkpoint_apache_avro/state/0/_stateSchema/default ??? 0_8ca87ff0-5e55-4179-91e0-c7315b4c9a8a ??? 1_ed8ec619-cd6d-4f3e-acc6-447a1da1e9b1 ??? 2_d72a1f15-aefb-49c4-88c3-dbb8567a2b98 ??? 3_cce38dcd-c62e-414f-8df5-b5583a7a6610 ??? 4_51a412ae-ca40-42fa-9c74-47a62d1a8594
Apache Avro state
Apache Avro-backed state is more flexible because it leverages Apache Avro state evolution semantics. Therefore, adding an attribute in an arbitrary position won't be an issue. Same comes for fields removal and even, under some conditions, for types changes. However, Apache Avro is not the default choice. You need to enable it first with this property:
val sparkSession = SparkSession.builder().master("local[*]")
.config("spark.sql.streaming.stateStore.encodingFormat", "avro")
.getOrCreate()
Under-the-hood Apache Spark uses a dedicated AvroStateEncoder that leverages org.apache.spark.sql.avro.AvroDeserializer. AvroDeserializer is the main Apache Spark actor responsible for converting rows from and into UnsafeRow format, including conversion in Apache Avro data source.
When it comes to type changes, Apache Avro supports type promotion from smaller type to a larger one without error, including strings. Among the currently supported configurations you'll find:
- int -> long, float, or double
- long -> float or double
- float -> double
- string -> bytes
- bytes -> string
⚠ Not only state schema
Changing your stateful topology can also break the job. When it happens, Apache Spark will throw this StreamingQueryException:
[STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query. Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: transformWithStateExec)]; Stateful operators in current batch: [(OperatorId: 0 -> OperatorName: transformWithStateExec), (OperatorId: 1 -> OperatorName: transformWithStateExec)]. SQLSTATE: 42K03
Ultimately, the Apache Avro schema offers significant flexibility. You can freely manipulate state fields and even promote certain types. For any unsupported scenarios, you can write a migration job on top of the State Reader API (which I'll detail in an upcoming post), or stick with the tried-and-true strategy of applying changes as additions to the existing state value.
Consulting
With nearly 17 years of experience, including 9 as data engineer, I offer expert consulting to design and optimize scalable data solutions.
As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and
drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!
👉 contact@waitingforcode.com
đź”— past projects

