Output modes in Apache Spark Structured Streaming

on waitingforcode.com

Structured Streaming introduced a lot of new concepts compared with DStream-based streaming. One of them is the output mode.

This post presents the output modes introduced in Spark 2.0.0 to deal with streaming data output. The first section describes them in theory. The second section presents their API. The last one shows how they work through some learning tests.

Output modes definition

The output mode specifies which part of the result table is written to the output sink. Among the available output modes we can distinguish:

  • append - only new rows are written to the output sink. This mode suits processing without any aggregation, where the results are immutable. It can also be used with aggregations, provided that a watermark is defined.
    An important point to notice is the relationship with the watermark. If the watermark is defined in the aggregation, it controls when the appended data is emitted to the result table. It happens only after the finalization of the "intermediate" state (= the state that can still change during the processing), that is, once the watermark moves past the most recent entry of a given group. For instance, let's suppose that the watermark is set to 19:00:00 and the most recent value for the records of group "A" comes from 19:00:05. When the watermark moves to 19:00:06, the results of group "A" are emitted to the result table. The engine considers that no newer events will be delivered for this group and that it's safe to send the result to the output table.
  • complete - all the rows are written to the output sink every time the stream has some updates. This mode is exclusively reserved for processing with aggregations.
  • update - similar to complete, except that only the updated rows are written to the output sink.
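To make the difference between the three modes concrete, here is a toy model in plain Scala. It is not Spark code: `ToyMode`, `emit` and the parameter names are hypothetical, and the sets of changed and finalized keys are simply passed in rather than computed by an engine. It only illustrates which rows of a running count would reach the sink in each mode.

```scala
// Toy model of the three output modes for a running count per key.
// NOT Spark code: all names here are hypothetical and only illustrate the idea.
object OutputModesToyModel {
  sealed trait ToyMode
  case object Append extends ToyMode
  case object Update extends ToyMode
  case object Complete extends ToyMode

  /** Rows sent to the sink after one micro-batch.
    *
    * @param state     counts per key after applying the batch (the "result table")
    * @param changed   keys touched by the current batch
    * @param finalized keys that can no longer change (for instance, below the watermark)
    */
  def emit(mode: ToyMode,
           state: Map[String, Int],
           changed: Set[String],
           finalized: Set[String]): Map[String, Int] =
    mode match {
      // complete: the whole result table is written every time
      case Complete => state
      // update: only the rows modified by this batch are written
      case Update => state.filter { case (key, _) => changed.contains(key) }
      // append: only the rows that will never change again are written
      case Append => state.filter { case (key, _) => finalized.contains(key) }
    }

  def main(args: Array[String]): Unit = {
    val state = Map("A" -> 3, "B" -> 1)
    Seq(Complete, Update, Append).foreach { mode =>
      println(s"$mode -> ${emit(mode, state, changed = Set("A"), finalized = Set("B"))}")
    }
  }
}
```

With the state above, where only "A" changed in the batch and only "B" is final, complete returns both rows, update returns the row for "A" and append returns the row for "B".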

Output modes API

The output mode is defined with the DataStreamWriter#outputMode(outputMode: String) method. The passed name is later translated to the corresponding case object from the InternalOutputModes object.
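The translation from a name to a case object can be pictured with the standalone sketch below. It does not use Spark classes: `Mode` and `resolve` are hypothetical stand-ins, and the case-insensitive matching and the exception type are assumptions about the behavior rather than a copy of Spark's code. Note that DataStreamWriter also accepts an OutputMode instance directly, such as the OutputMode.Append() used in the flatMapGroupsWithState tests later in this post.

```scala
import java.util.Locale

// Hypothetical stand-in for the name-to-object translation.
// Spark's real case objects live in InternalOutputModes; these are not them.
object OutputModeNames {
  sealed trait Mode
  case object Append extends Mode
  case object Complete extends Mode
  case object Update extends Mode

  // Assumption: the name is matched case-insensitively and an unknown
  // name is rejected with an IllegalArgumentException.
  def resolve(name: String): Mode = name.toLowerCase(Locale.ROOT) match {
    case "append" => Append
    case "complete" => Complete
    case "update" => Update
    case _ => throw new IllegalArgumentException(s"Unknown output mode $name")
  }
}
```

A call such as `OutputModeNames.resolve("update")` returns the `Update` object, while an unsupported name fails immediately instead of surfacing later during the query execution.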

The resolved instance is used mainly in the DataStreamWriter class. It's passed from there to the StreamingQueryManager#startQuery(userSpecifiedName: Option[String], userSpecifiedCheckpointLocation: Option[String], df: DataFrame, sink: Sink, outputMode: OutputMode, useTempCheckpointLocation: Boolean = false, recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()). As the name of this method indicates, it starts the streaming query execution.

On the physical execution side we can find traces of the output modes in StateStoreSaveExec. This is where the intermediate stateful results are stored. We can also find there a lot of references to the watermark, which helps to remove results that are too old. If you want to learn more about it, please go to the post about StateStore in Apache Spark Structured Streaming.
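The role of the watermark in finalizing and cleaning the state can be sketched in plain Scala. The code below is an illustration under stated assumptions, not what StateStoreSaveExec does internally: `WatermarkSketch` and its methods are hypothetical, the watermark is taken as the latest event time minus the allowed delay, and a group is treated as final once the watermark is strictly after its most recent event time, following the 19:00:05 / 19:00:06 example from the first section. The exact boundary condition in Spark may differ.

```scala
import java.time.LocalTime

// Illustration only: hypothetical helpers, not Spark internals.
object WatermarkSketch {
  // Assumption: watermark = latest event time seen so far minus the allowed delay.
  def watermark(maxEventTime: LocalTime, delaySeconds: Long): LocalTime =
    maxEventTime.minusSeconds(delaySeconds)

  // Assumption: a group is final once the watermark is strictly after
  // the most recent event time recorded for that group.
  def isFinal(groupEventTime: LocalTime, currentWatermark: LocalTime): Boolean =
    currentWatermark.isAfter(groupEventTime)

  // Splits the state (group -> most recent event time) into
  // (finalized groups, groups that can still change).
  def split(state: Map[String, LocalTime],
            currentWatermark: LocalTime): (Map[String, LocalTime], Map[String, LocalTime]) =
    state.partition { case (_, eventTime) => isFinal(eventTime, currentWatermark) }

  def main(args: Array[String]): Unit = {
    val state = Map("A" -> LocalTime.parse("19:00:05"), "B" -> LocalTime.parse("19:00:30"))
    val (finalized, open) = split(state, LocalTime.parse("19:00:06"))
    println(s"finalized=${finalized.keySet}, still open=${open.keySet}")
  }
}
```

In append mode the finalized groups are the ones that can be emitted, and in both append and update modes they are the ones whose state can be dropped.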

Output modes examples

The list below summarizes which modes can be used for given types of processing. Each item is followed by tests showing the supported and unsupported cases:

  • aggregation with watermark
    • append mode: supported - but the results are emitted only after crossing the watermark (= output finalization). The query won't work if the aggregation is not applied on the watermark's column.
    • complete mode: supported - unlike in update mode, the watermark is not used to drop old aggregates
    • update mode: supported - the watermark is used to drop aggregates that are too old

    The tests:
          "the count on watermark column" should "be correctly computed in append mode" in {
            val testKey = "append-output-mode-with-watermark-aggregation"
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val now = 5000L
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("created")
              .count()
    
            val query = aggregatedStream.writeStream.outputMode("append")
              .foreach(new InMemoryStoreWriter[Row](testKey,
                (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}"))
              .start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                // send the first batch - max time = 10 seconds, so the watermark will be 9 seconds
                inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+4000), 3),
                  (new Timestamp(now+5000), 4))
                while (!query.isActive) {}
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(4000L), 5), (new Timestamp(now+4000), 6))
                inputStream.addData((new Timestamp(11000), 7))
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(11000), 8))
              }
            }).start()
    
            query.awaitTermination(45000)
    
            val readValues = InMemoryKeyedStore.getValues(testKey)
            // As you can see, only the results for now+4000 and now+5000 were emitted. It's because the append
            // output mode emits the results once a new watermark is defined and the accumulated results are below
            // the new threshold
            readValues should have size 2
            readValues should contain allOf("1970-01-01 01:00:09.0 -> 2", "1970-01-01 01:00:10.0 -> 3")
          }
    
          "the count on non-watermark column" should "fail in append mode" in {
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("number")
              .count()
    
            val exception = intercept[AnalysisException]{
              aggregatedStream.writeStream.outputMode("append")
                .foreach(new NoopForeachWriter[Row]()).start()
            }
    
            exception.message should include("Append output mode not supported when there are streaming aggregations on " +
              "streaming DataFrames/DataSets without watermark")
          }
    
          "the count on watermark column" should "be correctly computed in update mode" in {
            val testKey = "update-output-mode-with-watermark-aggregation"
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val now = 5000L
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("created")
              .count()
    
            val query = aggregatedStream.writeStream.outputMode("update")
              .foreach(new InMemoryStoreWriter[Row](testKey,
                (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
                  (new Timestamp(now+5000), 4))
                while (!query.isActive) {}
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(4000L), 5))
                inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
              }
            }).start()
    
            query.awaitTermination(45000)
    
            val readValues = InMemoryKeyedStore.getValues(testKey)
            readValues should have size 2
            readValues should contain allOf("1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1")
          }
    
          "the count on non-watermark column" should "be correctly computed in update mode" in {
            val testKey = "update-output-mode-without-watermark-aggregation"
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val now = 5000L
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("number")
              .count()
    
            val query = aggregatedStream.writeStream.outputMode("update")
              .foreach(new InMemoryStoreWriter[Row](testKey,
                (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
                  (new Timestamp(now+5000), 3))
                while (!query.isActive) {}
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(4000L), 6))
                inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
              }
            }).start()
    
            query.awaitTermination(45000)
    
            val readValues = InMemoryKeyedStore.getValues(testKey)
            readValues should have size 5
            readValues should contain allOf("1 -> 1", "3 -> 2", "2 -> 1", "6 -> 2", "7 -> 1")
          }
    
    
          "the count on watermark column" should "be correctly computed in complete mode" in {
            val testKey = "complete-output-mode-with-watermark-aggregation"
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val now = 5000L
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("created")
              .count()
    
            val query = aggregatedStream.writeStream.outputMode("complete")
              .foreach(new InMemoryStoreWriter[Row](testKey,
                (processedRow) => s"${processedRow.getAs[Long]("created")} -> ${processedRow.getAs[Long]("count")}")).start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
                  (new Timestamp(now+5000), 4))
                while (!query.isActive) {}
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(4000L), 5))
                inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
              }
            }).start()
    
            query.awaitTermination(45000)
    
            val readValues = InMemoryKeyedStore.getValues(testKey)
            println(s"${readValues}")
            readValues should have size 5
            readValues.sorted should equal (Seq("1970-01-01 01:00:04.0 -> 1", "1970-01-01 01:00:05.0 -> 1",
              "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:10.0 -> 4", "1970-01-01 01:00:11.0 -> 1"))
          }
    
          "the count on non-watermark column" should "be correctly computed in complete mode" in {
            val testKey = "complete-output-mode-without-watermark-aggregation"
            val inputStream = new MemoryStream[(Timestamp, Int)](1, sparkSession.sqlContext)
            val now = 5000L
            val aggregatedStream = inputStream.toDS().toDF("created", "number")
              .withWatermark("created", "1 second")
              .groupBy("number")
              .count()
    
            val query = aggregatedStream.writeStream.outputMode("complete")
              .foreach(new InMemoryStoreWriter[Row](testKey,
                (processedRow) => s"${processedRow.getAs[Long]("number")} -> ${processedRow.getAs[Long]("count")}")).start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                inputStream.addData((new Timestamp(now+5000), 1), (new Timestamp(now+5000), 2), (new Timestamp(now+5000), 3),
                  (new Timestamp(now+5000), 4))
                while (!query.isActive) {}
                Thread.sleep(10000)
                inputStream.addData((new Timestamp(4000L), 5))
                inputStream.addData((new Timestamp(now), 6), (new Timestamp(11000), 7))
              }
            }).start()
    
            query.awaitTermination(45000)
    
            val readValues = InMemoryKeyedStore.getValues(testKey)
            println(s"${readValues}")
            readValues should have size 11
            readValues.sorted should equal (Seq("1 -> 1", "1 -> 1", "2 -> 1", "2 -> 1",
              "3 -> 1", "3 -> 1", "4 -> 1", "4 -> 1", "5 -> 1", "6 -> 1", "7 -> 1"))
          }
        
  • remaining aggregations (no watermark)
    • append mode: not supported - the aggregates can be updated at any moment, which violates the principle of this mode
    • complete mode: supported
    • update mode: supported - but unlike in the previous case, the old aggregates are not cleaned. The engine doesn't know when the aggregates are too old.

    The tests:
        "append mode with aggregation" should "be disallowed in structured streaming without watermark" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val counter =inputStream.toDS().toDF("id", "name").agg(count("*"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            counter.writeStream.outputMode("append").foreach(new NoopForeachWriter()).start()
          }
          exception.message should include ("Append output mode not supported when there are streaming " +
            "aggregations on streaming DataFrames/DataSets without watermark")
        }
    
        "complete mode with aggregation" should "be allowed in structured streaming without watermark" in {
          val testKey = "no-aggregation-complete-output-mode"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val counter =inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val query = counter.writeStream.outputMode("complete").foreach(new ForeachWriter[Row] {
            override def open(partitionId: Long, version: Long): Boolean = true
    
            override def process(row: Row): Unit = {
              println(s"processing ${row}")
              InMemoryKeyedStore.addValue(testKey, s"${row.getAs[String]("name")}=${row.getAs[Long]("count(1)")}")
            }
    
            override def close(errorOrNull: Throwable): Unit = {}
          }).start()
    
          new Thread(new Runnable() {
            override def run(): Unit = {
              Thread.sleep(10000)
              // Add only 1  row to show that all entries are sent to the result table every time
              inputStream.addData((1L, "test1"))
            }
          }).start()
    
          query.awaitTermination(30000)
    
          val processedRows = InMemoryKeyedStore.getValues(testKey)
          processedRows should have size 6
          val rowsTest1 = processedRows.filter(_.startsWith("test1="))
          rowsTest1 should have size 2
          rowsTest1 should contain allOf("test1=3", "test1=4")
          val rowsTest2 = processedRows.filter(_.startsWith("test2="))
          rowsTest2 should have size 2
          rowsTest2 should contain only("test2=3")
          val rowsTest3 = processedRows.filter(_.startsWith("test3="))
          rowsTest3 should have size 2
          rowsTest3 should contain only("test3=3")
        }
    
        "update mode with aggregation" should "be allowed in structured streaming without watermark" in {
          val testKey = "no-aggregation-update-output-mode"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val counter =inputStream.toDS().toDF("id", "name").groupBy("name").agg(count("*"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val query = counter.writeStream.outputMode("update").foreach(new InMemoryStoreWriter[Row](testKey,
            (processedRow) => s"${processedRow.getAs[String]("name")} -> ${processedRow.getAs[Long]("count(1)")}")).start()
    
          new Thread(new Runnable() {
            override def run(): Unit = {
              Thread.sleep(10000)
              // Add only 1  row to show that only updated row is sent to the result table
              inputStream.addData((1L, "test1"))
            }
          }).start()
    
          query.awaitTermination(30000)
    
          val processedRows = InMemoryKeyedStore.getValues(testKey)
          processedRows should have size 4
          processedRows should contain allOf("test1 -> 3", "test1 -> 4", "test2 -> 3", "test3 -> 3")
        }
        
  • mapGroupsWithState processing
    • append mode: not supported
    • complete mode: not supported
    • update mode: supported

    The tests:
          private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Seq[String] = (key, values, state) => {
            val stateNames = state.getOption.getOrElse(Seq.empty)
            val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
            state.update(stateNewNames)
            stateNewNames
          }
    
          "append mode" should "not work for mapGroupsWithState" in {
            val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
            val mappedValues =inputStream.toDS().toDF("id", "name")
              .groupByKey(row => row.getAs[Long]("id"))
              .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
            inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3") )
    
            val exception = intercept[AnalysisException] {
              mappedValues.writeStream.outputMode("append").foreach(new NoopForeachWriter[Seq[String]]()).start()
            }
    
            exception.message should include("mapGroupsWithState is not supported with Append output mode on a " +
              "streaming DataFrame/Dataset")
          }
    
          "complete mode" should "not work for mapGroupsWithState" in {
            val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
            val mappedValues =inputStream.toDS().toDF("id", "name")
              .groupByKey(row => row.getAs[Long]("id"))
              .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
            inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3") )
    
            val exception = intercept[AnalysisException] {
              mappedValues.writeStream.outputMode("complete").foreach(new NoopForeachWriter[Seq[String]]()).start()
            }
    
            exception.message should include("Complete output mode not supported when there are no streaming " +
              "aggregations on streaming DataFrames/Datasets")
          }
    
          "update mode" should "work for mapGroupsWithState" in {
            val testKey = "mapGroupWithState-update-output-mode"
            val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
            val mappedValues =inputStream.toDS().toDF("id", "name")
              .groupByKey(row => row.getAs[Long]("id"))
              .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(MappingFunction)
            inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"), (3L, "test30"))
    
            val query = mappedValues.writeStream.outputMode("update")
              .foreach(new InMemoryStoreWriter[Seq[String]](testKey, (stateSeq) => stateSeq.mkString(","))).start()
    
            new Thread(new Runnable() {
              override def run(): Unit = {
                while (!query.isActive) {}
                Thread.sleep(5000)
                inputStream.addData((1L, "test12"), (1L, "test13"), (2L, "test21"))
              }
            }).start()
    
            query.awaitTermination(30000)
    
            val savedValues = InMemoryKeyedStore.getValues(testKey)
            savedValues should have size 5
            savedValues should contain allOf("test30", "test10,test11", "test20", "test10,test11,test12,test13",
              "test20,test21")
          }
        
  • flatMapGroupsWithState
    • append mode: supported. Moreover, aggregations are allowed after flatMapGroupsWithState, on one condition: a watermark must be defined before the aggregation that follows the flatMapGroupsWithState transformation.
    • complete mode: not supported
    • update mode: supported, but aggregations are not allowed after flatMapGroupsWithState

    The tests:
        private val MappingFunction: (Long, Iterator[Row], GroupState[Seq[String]]) => Iterator[String] = (key, values, state) => {
          val stateNames = state.getOption.getOrElse(Seq.empty)
          //state.setTimeoutDuration(1000L)
          val stateNewNames = stateNames ++ values.map(row => row.getAs[String]("name"))
          state.update(stateNewNames)
          Iterator(stateNewNames.mkString(","))
        }
    
        "multiple flatMapGroupsWithState" should "fail because they're not all in append mode" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(), timeoutConf = NoTimeout())(MappingFunction)
            .groupByKey(key => key)
            .flatMapGroupsWithState(outputMode = OutputMode.Update(),
              timeoutConf = NoTimeout())((key, values, state: GroupState[String]) => Iterator(""))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
          }
    
          exception.message should include("Multiple flatMapGroupsWithStates are not supported when they are not " +
            "all in append mode or the output mode is not append on a streaming DataFrames/Datasets")
        }
    
        "flatMapGroupsWithState" should "fail for complete mode" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
    
          val exception = intercept[IllegalArgumentException] {
            inputStream.toDS().toDF("id", "name")
              .groupByKey(row => row.getAs[Long]("id"))
              .flatMapGroupsWithState(outputMode = OutputMode.Complete(),
                timeoutConf = NoTimeout())(MappingFunction)
          }
    
          exception.getMessage should include("The output mode of function should be append or update")
        }
    
        "flatMapGroupsWithState" should "fail for append mode in flatMap and update mode in sink" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())(MappingFunction)
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[String]()).start()
          }
    
          exception.message should include("flatMapGroupsWithState in append mode is not supported with Update output " +
            "mode on a streaming DataFrame/Dataset")
        }
    
        "flatMapGroupsWithState" should "work for append mode in flatMap and sink" in {
          val testKey = "flatMapGroupsWithState-append-mode"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())(MappingFunction)
          inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
    
          val query = flattenResults.writeStream.outputMode("append").foreach(
            new InMemoryStoreWriter[String](testKey, (state) => state)).start()
          new Thread(new Runnable() {
            override def run(): Unit = {
              while (!query.isActive) {}
              Thread.sleep(5000)
              inputStream.addData((1L, "test12"))
              inputStream.addData((1L, "test13"))
            }
          }).start()
    
          query.awaitTermination(45000)
          val savedValues = InMemoryKeyedStore.getValues(testKey)
          savedValues should have size 3
          savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
        }
    
        "flatMapGroupsWithState" should "fail for append mode in flatMap and complete mode in sink" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())(MappingFunction)
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
          }
    
          exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
            "on streaming DataFrames/Datasets")
        }
    
        "flatMapGroupsWithState with aggregation after mapping" should "fail for append because of missing watermark" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())(MappingFunction)
            .agg(count("*").as("count"))
          inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
          }
    
          exception.getMessage() should include("Append output mode not supported when there are streaming " +
            "aggregations on streaming DataFrames/DataSets without watermark")
        }
    
        "flatMapGroupsWithState with aggregation after mapping" should "succeed for append when watermark is defined" in {
          val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
          val now = 5000
          val flattenResults = inputStream.toDS().toDF("created", "id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
            .toDF("created", "name")
            .withWatermark("created", "1 second")
            .groupBy("created")
            .agg(count("*"))
          inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
            (new Timestamp(now), 2L, "test20"))
    
          flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
        }
    
        "flatMapGroupsWithState with aggregation after mapping" should "fail for append when watermark is not defined" in {
          val inputStream = new MemoryStream[(Timestamp, Long, String)](1, sparkSession.sqlContext)
          val now = 5000
          val flattenResults = inputStream.toDS().toDF("created", "id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Append(),
              timeoutConf = NoTimeout())((key, values, state: GroupState[(Timestamp, String)]) => Iterator((new Timestamp(1000), "")))
            .toDF("created", "name")
            .groupBy("created")
            .agg(count("*"))
          inputStream.addData((new Timestamp(now), 1L, "test10"), (new Timestamp(now), 1L, "test11"),
            (new Timestamp(now), 2L, "test20"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("append").foreach(new NoopForeachWriter[Row]).start()
          }
    
          exception.getMessage() should include("Append output mode not supported when there are streaming aggregations on " +
            "streaming DataFrames/DataSets without watermark")
        }
    
        "flatMapGroupsWithState with aggregation after" should "fail for update mode" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Update(),
            timeoutConf = NoTimeout())(MappingFunction)
            .agg(count("*"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            flattenResults.writeStream.outputMode("update").foreach(new NoopForeachWriter[Row]()).start()
          }
    
          exception.message should include("flatMapGroupsWithState in update mode is not supported with " +
            "aggregation on a streaming DataFrame/Dataset")
        }
    
        "flatMapGroupsWithState without aggregation" should "be correctly executed in update mode" in {
          val testKey = "flatMapGroupsWithState-no-aggregation"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val flattenResults = inputStream.toDS().toDF("id", "name")
            .groupByKey(row => row.getAs[Long]("id"))
            .flatMapGroupsWithState(outputMode = OutputMode.Update(),
              timeoutConf = NoTimeout())(MappingFunction)
          inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
    
          val query = flattenResults.writeStream.outputMode("update")
            .foreach(new InMemoryStoreWriter[String](testKey, (state) => state)).start()
    
          new Thread(new Runnable() {
            override def run(): Unit = {
              while (!query.isActive) {}
              Thread.sleep(5000)
              inputStream.addData((1L, "test12"))
              inputStream.addData((1L, "test13"))
            }
          }).start()
          query.awaitTermination(40000)
    
          val savedValues = InMemoryKeyedStore.getValues(testKey)
          savedValues should have size 3
          savedValues should contain allOf("test20", "test10,test11", "test10,test11,test12,test13")
        }
        
  • remaining processing types
    • append mode: supported
    • complete mode: not supported - without an aggregation, all the unaggregated data would have to be kept in the result table
    • update mode: supported

    The tests:
        "append" should "work for map transform" in {
          val testKey = "other-processing-append-output-mode"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val mappedResult = inputStream.toDS().toDF("id", "name")
            .map(row => row.getAs[String]("name"))
          inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
    
          val query = mappedResult.writeStream.outputMode("append")
            .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
          new Thread(new Runnable() {
            override def run(): Unit = {
              while (!query.isActive) {}
              Thread.sleep(5000)
              inputStream.addData((1L, "test12"))
              inputStream.addData((1L, "test13"))
            }
          }).start()
          query.awaitTermination(30000)
    
          val processedValues = InMemoryKeyedStore.getValues(testKey)
          processedValues should have size 5
          processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
    
        }
    
        "update" should "work for map transform" in {
          val testKey = "other-processing-update-output-mode"
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val mappedResult = inputStream.toDS().toDF("id", "name")
            .map(row => row.getAs[String]("name"))
          inputStream.addData((1L, "test10"), (1L, "test11"), (2L, "test20"))
    
          val query = mappedResult.writeStream.outputMode("update")
            .foreach(new InMemoryStoreWriter[String](testKey, (mappedValue) => mappedValue)).start()
          new Thread(new Runnable() {
            override def run(): Unit = {
              while (!query.isActive) {}
              Thread.sleep(5000)
              inputStream.addData((1L, "test12"))
              inputStream.addData((1L, "test13"))
            }
          }).start()
          query.awaitTermination(30000)
    
          val processedValues = InMemoryKeyedStore.getValues(testKey)
          processedValues should have size 5
          processedValues should contain allOf("test10", "test11", "test12", "test13", "test20")
        }
    
        "complete mode" should "not be accepted in mapping transform" in {
          val inputStream = new MemoryStream[(Long, String)](1, sparkSession.sqlContext)
          val mappedResults = inputStream.toDS().toDF("id", "name")
            .map(row => row.getAs[String]("name"))
          inputStream.addData((1L, "test1"), (1L, "test2"), (2L, "test3"))
    
          val exception = intercept[AnalysisException] {
            mappedResults.writeStream.outputMode("complete").foreach(new NoopForeachWriter[String]()).start()
          }
    
          exception.message should include("Complete output mode not supported when there are no streaming aggregations " +
            "on streaming DataFrames/Datasets")
        }
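The support rules from the list above can be condensed into a small lookup table. The sketch below is plain Scala with hypothetical names (`OutputModeSupport`, `isSupported` and the processing type labels). It only encodes the summary given in this post and ignores the finer conditions, such as the watermark column requirement for append mode or the restrictions on aggregations after flatMapGroupsWithState.

```scala
// Summary of the list above as a lookup table.
// Hypothetical names; this only restates the post, it does not query Spark.
object OutputModeSupport {
  val supported: Map[String, Set[String]] = Map(
    "aggregation with watermark" -> Set("append", "complete", "update"),
    "aggregation without watermark" -> Set("complete", "update"),
    "mapGroupsWithState" -> Set("update"),
    "flatMapGroupsWithState" -> Set("append", "update"),
    "other processing" -> Set("append", "update")
  )

  // Returns false for unknown processing types as well as unsupported modes.
  def isSupported(processing: String, mode: String): Boolean =
    supported.get(processing).exists(_.contains(mode))

  def main(args: Array[String]): Unit = {
    supported.foreach { case (processing, modes) =>
      println(s"$processing: ${modes.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

Such a table is only a reading aid. In a real query, the check is done by Spark itself when the query starts, and an unsupported combination fails with the AnalysisException messages shown in the tests.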
        

The output modes in Apache Spark determine how the output is generated. Among the 3 different strategies, one always returns the complete result, while the 2 others either append the results that are not supposed to receive new data anymore or update the already computed results. All of these main behaviors were shown in the tests defined in the 3rd section.

Read also about output modes in Apache Spark Structured Streaming here: Output Modes, OutputMode.
