I wrote a blog post about OutputModes 6 (yes!) years ago, and after rereading it a few times, I realized it was not good enough to serve as a quick refresher. That's why you can read about OutputModes for the second time here. Hopefully, this attempt will be a better one!
OutputModes 101
According to the Scaladoc, the OutputMode:
OutputMode describes what data will be written to a streaming sink when there is new data available in a streaming DataFrame/Dataset.
As of today (Apache Spark 3.5.0), there are three output modes available:
- Append. It passes only new rows to the sink.
- Complete. It passes all rows to the sink.
- Update. It passes only updated rows to the sink.
Although the modes look simple, their semantics depend on the query. Let's see how by analyzing the errors from the UnsupportedOperationChecker:
- Multiple flatMapGroupsWithState operations are allowed in Append mode only.
- Append doesn't work for a job with aggregations without the watermark.
- Update doesn't work for a job with a session window.
- Complete doesn't work for a job without aggregations.
Why these constraints? First, let's take the Complete mode. It doesn't work for non-aggregated jobs. The reason is simple: an aggregated result is by definition smaller than the raw data composing it. Therefore, it's more likely that this reduced version can be kept in the job before being emitted.
When it comes to the Append mode, it implies append-only data, i.e. immutable data that will never change once written to the sink. For that reason you can't use it for aggregations without watermarks - that kind of stateful aggregate never expires and, therefore, may change at any time. It's worth noting that for queries with aggregates, the Append mode emits a row only when the aggregate expires, i.e. when it's older than the watermark.
Finally, the Update mode allows mutability but within the same execution unit.
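To make the differences concrete, here is a minimal, Spark-free sketch of a windowed count. Everything in it is an assumption made for illustration: the `run` function, the 10-second tumbling window, the 5-second watermark delay, and the simplification that the watermark is recomputed at the end of each micro-batch and that late events are never dropped. It only models which rows each mode would hand over to the sink, not how Apache Spark implements it.

```python
WINDOW = 10  # tumbling window length in seconds (assumption)
DELAY = 5    # watermark delay in seconds (assumption)


def run(batches, mode):
    """Toy model of a windowed count; returns the rows emitted per micro-batch."""
    state = {}           # window start -> running count
    max_event_time = 0
    emitted = []
    for batch in batches:
        updated = set()
        for event_time in batch:
            start = event_time - event_time % WINDOW
            state[start] = state.get(start, 0) + 1
            updated.add(start)
            max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - DELAY
        if mode == "complete":
            # every aggregate known so far
            out = dict(state)
        elif mode == "update":
            # only the aggregates touched in this micro-batch
            out = {w: state[w] for w in updated}
        elif mode == "append":
            # only the windows that ended before the watermark; they leave the state
            expired = [w for w in state if w + WINDOW <= watermark]
            out = {w: state.pop(w) for w in expired}
        else:
            raise ValueError(f"unknown mode: {mode}")
        emitted.append(out)
    return emitted


batches = [[1, 3], [12], [27]]  # event times in seconds, one list per micro-batch
for mode in ("complete", "update", "append"):
    print(mode, run(batches, mode))
```

With these three micro-batches, Complete re-emits the whole state each time, Update emits only the window touched in the batch, and Append stays silent until the watermark passes the end of a window.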
Update mode and MERGE
Although the Update mode is the easiest to understand, it may hide a trap in the context of a session window and the way the Update is perceived. Let's take an example of a session window like this:
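from pyspark.sql import DataFrame, functions as F

# visits_events is a streaming DataFrame with visit_id and event_time columns
session_window: DataFrame = (visits_events
    .withWatermark("event_time", "12 seconds")
    .groupBy("visit_id", F.session_window(F.col("event_time"), "10 seconds"))
    .count())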
As Jungtaek Lim points out in the SPARK-36463 Pull Request, the aggregation entries emitted in the Update mode can be considered in terms of an UPSERT/MERGE operation, i.e. each update can refresh the already present value because the aggregation key doesn't change, even for the aggregation states created later. This doesn't hold for the session window, which has a kind of composite key (visit_id and the event_time range from the snippet) that changes as the session grows. It can lead to two situations:
- Either the session is not updated, i.e. multiple rows for the same entity are created
- Or the session is updated but the final result will always correspond to the last session window
For that reason, using the Update mode with a session window is prohibited.
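The first situation can be illustrated with a tiny Spark-free sketch. The `upsert` helper and the dictionary-based sinks are hypothetical; they only mimic a MERGE keyed by the aggregation key, under the assumption that the sink uses the full grouping key as its identity.

```python
def upsert(sink, key, value):
    """Toy MERGE: insert the row or overwrite the one stored under the same key."""
    sink[key] = value


# Tumbling window: the key (visit_id, window_start) is stable across updates.
tumbling_sink = {}
upsert(tumbling_sink, ("v1", 0), 2)
upsert(tumbling_sink, ("v1", 0), 3)  # same key, the count is refreshed in place

# Session window: the key includes the session range, which grows with new events.
session_sink = {}
upsert(session_sink, ("v1", 0, 10), 2)
upsert(session_sink, ("v1", 0, 18), 3)  # new key, the stale row stays in the sink

print(tumbling_sink)
print(session_sink)
```

The tumbling sink ends up with a single refreshed row, while the session sink keeps two rows for the same visit because the extended session no longer matches the previously written key.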
OutputModes and logical planning
The first place in the code where you'll see the output modes enforced is logical planning. Let's now see the conditions for the behavior explained before, plus some extra constraints, by analyzing the UnsupportedOperationChecker in more detail. You'll learn that:
- Having multiple flatMapGroupsWithState with a different mode than the Append one is not allowed:
if (flatMapGroupsWithStates.size >= 2 && (
    outputMode != InternalOutputModes.Append ||
      flatMapGroupsWithStates.exists(_.outputMode != InternalOutputModes.Append)
    )) {
  throwError(
    "Multiple flatMapGroupsWithStates are not supported when they are not all in append mode" +
      " or the output mode is not append on a streaming DataFrames/Datasets")(plan)
}
- Append mode for jobs with aggregations and no watermark is not supported:
outputMode match {
  case InternalOutputModes.Append if aggregates.nonEmpty =>
    // ...
    if (watermarkAttributes.isEmpty) {
      throwError(
        s"$outputMode output mode not supported when there are streaming aggregations on " +
          s"streaming DataFrames/DataSets without watermark")(plan)
    }
- Session window doesn't work with the Update mode:
case InternalOutputModes.Update if aggregates.nonEmpty =>
  // ...
  if (existingSessionWindow) {
    throwError(s"$outputMode output mode not supported for session window on " +
      "streaming DataFrames/DataSets")(plan)
  }
- Complete mode doesn't work for a job without aggregates:
case InternalOutputModes.Complete if aggregates.isEmpty =>
  throwError(
    s"$outputMode output mode not supported when there are no streaming aggregations on " +
      s"streaming DataFrames/Datasets")(plan)
- Sorting is only supported if you use aggregates and Complete mode:
case Sort(_, _, _) if !containsCompleteData(subPlan) =>
  throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
    "aggregated DataFrame/Dataset in Complete output mode")
- The mapGroupsWithState only works with the Update mode:
if (outputMode != InternalOutputModes.Update) {
  throwError(
    "mapGroupsWithState is not supported with " +
      s"$outputMode output mode on a streaming DataFrame/Dataset")
}
- The flatMapGroupsWithState with aggregation doesn't work for the Update mode:
case m: FlatMapGroupsWithState if m.isStreaming =>
  if (m.outputMode == InternalOutputModes.Update) {
    throwError("flatMapGroupsWithState in update mode is not supported with " +
      "aggregation on a streaming DataFrame/Dataset")
  }
Same rule applies to the applyInPandasWithState.
- The flatMapGroupsWithState without aggregation must use the same mode as the job:
case m: FlatMapGroupsWithState if m.isStreaming =>
  m.outputMode match {
    case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
      throwError(
        "flatMapGroupsWithState in update mode is not supported with " +
          s"$outputMode output mode on a streaming DataFrame/Dataset")
    case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
      throwError(
        "flatMapGroupsWithState in append mode is not supported with " +
          s"$outputMode output mode on a streaming DataFrame/Dataset")
Same rule applies to the applyInPandasWithState.
- Only the Append mode is supported for the stream-to-stream joins:
case j @ Join(left, right, joinType, condition, _) =>
  if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
    throwError("Join between two streaming DataFrames/Datasets is not supported" +
      s" in ${outputMode} output mode, only in Append output mode")
  }
- Correctness issue detection where subsequent stateful operations are accepted only in the Append mode:
private def ifCannotBeFollowedByStatefulOperation(
    p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
  case a: Aggregate if a.isStreaming && outputMode != InternalOutputModes.Append => true
  case d @ Distinct(_: LogicalPlan)
    if d.isStreaming && outputMode != InternalOutputModes.Append => true
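The rules listed above can be summarized in a small Spark-free validator. This is only a sketch: the `QueryShape` class, the `violations` function, and the error messages are hypothetical names made up for illustration, and the checks cover only a subset of what the real UnsupportedOperationChecker verifies.

```python
from dataclasses import dataclass


@dataclass
class QueryShape:
    """Hypothetical description of a streaming query, reduced to a few flags."""
    has_aggregation: bool = False
    has_watermark: bool = False
    has_session_window: bool = False
    has_stream_stream_join: bool = False
    uses_map_groups_with_state: bool = False
    has_sort: bool = False


def violations(mode, query):
    """Returns the list of broken rules for the given output mode."""
    errors = []
    if mode == "complete" and not query.has_aggregation:
        errors.append("complete mode requires a streaming aggregation")
    if mode == "append" and query.has_aggregation and not query.has_watermark:
        errors.append("append mode with aggregation requires a watermark")
    if mode == "update" and query.has_aggregation and query.has_session_window:
        errors.append("update mode is not supported for a session window")
    if query.has_stream_stream_join and mode != "append":
        errors.append("stream-to-stream join is supported in append mode only")
    if query.uses_map_groups_with_state and mode != "update":
        errors.append("mapGroupsWithState is supported in update mode only")
    if query.has_sort and not (mode == "complete" and query.has_aggregation):
        errors.append("sorting requires an aggregation in complete mode")
    return errors


windowed_count = QueryShape(has_aggregation=True, has_watermark=True)
print(violations("append", windowed_count))
print(violations("complete", QueryShape()))
```

The flags are deliberately coarse; they are enough to replay the error conditions quoted above without building a logical plan.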
OutputModes and stateful operators
I bet you noticed the *MapGroupsWithState. They're the methods you call while implementing the arbitrary stateful processing code. Surprisingly or not, they also accept the OutputMode in their signatures! For example here:
def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {
Append
- StateStoreSaveExec - the operator is responsible for saving state to the state store and returning it. As the Append mode emits expired rows, in the context of persisting the state, it keeps only the records no older than the watermark. When it comes to emitting the rows downstream, it only returns the expired ones.
- SessionWindowStateStoreSaveExec - emits the expired rows only.
- FlatMapGroupsWithState - if constructed from mapGroupsWithState, it fails since for this method, the Update mode is the single allowed one.
- flatMapGroupsWithState(...) - the function also creates FlatMapGroupsWithState, but with the isMapGroupsWithState flag set to false. Append is one of two allowed modes here.
- applyInPandasWithState(...) - the function creates FlatMapGroupsInPandasWithState. Append is one of two allowed modes here.
StateStoreSaveExec, aggregations, and complete mode
Why does the StateStoreSaveExec return rows? In fact, it's a regular node in Apache Spark execution plan and consequently, it can have some downstream operators. Below is an execution plan for inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*")).writeStream.outputMode("complete").format("console").start():
== Physical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: ConsoleWriter[numRows=20, truncate=true]],
+- *(4) HashAggregate(keys=[name#8], functions=[count(1)])
   +- StateStoreSave [name#8], state info [ checkpoint = ... ], Complete, 0, 0, 2
      +- *(3) HashAggregate(keys=[name#8], functions=[merge_count(1)])
         +- StateStoreRestore [name#8], state info [ checkpoint = ...], 2
            +- *(2) HashAggregate(keys=[name#8], functions=[merge_count(1)])
               +- Exchange hashpartitioning(name#8, 200), ENSURE_REQUIREMENTS, [plan_id=55]
                  +- *(1) HashAggregate(keys=[name#8], functions=[partial_count(1)])
                     +- *(1) Project [_2#3 AS name#8]
                        +- MicroBatchScan[_1#2L, _2#3] MemoryStreamDataSource
Complete
- StateStoreSaveExec - unsurprisingly, all rows will be returned from the state store.
- SessionWindowStateStoreSaveExec - same as StateStoreSaveExec.
- FlatMapGroupsWithState - if constructed from mapGroupsWithState, it fails since for this method, the Update mode is the single allowed one.
- flatMapGroupsWithState(...) - the function also creates FlatMapGroupsWithState, but the Complete mode is not allowed.
- applyInPandasWithState(...) - the function creates FlatMapGroupsInPandasWithState. Complete mode is not allowed here.
Update
- StateStoreSaveExec - the operator saves updated and not expired rows to the state store.
- mapGroupsWithState(...) - as the function creates FlatMapGroupsWithState with the isMapGroupsWithState flag set to true, Update is the single allowed mode here.
- flatMapGroupsWithState(...) - the function also creates FlatMapGroupsWithState, but with the isMapGroupsWithState flag set to false. Update is one of two allowed modes here.
- applyInPandasWithState(...) - the function creates FlatMapGroupsInPandasWithState. Update is one of two allowed modes here.
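As a quick recap of the three lists for the arbitrary stateful processing functions, the allowed combinations fit in a lookup table. The `ALLOWED_MODES` dictionary and the `is_allowed` helper are hypothetical names used only for this sketch; they restate the rules above and are not part of the Apache Spark API.

```python
# Output modes accepted by each arbitrary stateful processing function,
# as described in the Append, Complete, and Update lists above.
ALLOWED_MODES = {
    "mapGroupsWithState": {"update"},
    "flatMapGroupsWithState": {"append", "update"},
    "applyInPandasWithState": {"append", "update"},
}


def is_allowed(function_name, mode):
    """Tells whether the stateful function accepts the given output mode."""
    return mode in ALLOWED_MODES[function_name]


for name in ALLOWED_MODES:
    print(name, [m for m in ("append", "complete", "update") if is_allowed(name, m)])
```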
FlatMapGroupsWithState and output mode
Five years ago I answered the StackOverflow question "What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?" with a guess. Indeed, the output mode is still not used in the FlatMapGroupsWithStateExec, but you saw in this section that it's referenced in all the validation operations to provide correctness semantics to the streaming job.
How? First, the custom mapping function provided as a part of the FlatMapGroupsWithState action simply doesn't work for the Complete mode. Apache Spark triggers the function only for the updated (Update mode) or expired (Append mode) rows in the given micro-batch. It never triggers it for every state present in the state store.
Second, there is an important semantic difference for downstream operators. Thank you, Gerard Maas and François Garillot, for explaining this in your "Stream Processing with Apache Spark" book. In a nutshell:
- mapGroupsWithState vs. flatMapGroupsWithState - the difference is the output. The former returns a single state object while the latter an iterator of them. Semantically then, you'll use the mapGroupsWithState to emit the final state while with the flatMapGroupsWithState you can include all intermediary states created in the given micro-batch.
- Append vs. Update - knowing the semantic, the Append in the flatMapGroupsWithState stands for the final results, i.e. the stateful function processed all records for the given group and can be safely evaluated later as a final outcome. For the Update mode, the results aren't final and all downstream operators must update their results consequently, i.e. they cannot perform any final action like generating aggregates.
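One way to read this contract is the Spark-free sketch below. The `count_events` function and the `simulate` driver are hypothetical and only mimic the shape of a stateful function; in a real job you decide yourself what the function emits, and the declared mode tells Apache Spark how downstream operators may treat those rows.

```python
def count_events(key, values, state, timed_out, mode):
    """Hypothetical stateful function; returns (new_state, emitted_rows)."""
    if timed_out:
        # The group expired, so its count is final. Append-style logic emits it now.
        emitted = [(key, state)] if mode == "append" else []
        return None, emitted
    new_state = (state or 0) + len(values)
    # Update-style logic emits the running, non-final count on every change.
    emitted = [(key, new_state)] if mode == "update" else []
    return new_state, emitted


def simulate(mode):
    """Replays two micro-batches with data for group 'a', then a timeout."""
    state, output = None, []
    steps = [([1, 2], False), ([3], False), ([], True)]
    for values, timed_out in steps:
        state, emitted = count_events("a", values, state, timed_out, mode)
        output.extend(emitted)
    return output


print("update:", simulate("update"))
print("append:", simulate("append"))
```

In the update variant, downstream consumers see two successive, refreshable counts for the same group; in the append variant they see a single row that will not change anymore.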
OutputModes and sinks V2
But the output modes don't only impact the stateful operations! They also impact data sinks. Let's take a look at the V2Writes logical rule applying to the V2 data sinks:
private def buildWriteForMicroBatch(
    table: SupportsWrite,
    writeBuilder: WriteBuilder,
    outputMode: OutputMode): Write = {
  outputMode match {
    case Append =>
      writeBuilder.build()
    case Complete =>
      require(writeBuilder.isInstanceOf[SupportsTruncate],
        table.name + " does not support Complete mode.")
      writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
    case Update =>
      require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
        table.name + " does not support Update mode.")
      writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
  }
}
As you can see, the Append mode is the easiest one as it only adds new rows. The Complete one provides all rows, and thus requires the output truncate capability. Finally, the Update one can refresh already emitted rows, so it semantically corresponds to an UPSERT (MERGE) operation; in the snippet, the writer must implement SupportsStreamingUpdateAsAppend to accept it.
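The capability checks from the snippet can be mimicked with a toy sink. The `ToySink` class and its two flags are hypothetical and stand in for the SupportsTruncate and SupportsStreamingUpdateAsAppend capabilities. The sketch assumes, as the name of the latter suggests, that updated rows are written the same way as appended ones and that reconciling them is left to the consumer.

```python
class ToySink:
    """In-memory stand-in for a V2 sink with optional capabilities."""

    def __init__(self, supports_truncate, supports_update_as_append):
        self.supports_truncate = supports_truncate
        self.supports_update_as_append = supports_update_as_append
        self.rows = []

    def write(self, rows, mode):
        if mode == "append":
            self.rows.extend(rows)
        elif mode == "complete":
            if not self.supports_truncate:
                raise ValueError("sink does not support Complete mode")
            # truncate first, then write the full result
            self.rows = list(rows)
        elif mode == "update":
            if not self.supports_update_as_append:
                raise ValueError("sink does not support Update mode")
            # assumption: updated rows are simply appended
            self.rows.extend(rows)
        else:
            raise ValueError(f"unknown mode: {mode}")


sink = ToySink(supports_truncate=True, supports_update_as_append=True)
sink.write([("a", 1)], "append")
sink.write([("a", 2)], "update")      # both versions of "a" are now in the sink
print(sink.rows)
sink.write([("a", 2), ("b", 1)], "complete")  # previous content is replaced
print(sink.rows)
```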
Besides this pure API-based constraint, the output modes are also sometimes used in individual sinks. For example, Delta Lake creates the sink only if the output mode is Append or Complete:
class DeltaDataSource...
  override def createSink(...)
    if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
      throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString)
    }
Besides, it's also a condition to overwrite the schema:
case class DeltaSink(
  // ...
  override protected val canOverwriteSchema: Boolean =
    outputMode == OutputMode.Complete() && options.canOverwriteSchema
After writing this article, I have the feeling of closing the loop on the OutputModes. Although there are not a lot of code snippets here (they are part of Output modes in Apache Spark Structured Streaming), I added some extra explanations for my future self and, hopefully, for you, if you need to understand the output modes better.