Structured Streaming introduced a lot of new concepts compared to DStream-based streaming. One of them is the output mode.
This post presents the output modes introduced in Spark 2.0.0 to deal with streaming data output. The first part describes them from a theoretical point of view. The second section presents their API. The last part shows how they work through some learning tests.
Output modes definition
The output mode specifies the way the data is written to the result table. Among the available output modes we can distinguish:
- append - here only new rows are written to the output sink. This mode is reserved for processing without aggregations (unless a watermark is defined, see below) and is perfectly suited for immutable results.
An important point to notice is the relationship with the watermark. If a watermark is defined on the aggregation, it controls when the appended data is emitted to the result table. The emission happens only after the finalization of the "intermediate" state (= the state changed during the processing), and that finalization occurs only when the new value of the watermark passes ahead of the most recent entry of a given group. For instance, let's suppose that the watermark is set to 19:00:00 and the most recent value for the records of group "A" comes from 19:00:05. Now, when the watermark passes to 19:00:06, the results of group "A" are emitted to the result table: the engine considers that no newer events will be delivered, so it's safe to send the result to the output table.
- complete - in this mode all the rows are written to the output sink every time the stream has some updates. This mode is exclusively reserved for processing with aggregations.
- update - it's similar to the previous one, except that only the updated rows are written to the output sink.
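To make these definitions more concrete, here is a minimal sketch (not part of the tests shown later) that wires the same windowed aggregation to a sink and lets you swap the output mode. The rate source, the 30-second window and the console sink are arbitrary choices made only for the illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object OutputModesSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("output-modes-sketch").getOrCreate()

  // The built-in rate source generates (timestamp, value) rows; handy for a quick experiment
  val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

  // A windowed aggregation with a watermark, so that all 3 output modes are legal
  val counts = events
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window(col("timestamp"), "30 seconds"))
    .count()

  // Only the output mode differs:
  //   "append"   - a window is emitted once the watermark passes beyond it
  //   "complete" - the whole result table is rewritten on every update
  //   "update"   - only the windows changed in the last trigger are emitted
  val query = counts.writeStream
    .outputMode("append") // swap for "complete" or "update" to compare the sink's content
    .format("console")
    .start()

  query.awaitTermination()
}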
Output modes API
The output mode is defined in the DataStreamWriter#outputMode(outputMode: String) method. The passed name is later translated to the corresponding case object from the InternalOutputModes object.
The resolved instance is used mainly in the DataStreamWriter class. It's passed from there to StreamingQueryManager#startQuery(userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: DataFrame, sink: Sink, outputMode: OutputMode, useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()). As its name indicates, this method starts the streaming query execution.
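As a short illustration of this API, the sketch below sets the output mode both by name and with the OutputMode helper objects. The rate source is only a placeholder, and the invalid name at the end is there to show that the translation to the internal case object happens as soon as outputMode(...) is called, before the query is handed over to StreamingQueryManager#startQuery:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object OutputModeApiSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("output-mode-api").getOrCreate()

  val rates = spark.readStream.format("rate").load()

  // Both calls end up with the same internal representation: the string is matched
  // against "append", "complete" and "update" and converted to the corresponding case object
  val byName = rates.writeStream.outputMode("append")
  val byObject = rates.writeStream.outputMode(OutputMode.Append())

  // An unknown name fails immediately with an IllegalArgumentException
  try {
    rates.writeStream.outputMode("appendd")
  } catch {
    case e: IllegalArgumentException => println(s"Rejected output mode: ${e.getMessage}")
  }
}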
On the physical execution side, we can find traces of the output modes in StateStoreSaveExec. It's there that the intermediate stateful results are stored. By the way, we can also find there a lot of references to watermarking, which helps to remove too old results. If you want to learn more about it, please go to the post about StateStore in Apache Spark Structured Streaming.
Output modes examples
The list below summarizes which modes can be used for the given types of processing. After each of them, some tests are written in order to show the supported and unsupported cases:
- aggregation with watermark
- append mode: supported - but the results are emitted only after crossing the watermark (= output finalization). The query won't work if the aggregation is not applied on the watermark's column.
- complete mode: supported - unlike update, the watermark is not used
- update mode: supported - the watermark is used to drop too old aggregates
The tests:"the count on watermark column" should "be correctly computed in append mode" in { val testKey = "append-output-mode-with-watermark-aggregation" val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext) val now = 5000L val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("created") .count() val query = aggregatedStream.writeStream.outputMode("append") .foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")) .start() new Thread(new Runnable() { override def run(): Unit = { // send the first batch - max time = 10 seconds, so the watermark will be 9 seconds inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+4000), 3), (new Timestamp(now+5000), 4)) while (!query.isActive) {} Thread.sleep(10000) inputStream.addData((new Timestamp(4000L), 5), (new Timestamp(now+4000), 6)) inputStream.addData((new Timestamp(11000), 7)) Thread.sleep(10000) inputStream.addData((new Timestamp(11000), 8)) } }).start() query.awaitTermination(45000) val readValues = InMemoryKeyedStore.getValues(testKey) // As you can see, only the result for now+5000 was emitted. It's because the append output // mode emits the results once a new watermark is defined and the accumulated results are below // the new threshold readValues should have size 2 readValues should contain allOf("1970-01-01 01:00:09.0 -> 2", "1970-01-01 01:00:10.0 -> 3") } "the count on non-watermark column" should "fail in append mode" in { val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext) val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("number") .count() val exception = intercept[AnalysisException]{ aggregatedStream.writeStream.outputMode("append") .foreach(new NoopForeachWriter[Row]()).start() } exception.message should include("Append output mode not supported when there are streaming aggregations on " + "streaming DataFrames/DataSets without watermark") } "the count on watermark column" should "be correctly computed in update mode" in { val testKey = "update-output-mode-with-watermark-aggregation" val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext) val now = 5000L val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("created") .count() val query = aggregatedStream.writeStream.outputMode("update") .foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start() new Thread(new Runnable() { override def run(): Unit = { inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3), (new Timestamp(now+5000), 4)) while (!query.isActive) {} Thread.sleep(10000) inputStream.addData((new Timestamp(4000L), 5)) inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7)) } }).start() query.awaitTermination(45000) val readValues = InMemoryKeyedStore.getValues(testKey) readValues should have size 2 readValues should contain allOf("1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1") } "the count on non-watermark column" should "be correctly computed in update mode" in { val testKey = "update-output-mode-without-watermark-aggregation" val inputStream = new MemoryStream[(Timestamp, Int)](1, 
sparkSession.sqlContext) val now = 5000L val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("number") .count() val query = aggregatedStream.writeStream.outputMode("update") .foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start() new Thread(new Runnable() { override def run(): Unit = { inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3), (new Timestamp(now+5000), 3)) while (!query.isActive) {} Thread.sleep(10000) inputStream.addData((new Timestamp(4000L), 6)) inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7)) } }).start() query.awaitTermination(45000) val readValues = InMemoryKeyedStore.getValues(testKey) readValues should have size 5 readValues should contain allOf("1 -> 1", "3 -> 2", "2 -> 1", "6 -> 2", "7 -> 1") } "the count on watermark column" should "be correctly computed in complete mode" in { val testKey = "update-output-mode-with-watermark-aggregation" val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext) val now = 5000L val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("created") .count() val query = aggregatedStream.writeStream.outputMode("complete") .foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start() new Thread(new Runnable() { override def run(): Unit = { inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3), (new Timestamp(now+5000), 4)) while (!query.isActive) {} Thread.sleep(10000) inputStream.addData((new Timestamp(4000L), 5)) inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7)) } }).start() query.awaitTermination(45000) val readValues = InMemoryKeyedStore.getValues(testKey) println(s"${readValues}") readValues should have size 5 readValues.sorted should equal (Seq("1970-01-01 01:00:04.0 -> 1", "1970-01-01 01:00:05.0 -> 1", "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1")) } "the count on non-watermark column" should "be correctly computed in complete mode" in { val testKey = "update-output-mode-with-watermark-aggregation" val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext) val now = 5000L val aggregatedStream = inputStream.toDS().toDF("created", "number") .withWatermark("created", "1 second") .groupBy("number") .count() val query = aggregatedStream.writeStream.outputMode("complete") .foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start() new Thread(new Runnable() { override def run(): Unit = { inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3), (new Timestamp(now+5000), 4)) while (!query.isActive) {} Thread.sleep(10000) inputStream.addData((new Timestamp(4000L), 5)) inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7)) } }).start() query.awaitTermination(45000) val readValues = InMemoryKeyedStore.getValues(testKey) println(s"${readValues}") readValues should have size 11 readValues.sorted should equal (Seq("1 -> 1", "1 -> 1", "2 -> 1", "2 -> 1", "3 -> 1", "3 -> 1", "4 -> 1", "4 -> 1", "5 -> 1", "6 -> 1", "7 -> 1")) }
- remaining aggregations (no watermark)
- append mode: not supported - the aggregations can be updated at any moment, which violates the principles of this mode
- complete mode: supported
- update mode: supported - but unlike in the previous point, the old aggregates are not cleaned, since without a watermark the engine doesn't know when an aggregate becomes too old.
The tests:"append mode without aggregation" should "be disallow in structured streaming without watermark" in { val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val counter =inputStream.toDS().toDF("id", "name").agg(count("*")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) val exception = intercept[AnalysisException] { counter.writeStream.outputMode("append").foreach(new NoopForeachWriter()).start() } exception.message should include ("Append output mode not supported when there are streaming " + "aggregations on streaming DataFrames/DataSets without watermark") } "complete mode without aggregation" should "be allowed in structured streaming without watermark" in { val testKey = "no-aggregation-complete-output-mode" val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val counter =inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) val query = counter.writeStream.outputMode("complete").foreach(new ForeachWriter[Row] { override def open(partitionId: Long, version: Long): Boolean = true override def process(row: Row): Unit = { println(s"processing ${row}") InMemoryKeyedStore.addValue(testKey, s"${row.getAs[String]("name")}=${row.getAs[Long]("count(1)")}") } override def close(errorOrNull: Throwable): Unit = {} }).start() new Thread(new Runnable() { override def run(): Unit = { Thread.sleep(10000) // Add only 1 row to show that all entries are sent to the result table every time inputStream.addData((1L, "test1")) } }).start() query.awaitTermination(30000) val processedRows = InMemoryKeyedStore.getValues(testKey) processedRows should have size 6 val rowsTest1 = processedRows.filter(_.startsWith("test1=")) rowsTest1 should have size 2 rowsTest1 should contain allOf("test1=3", "test1=4") val rowsTest2 = processedRows.filter(_.startsWith("test2=")) rowsTest2 should have size 2 rowsTest2 should contain only("test2=3") val rowsTest3 = processedRows.filter(_.startsWith("test3=")) rowsTest3 should have size 2 rowsTest3 should contain only("test3=3") } "update mode without aggregation" should "be allowed in structured streaming without watermark" in { val testKey = "no-aggregation-update-output-mode" val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val counter =inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) val query = counter.writeStream.outputMode("update").foreach(new InMemoryStoreWriter[Row](testKey, (processedRow) => s"${processedRow.getAs[String]("name")} -> ${processedRow.getAs[Long]("count(1)")}")).start() new Thread(new Runnable() { override def run(): Unit = { Thread.sleep(10000) // Add only 1 row to show that only updated row is sent to the result table inputStream.addData((1L, "test1")) } }).start() query.awaitTermination(30000) val processedRows = InMemoryKeyedStore.getValues(testKey) processedRows should have size 4 processedRows should contain allOf("test1 -> 3", "test1 -> 4", "test2 -> 3", "test3 -> 3") }
- mapGroupsWithState processing
- append mode: not supported
- complete mode: not supported
- update mode: supported
The tests:

private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Seq[String] = (key, values, state) => {
  val stateNames = state.getOption.getOrElse(Seq.empty)
  val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
  state.update(stateNewNames)
  stateNewNames
}

"append mode" should "not work for mapGroupWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("append").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }

  exception.message should include("mapGroupsWithState is not supported with Append output mode on a " +
    "streaming DataFrame/Dataset")
}

"complete mode" should "not work for mapGroupWithState" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    mappedValues.writeStream.outputMode("complete").foreach(new NoopForeachWriter[Seq[String]]()).start()
  }

  exception.message should include("Complete output mode not supported when there are no streaming " +
    "aggregations on streaming DataFrames/Datasets")
}

"update mode" should "work for mapGroupWithState" in {
  val testKey = "mapGroupWithState-update-output-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val mappedValues = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"), (3L, "test30"))
  val query = mappedValues.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[Seq[String]](testKey, (stateSeq) => stateSeq.mkString(","))).start()

  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"), (1L, "test13"), (2L, "test21"))
    }
  }).start()

  query.awaitTermination(30000)

  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 5
  savedValues should contain allOf("test30", "test10,test11", "test20", "test10,test11,test12,test13",
    "test20,test21")
}
- flatMapGroupsWithState
- append mode: supported. Moreover, aggregations are allowed after flatMapGroupsWithState, on one condition: a watermark must be defined before the aggregation that follows the flatMapGroupsWithState transformation.
- complete mode: not supported
- update mode: supported, but aggregations are not allowed after flatMapGroupsWithState
The tests:

private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Iterator[String] = (key, values, state) => {
  val stateNames = state.getOption.getOrElse(Seq.empty)
  //state.setTimeoutDuration(1000L)
  val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
  state.update(stateNewNames)
  Iterator(stateNewNames.mkString(","))
}

"multiple flatMapGroupsWithState" should "fail because they're not all in append mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
    .groupByKey(key => key)
    .flatMapGroupsWithState(outputMode = OutputMode.Update(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[String]) => Iterator(""))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }

  exception.message should include("Multiple flatMapGroupsWithStates are not supported when they are not " +
    "all in append mode or the output mode is not append on a streaming DataFrames/Datasets")
}

"flatMapGroupsWithState" should "fail for complete mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)

  val exception = intercept[IllegalArgumentException] {
    inputStream.toDS().toDF("id", "name")
      .groupByKey(row => row.getAs[Long]("id"))
      .flatMapGroupsWithState(outputMode = OutputMode.Complete(), timeoutConf = NoTimeout())(MappingFunction)
  }

  exception.getMessage should include("The output mode of function should be append or update")
}

"flatMapGroupsWithState" should "fail for append mode in flatMap and update mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[String]()).start()
  }

  exception.message should include("flatMapGroupsWithState in append mode is not supported with Update output " +
    "mode on a streaming DataFrame/Dataset")
}

"flatMapGroupsWithState" should "work for append mode in flatMap and sink" in {
  val testKey = "flatMapGroupsWithState-append-mode"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = flattenResults.writeStream.outputMode("append").foreach(
    new InMemoryStoreWriter[String](testKey, (state) => state)).start()

  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()

  query.awaitTermination(45000)

  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}

"flatMapGroupsWithState" should "fail for append mode in flatMap and complete mode in sink" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
  }

  exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
    "on streaming DataFrames/Datasets")
}

"flatMapGroupsWithState with aggregation after mapping" should "fail for append because of missing watermark" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
    .agg(count("*").as("count"))
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }

  exception.getMessage() should include("Append output mode not supported when there are streaming " +
    "aggregations on streaming DataFrames/DataSets without watermark")
}

"flatMapGroupsWithState with aggregation after mapping" should "succeed for append when watermark is defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    .groupByKey(row => row.getAs[Timestamp]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .withWatermark("created", "1 second")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))

  flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
}

"flatMapGroupsWithState with aggregation after mapping" should "fail for append when watermark is not defined" in {
  val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
  val now = 5000
  val flattenResults = inputStream.toDS().toDF("created", "id", "name")
    .groupByKey(row => row.getAs[Timestamp]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Append(),
      timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
    .toDF("created", "name")
    .groupBy("created")
    .agg(count("*"))
  inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
    (new Timestamp(now), 2L, "test20"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
  }

  exception.getMessage() should include("Append output mode not supported when there are streaming aggregations on " +
    "streaming DataFrames/DataSets without watermark")
}

"flatMapGroupsWithState with aggregation after" should "fail for update mode" in {
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(), timeoutConf = NoTimeout())(MappingFunction)
    .agg(count("*"))
  inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))

  val exception = intercept[AnalysisException] {
    flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[Row]()).start()
  }

  exception.message should include("flatMapGroupsWithState in update mode is not supported with " +
    "aggregation on a streaming DataFrame/Dataset")
}

"flatMapGroupsWithState without aggregation" should "be correctly executed in update mode" in {
  val testKey = "flatMapGroupsWithState-no-aggregation"
  val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
  val flattenResults = inputStream.toDS().toDF("id", "name")
    .groupByKey(row => row.getAs[Long]("id"))
    .flatMapGroupsWithState(outputMode = OutputMode.Update(), timeoutConf = NoTimeout())(MappingFunction)
  inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
  val query = flattenResults.writeStream.outputMode("update")
    .foreach(new InMemoryStoreWriter[String](testKey, (state) => state)).start()

  new Thread(new Runnable() {
    override def run(): Unit = {
      while (!query.isActive) {}
      Thread.sleep(5000)
      inputStream.addData((1L, "test12"))
      inputStream.addData((1L, "test13"))
    }
  }).start()

  query.awaitTermination(40000)

  val savedValues = InMemoryKeyedStore.getValues(testKey)
  savedValues should have size 3
  savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
}
- remaining processing types
- append mode: supported
- complete mode: not supported - it would be very hard to keep all the non-aggregated data in the result table
- update mode: supported
The tests:"append" should "work for map transform" in { val testKey = "other-processing-append-output-mode" val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val mappedResult = inputStream.toDS().toDF("id", "name") .map(row => row.getAs[String]("name")) inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20")) val query = mappedResult.writeStream.outputMode("append") .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start() new Thread(new Runnable() { override def run(): Unit = { while (!query.isActive) {} Thread.sleep(5000) inputStream.addData((1L, "test12")) inputStream.addData((1L, "test13")) } }).start() query.awaitTermination(30000) val processedValues = InMemoryKeyedStore.getValues(testKey) processedValues should have size 5 processedValues should contain allOf("test10", "test11", "test12", "test13", "test20") } "update" should "work for map transform" in { val testKey = "other-processing-update-output-mode" val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val mappedResult = inputStream.toDS().toDF("id", "name") .map(row => row.getAs[String]("name")) inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20")) val query = mappedResult.writeStream.outputMode("update") .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start() new Thread(new Runnable() { override def run(): Unit = { while (!query.isActive) {} Thread.sleep(5000) inputStream.addData((1L, "test12")) inputStream.addData((1L, "test13")) } }).start() query.awaitTermination(30000) val processedValues = InMemoryKeyedStore.getValues(testKey) processedValues should have size 5 processedValues should contain allOf("test10", "test11", "test12", "test13", "test20") } "complete mode" should "not be accepted in mapping transform" in { val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext) val mappedResults = inputStream.toDS().toDF("id", "name") .map(row => row.getAs[String]("name")) inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3")) val exception = intercept[AnalysisException] { mappedResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start() } exception.message should include("Complete output mode not supported when there are no streaming aggregations " + "on streaming DataFrames/Datasets") }
The output mode in Apache Spark Structured Streaming determines how the output is generated. Among the 3 available strategies, one always returns the complete result, while the 2 others either append the results that are not supposed to receive new data anymore or update the already computed results. All of these behaviors were shown in the tests defined in the 3rd section.