Output modes in Structured Streaming

Versions: Apache Spark 2.4.2

The series of notes I took while preparing for Apache Spark Summit continues. Today it's time to cover output modes, which I also used in the solution I presented for the sessionization problem.

As in the previous posts, I will use the Q&A format and go from general to more specific concepts.


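What are the 3 modes?

The output mode defines what data will be written to the streaming sink. There are 3 output modes: append (only the rows added to the result table since the last trigger are written), update (only the rows updated since the last trigger are written) and complete (the whole result table is written after every trigger).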
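To make it concrete, here is a minimal sketch of where the output mode is declared on a streaming query. It assumes Apache Spark 2.4 on the classpath; the rate source, the console sink and the names OutputModeDemo and startCounts are my illustrative choices, not something taken from the sessionization solution.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object OutputModeDemo {
  // Hypothetical helper: counts the rows per "value" and writes the counts
  // to the console sink using the requested output mode.
  def startCounts(input: DataFrame, mode: OutputMode): StreamingQuery =
    input.groupBy("value").count()
      .writeStream
      .outputMode(mode) // the string form, e.g. outputMode("update"), works as well
      .format("console")
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("output-mode-demo").getOrCreate()
    // The rate source generates (timestamp, value) rows; value is bucketed into 3 keys
    val rates = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("value % 3 AS value")
    val query = startCounts(rates, OutputMode.Update())
    query.awaitTermination(10000L) // let a few micro-batches run
    query.stop()
    spark.stop()
  }
}
```

Replacing OutputMode.Update() with OutputMode.Complete() should print all counters after every trigger instead of only the modified ones. OutputMode.Append() would be rejected for this query because the aggregation has no watermark.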

What is the difference with SaveMode?

My first impression was that the output mode was a streaming version of the batch save modes. But after analyzing both concepts more closely, I saw that I was wrong. The save mode (SaveMode enum) defines how Apache Spark should behave when the sink location already exists. You can tell it to overwrite the old data with the new data, to keep the old data and skip the write, to fail the application, or simply to append the new data to the old.
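The 4 save mode behaviors can be observed with a small batch job. This is only a sketch, assuming Apache Spark 2.4 on the classpath and a writable temporary directory; SaveModeDemo, the JSON format and the 2-row dataset are hypothetical choices made for the illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Try

object SaveModeDemo {
  // Writes the same 2-row dataset with every SaveMode and returns the row
  // count observed after each step, plus whether the last write failed.
  def run(spark: SparkSession): (Seq[Long], Boolean) = {
    import spark.implicits._
    val path = Files.createTempDirectory("save-mode-demo").resolve("out").toString
    val letters = Seq("a", "b").toDF("letter")
    def count(): Long = spark.read.json(path).count()

    letters.write.mode(SaveMode.ErrorIfExists).json(path) // new location: the write succeeds
    val afterFirstWrite = count()
    letters.write.mode(SaveMode.Append).json(path)        // adds new data to the old one
    val afterAppend = count()
    letters.write.mode(SaveMode.Overwrite).json(path)     // replaces the old data
    val afterOverwrite = count()
    letters.write.mode(SaveMode.Ignore).json(path)        // keeps the old data, skips the write
    val afterIgnore = count()
    // The location exists now, so ErrorIfExists makes the write fail
    val failed = Try(letters.write.mode(SaveMode.ErrorIfExists).json(path)).isFailure
    (Seq(afterFirstWrite, afterAppend, afterOverwrite, afterIgnore), failed)
  }
}
```

None of these modes says anything about which rows of a continuously updated result are emitted, and that is exactly the question the output mode answers.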

What is the relation between state and output mode?

The output mode is responsible for figuring out what data should be passed to the sink. But it also has an impact on the state store and will define the rows that will be written to it in every query.

Under the hood, you can find the proof of this behavior in StateStoreSaveExec, which calls StateStoreOps#mapPartitionsWithStateStore with a function (storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]) whose body depends on the output mode:

outputMode match {
  // Update and output all rows in the StateStore.
  case Some(Complete) =>
    // …
    while (iter.hasNext) {
      val row = iter.next().asInstanceOf[UnsafeRow]
      stateManager.put(store, row)
      numUpdatedStateRows += 1
    }
    // …
    stateManager.values(store).map { valueRow =>
      numOutputRows += 1
      valueRow
    }

  // Update and output only rows being evicted from the StateStore
  // Assumption: watermark predicates must be non-empty if append mode is allowed
  case Some(Append) =>
    // …
    val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
    while (filteredIter.hasNext) {
      val row = filteredIter.next().asInstanceOf[UnsafeRow]
      stateManager.put(store, row)
      numUpdatedStateRows += 1
    }
    // ...
    new NextIterator[InternalRow] {
      override protected def getNext(): InternalRow = {
        var removedValueRow: InternalRow = null
        while(rangeIter.hasNext && removedValueRow == null) {
          val rowPair = rangeIter.next()
          if (watermarkPredicateForKeys.get.eval(rowPair.key)) {
            stateManager.remove(store, rowPair.key)
            removedValueRow = rowPair.value
          }
        }
        if (removedValueRow == null) {
          finished = true
          null
        } else {
          removedValueRow
        }
      }
      // ...
    }

  // Update and output modified rows from the StateStore.
  case Some(Update) =>
    new NextIterator[InternalRow] {
      // Filter late date using watermark if specified
      private[this] val baseIterator = watermarkPredicateForData match {
        case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
        case None => iter
      }

      override protected def getNext(): InternalRow = {
        if (baseIterator.hasNext) {
          val row = baseIterator.next().asInstanceOf[UnsafeRow]
          stateManager.put(store, row)
          numOutputRows += 1
          numUpdatedStateRows += 1
          row
        } else {
          finished = true
          null
        }
      }
      // ...
    }
  // ...
}

That's quite a lot of code, but it confirms what I said. Complete mode writes all rows to the state store and returns the whole content of the store. In append mode, the non-expired state is written to the state store, whereas the expired state is removed from it and returned to the sink. Finally, update mode first returns the modified states and later removes the expired ones from the state store. For append mode you can see that "new" applies to the state: a row is considered new only when it has expired, i.e. when it's not supposed to change anymore.

However, StateStoreSaveExec is not the node called for the *withState methods; it works only for streaming aggregations. The node responsible for the physical execution of a *withState operation is FlatMapGroupsWithStateExec. In that case, the function updating the state store takes all group states, including the expired ones:
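The 3 branches can be condensed into a toy model that runs without Spark. It is only a sketch: the mutable Map stands for the state store, Row for an already aggregated window, and "expired" is assumed to mean a window end at or below the watermark. All the names (OutputModeModel, runBatch, Row) are hypothetical.

```scala
import scala.collection.mutable

object OutputModeModel {
  // Toy aggregate: a window identified by its end time, with a running count
  final case class Row(windowEnd: Long, count: Long)

  sealed trait Mode
  case object Complete extends Mode
  case object Append extends Mode
  case object Update extends Mode

  // Applies one micro-batch to the store and returns the rows sent to the sink
  def runBatch(store: mutable.Map[Long, Row], batch: Seq[Row],
               watermark: Long, mode: Mode): Seq[Row] = {
    def expired(row: Row): Boolean = row.windowEnd <= watermark
    mode match {
      case Complete =>
        // write everything, output the whole store
        batch.foreach(row => store.put(row.windowEnd, row))
        store.values.toList.sortBy(_.windowEnd)
      case Append =>
        // write the non-expired rows, then evict and output the expired state
        batch.filterNot(expired).foreach(row => store.put(row.windowEnd, row))
        val evicted = store.values.toList.filter(expired).sortBy(_.windowEnd)
        evicted.foreach(row => store.remove(row.windowEnd))
        evicted
      case Update =>
        // write and output the non-late rows, then silently drop the expired state
        val fresh = batch.filterNot(expired)
        fresh.foreach(row => store.put(row.windowEnd, row))
        store.values.toList.filter(expired).foreach(row => store.remove(row.windowEnd))
        fresh
    }
  }
}
```

With a first batch Seq(Row(10, 1), Row(20, 2)) at watermark 0 and a second batch Seq(Row(20, 3)) at watermark 10, the model outputs nothing and then Row(10, 1) in append mode, both rows and then Row(20, 3) in update mode, and the whole store both times in complete mode.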

        // Generate a iterator that returns the rows grouped by the grouping function
        // Note that this code ensures that the filtering for timeout occurs only after
        // all the data has been processed. This is to ensure that the timeout information of all
        // the keys with data is updated before they are processed for timeouts.
        val outputIterator =
          processor.processNewData(filteredIter) ++ processor.processTimedOutState()

As a matter of fact, the output mode is completely ignored for *withState transformations, with one exception: it is used to check the query semantics. For example, mapGroupsWithState can only be used in update mode. Otherwise, Apache Spark returns errors like:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

That constraint is enforced inside the FlatMapGroupsWithState operator, which is used for both the flat-map and map transformations:

// for mapping
if (isMapGroupsWithState) {
  assert(outputMode == OutputMode.Update)
}

// for flat mapping
if (outputMode != OutputMode.Append && outputMode != OutputMode.Update) {
  throw new IllegalArgumentException("The output mode of function should be append or update")
}

Why does only update work with mapGroupsWithState?

So why only update? I think it's related to the semantics of mapGroupsWithState. The function is invoked only for the expiring states or for the states with new input data in the current query. Because of that, only the updated state is processed: you won't apply your mapping function to the whole state store.

Initially I thought that the output mode was strictly related to the sink. That's true, but it's only part of the picture. The output mode also applies to the state store: as the StateStoreSaveExec code shows, it drives the behavior of the physical node responsible for streaming aggregations.
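This reasoning can be illustrated with a Spark-free sketch. It assumes a per-key counter as the state and a set of keys that time out in the current batch; MapGroupsModel and runBatch are hypothetical names, and the real operator obviously does much more.

```scala
object MapGroupsModel {
  // Returns (state after the batch, rows emitted by the batch)
  def runBatch(
      state: Map[String, Int],
      newData: Map[String, Seq[String]],
      timedOut: Set[String]): (Map[String, Int], Map[String, Int]) = {
    // 1. Keys with new data: update the counter and emit the new value
    val updated = newData.map { case (key, events) =>
      key -> (state.getOrElse(key, 0) + events.size)
    }
    // 2. Timed-out keys without new data: emit the last value and drop the state
    val expired = (timedOut -- newData.keySet)
      .flatMap(key => state.get(key).map(value => key -> value)).toMap
    // Keys that are neither updated nor expired are never visited
    val nextState = (state ++ updated) -- expired.keySet
    (nextState, updated ++ expired)
  }
}
```

For a state Map("a" -> 1, "b" -> 2, "c" -> 3), new data for "a" only and a timeout on "b", the output contains "a" and "b" but never "c". A complete-like output would need a pass over every key, which this invocation model doesn't provide.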

