OutputModes in Apache Spark Structured Streaming - complementary notes

Versions: Apache Spark 3.5.0

I wrote a blog post about OutputModes 6 (yes!) years ago and after reading it a few times, I realized it was not good enough to be a quick refresher. For that reason you can read about OutputModes for the second time here. Hopefully, this one will be a good try!

OutputModes 101

According to the Scaladoc, the OutputMode:

 * OutputMode describes what data will be written to a streaming sink when there is
 * new data available in a streaming DataFrame/Dataset.

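As of today (Apache Spark 3.5.0), there are three output modes available:

  • Append - only the new rows added to the result table since the last trigger are written to the sink
  • Complete - the whole result table is written to the sink after every trigger
  • Update - only the rows updated in the result table since the last trigger are written to the sink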

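Although they look simple, their semantics adapt to the query, as the errors raised by the UnsupportedOperationChecker show. For example, the checker rejects the Complete mode for queries without streaming aggregations, and the Append mode for streaming aggregations without a watermark.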

Why these constraints? First, let's take the Complete mode. It doesn't work for non-aggregated jobs. The reason is simple: an aggregated result is by definition smaller than the raw data composing it. Therefore, there is a better chance that this reduced version can be stored in the job before being emitted.

When it comes to the Append mode, it implies append-only data, i.e. immutable data that will never change once written to the sink. For that reason you can't use it for aggregations without watermarks: that kind of stateful aggregate never expires and may therefore change at any time. It's worth noting that the Append mode for queries with aggregates emits a row only when the aggregate expires, i.e. when it's older than the watermark.

Finally, the Update mode allows mutability but within the same execution unit.
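
To make these semantics more concrete, here is a toy model in plain Python. It is not Spark code: the `run_query` function, the 10-second tumbling windows, and the simplification that the watermark is applied in the same micro-batch that advances it are all assumptions made for the illustration.

```python
def run_query(batches, mode, window=10, delay=None):
    """Toy model of a windowed count; returns what each micro-batch would emit."""
    state = {}            # window start -> count, a stand-in for the state store
    max_event_time = 0
    outputs = []
    for batch in batches:
        updated = set()
        for event_time in batch:
            start = event_time - event_time % window
            state[start] = state.get(start, 0) + 1
            updated.add(start)
            max_event_time = max(max_event_time, event_time)
        if mode == "complete":
            # the whole aggregated result, every time
            out = dict(state)
        elif mode == "update":
            # only the aggregates touched in this micro-batch
            out = {w: state[w] for w in updated}
        elif mode == "append":
            if delay is None:
                raise ValueError("append needs a watermark, otherwise no aggregate is ever final")
            watermark = max_event_time - delay
            # only the aggregates that expired, i.e. that cannot change anymore
            expired = [w for w in state if w + window <= watermark]
            out = {w: state.pop(w) for w in expired}
        else:
            raise ValueError(f"unknown mode {mode}")
        outputs.append(out)
    return outputs


batches = [[1, 2, 11], [12, 25]]   # event times in seconds, one list per micro-batch
complete_out = run_query(batches, "complete")
update_out = run_query(batches, "update")
append_out = run_query(batches, "append", delay=5)
```

In this run the Complete mode re-emits every window in each micro-batch, the Update mode emits only the windows touched by the micro-batch, and the Append mode stays silent in the first micro-batch and emits the two oldest windows once the watermark has passed their end.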

Update mode and MERGE

Although the Update mode is the easiest to understand, it may hide a trap in the context of session windows and the way the Update is perceived. Let's take an example of a session window like this:

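from pyspark.sql import DataFrame, functions as F

# visits_events: a streaming DataFrame assumed to expose visit_id and event_time columns
session_window: DataFrame = (visits_events
  .withWatermark("event_time", "12 seconds")
  .groupBy("visit_id", F.session_window(F.col("event_time"), "10 seconds")).count())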

As Jungtaek Lim rightly points out in the SPARK-36463 pull request, the aggregation entries emitted in the Update mode can be considered in terms of an UPSERT/MERGE operation, i.e. each update can refresh the already present value because the aggregation key doesn't change, even for the aggregation states created later. This doesn't hold for the session window, which has a kind of composite key (visit_id and the event_time range in the snippet) whose range changes as the session grows. It can lead to two situations:

  • Either the session is not updated, i.e. multiple rows for the same entity are created
  • Or the session is updated but the final result will always correspond to the last session window

For that reason, using the Update mode with session windows is prohibited.
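
The two situations can be reproduced with a toy upsert in plain Python. This is only a sketch: the `upsert` helper, the row layout, and the three hand-written micro-batches are hypothetical and simply mimic what a MERGE-based sink would do with rows coming from a session window.

```python
def upsert(sink, rows, key_columns):
    """Insert-or-replace each row in the sink, keyed by the given columns."""
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        sink[key] = row
    return sink


# The same session grows between micro-batch 1 and 2, then a new session starts.
batch_1 = [{"visit_id": "v1", "start": 0, "end": 15, "count": 2}]
batch_2 = [{"visit_id": "v1", "start": 0, "end": 22, "count": 3}]
batch_3 = [{"visit_id": "v1", "start": 100, "end": 110, "count": 1}]

# Situation 1: MERGE on the composite key, so the extended session is a brand new row.
composite_sink = {}
# Situation 2: MERGE on the entity only, so every session overwrites the previous one.
entity_sink = {}
for batch in (batch_1, batch_2, batch_3):
    upsert(composite_sink, batch, ["visit_id", "start", "end"])
    upsert(entity_sink, batch, ["visit_id"])
```

With the composite key the sink ends up with three rows for v1, including the stale [0, 15) version of the first session. With the entity key it keeps a single row, the last session, and the first session is lost.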

OutputModes and logical planning

The first place in the code where you'll see the output modes enforced is therefore logical planning. Let's now see the conditions for the behavior explained before, plus some extra constraints, by taking a closer look at the UnsupportedOperationChecker. You'll learn that:

OutputModes and stateful operators

I bet you noticed the *MapGroupsWithState. They're the methods you call while implementing the arbitrary stateful processing code. Surprisingly or not, the flatMapGroupsWithState variant also accepts the OutputMode in its signature:

def flatMapGroupsWithState[S: Encoder, U: Encoder](
  outputMode: OutputMode,
  timeoutConf: GroupStateTimeout)(
  func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {

StateStoreSaveExec, aggregations, and complete mode

Why does the StateStoreSaveExec return rows? In fact, it's a regular node in the Apache Spark execution plan and, consequently, it can have downstream operators. Below is the execution plan for inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*")).writeStream.outputMode("complete").format("console").start():

== Physical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: ConsoleWriter[numRows=20, truncate=true]], 
+- *(4) HashAggregate(keys=[name#8], functions=[count(1)])
   +- StateStoreSave [name#8], state info [ checkpoint = ... ], Complete, 0, 0, 2
      +- *(3) HashAggregate(keys=[name#8], functions=[merge_count(1)])
         +- StateStoreRestore [name#8], state info [ checkpoint = ...], 2
            +- *(2) HashAggregate(keys=[name#8], functions=[merge_count(1)])
               +- Exchange hashpartitioning(name#8, 200), ENSURE_REQUIREMENTS, [plan_id=55]
                  +- *(1) HashAggregate(keys=[name#8], functions=[partial_count(1)])
                     +- *(1) Project [_2#3 AS name#8]
                        +- MicroBatchScan[_1#2L, _2#3] MemoryStreamDataSource

FlatMapGroupsWithState and output mode

Five years ago I answered the StackOverflow question "What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?" with a guess. Indeed, the output mode is still not used in the FlatMapGroupsWithStateExec, but you saw in this section that it's referenced in all the validation operations to give correctness semantics to the streaming job.

How? First, the custom mapping function provided as part of the FlatMapGroupsWithState action simply doesn't work for the Complete mode. Apache Spark triggers the function only for the updated (Update mode) or expired (Append mode) rows in the given micro-batch. It'll never trigger it for each state present in the state store.

Second, there is an important semantic difference for downstream operators. Thank you, Gerard Maas and François Garillot, for explaining this in your "Stream Processing with Apache Spark" book. In a nutshell:

  • mapGroupsWithState vs. flatMapGroupsWithState - the difference is the output. The former returns a single state object while the latter an iterator of them. Semantically then, you'll use the mapGroupsWithState to emit the final state while with the flatMapGroupsWithState you can include all intermediary states created in the given micro-batch.
  • Append vs. Update - knowing the semantics, the Append in the flatMapGroupsWithState stands for the final results, i.e. the stateful function processed all records for the given group and its output can be safely evaluated later as a final outcome. For the Update mode, the results aren't final and all downstream operators must update their results accordingly, i.e. they cannot perform any final action like generating aggregates.
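
The difference between one output per group and many outputs per group can be sketched in plain Python. The snippet below is a toy model, not the Spark API: the helper functions only borrow the Spark method names, and `final_count` and `intermediate_counts` are hypothetical user functions.

```python
def map_groups_with_state(func, batch, state):
    """Toy model: exactly one output record per group present in the micro-batch."""
    return [func(key, values, state) for key, values in batch.items()]


def flat_map_groups_with_state(func, batch, state):
    """Toy model: any number of output records per group present in the micro-batch."""
    out = []
    for key, values in batch.items():
        out.extend(func(key, values, state))
    return out


def final_count(key, values, state):
    # emits only the state reached at the end of the micro-batch
    state[key] = state.get(key, 0) + len(values)
    return (key, state[key])


def intermediate_counts(key, values, state):
    # emits every intermediary state built while consuming the micro-batch
    for _ in values:
        state[key] = state.get(key, 0) + 1
        yield (key, state[key])


batch = {"a": [1, 2, 3], "b": [4]}   # group key -> records of the micro-batch
final_results = map_groups_with_state(final_count, batch, {})
all_results = flat_map_groups_with_state(intermediate_counts, batch, {})
```

Here `final_results` holds one record per group while `all_results` also contains the intermediary counts for the group "a", which is why a downstream operator has to know whether the rows it receives are final (Append) or may still be refreshed (Update).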

OutputModes and sinks V2

But the output modes don't only impact the stateful operations! They also impact data sinks. Let's take a look at the V2Writes logical rule applying to the V2 data sinks:

private def buildWriteForMicroBatch(table: SupportsWrite, writeBuilder: WriteBuilder, outputMode: OutputMode): Write = {
  outputMode match {
    case Append =>
      writeBuilder.build()
    case Complete =>
      require(writeBuilder.isInstanceOf[SupportsTruncate],
        table.name + " does not support Complete mode.")
      writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
    case Update =>
      require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
        table.name + " does not support Update mode.")
      writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
  }
}

As you can see, the Append mode is the easiest one as it only adds new rows. The Complete one provides all rows and thus requires the truncate capability on the output. Finally, the Update one can refresh already emitted rows, which conceptually corresponds to an UPSERT (MERGE) operation; in the snippet, the sink has to declare the SupportsStreamingUpdateAsAppend capability to accept it.
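
The capability checks from the Scala snippet can be mimicked with a toy sink in plain Python. This is a sketch under assumptions: `ToySink` and its two flags are hypothetical stand-ins for the SupportsTruncate and SupportsStreamingUpdateAsAppend capabilities, and the sink is modeled as a simple list of rows.

```python
class ToySink:
    """Toy stand-in for a V2 sink; the flags mimic the writer capabilities."""

    def __init__(self, supports_truncate=True, supports_update_as_append=True):
        self.supports_truncate = supports_truncate
        self.supports_update_as_append = supports_update_as_append
        self.rows = []

    def write(self, batch, mode):
        if mode == "append":
            # new rows only, nothing already written is touched
            self.rows.extend(batch)
        elif mode == "complete":
            if not self.supports_truncate:
                raise ValueError("sink does not support Complete mode.")
            # the batch carries the whole result, so the old content is dropped first
            self.rows = list(batch)
        elif mode == "update":
            if not self.supports_update_as_append:
                raise ValueError("sink does not support Update mode.")
            # updated rows are written like appends in this model
            self.rows.extend(batch)
        else:
            raise ValueError(f"unknown mode {mode}")


complete_sink = ToySink()
update_sink = ToySink()
for sink, mode in ((complete_sink, "complete"), (update_sink, "update")):
    sink.write([("a", 1)], mode)
    sink.write([("a", 2), ("b", 1)], mode)
```

In this model the Complete sink ends with the latest full result only, while the Update sink keeps both versions of the "a" row, so resolving them into a single row is left to whoever reads the data.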

Besides this pure API-based constraint, the output modes are also sometimes used in individual sinks. For example, Delta Lake creates the sink only if the output mode is Append or Complete:

class DeltaDataSource...

  override def createSink(...)
    if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
      throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString)
    }

Besides, it's also a condition to overwrite the schema:

case class DeltaSink(
  // ...
  override protected val canOverwriteSchema: Boolean =
    outputMode == OutputMode.Complete() && options.canOverwriteSchema

After writing this article I have the feeling of closing the loop on OutputModes. Although there are not a lot of code snippets here (they are part of Output modes in Apache Spark Structured Streaming), I added some extra explanation for my future self and, hopefully, for you, if you need to understand the output modes better.