What's new in Apache Spark 3.2.0 - Structured Streaming

Versions: Apache Spark 3.2.0

After previous blog posts focusing on 2 specific Structured Streaming features, it's time to complete them with a list of other changes made in the 3.2.0 version!

Apache Kafka

Let's begin with some exciting news about Apache Kafka. Among new features you can now define the start and end timestamps for all partitions of a topic. This new setting can be useful for data reprocessing scenarios when often the backfilling should start at the same time for all partitions. The new options are called startingTimestamp and endingTimestamp.

The author of the previous change, Jungtaek Lim is also the author of the second feature tending to fix the behavior on mismatched offsets for start offset timestamp. Previously, when Apache Spark didn't find the record matching the user-defined starting offset, the job failed. Starting from 3.2.0 it's possible to configure this behavior and use the latest available offset instead of stopping the execution. The error remains the default behavior, though.

The 3rd Apache Kafka feature was added by Satish Gopalani and the goal was to skip empty batches execution with 2 new configuration options: minOffsetsPerTrigger and maxTriggerDelay options. When both are set, Apache Spark will skip the micro-batch execution for at most maxTriggerDelay if the number of records to process is lower than the minOffsetsPertrigger.

Finally, several news on the monitoring side. To start, Liang-Chi Hsieh added some extra metrics to help tracking the data loss. The first of them is called offsetOutOfRange and tracks the number of offsets out of range. The second one is dataLoss and it stores the number of data loss events. To complete the work, Yijia Cui added the information about the processing latency (minOffsetsBehindLatest, maxOffsetsBehindLatest, avgOffsetsBehindLatest)

You can find all these features in the demo just below:

Initial state in arbitrary stateful processing

The first non-Kafka related feature from the list concerns the arbitrary stateful processing and was brought to the community by Rahul Shivu Mahadev. Thanks to this shiny new feature it is now possible to define an initial state that will be loaded by Apache Spark while running the first micro-batch. The check is made here, while planning the physical execution:

class IncrementalExecution
// ...
      case m: FlatMapGroupsWithStateExec =>
        // We set this to true only for the first batch of the streaming query.
        val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
        m.copy(
          stateInfo = Some(nextStatefulOperationStateInfo),
          batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
          eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
          hasInitialState = hasInitialState
        )

Under-the-hood the operation uses the StateStoreAwareZipPartitionsRDD which joins the partitions of two composing RDDs. Inside, the join consists on using CoGroupedIterator to return the mixed (initial state + input row) or separate rows (initial state OR input row). That's the reason why you will see your stateful function called even for the rows without the input data. The iteration also contains a guard against duplicate keys in the default

case class FlatMapGroupsWithStateExec
// ...
new CoGroupedIterator(
          groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
        case (keyRow, valueRowIter, initialStateRowIter) =>
          val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
          var foundInitialStateForKey = false
          initialStateRowIter.foreach { initialStateRow =>
            if (foundInitialStateForKey) {
              FlatMapGroupsWithStateExec.foundDuplicateInitialKeyException()
            }
            foundInitialStateForKey = true
            val initStateObj = getStateObj.get(initialStateRow)
            stateManager.putState(store, keyUnsafeRow, initStateObj, NO_TIMESTAMP)
          }
          // We apply the values for the key after applying the initial state.
          callFunctionAndUpdateState(
            stateManager.getState(store, keyUnsafeRow),
              valueRowIter,
              hasTimedOut = false
          )
      }
  def foundDuplicateInitialKeyException(): Exception = {
    throw new IllegalArgumentException("The initial state provided contained " +
      "multiple rows(state) with the same key. Make sure to de-duplicate the " +
      "initial state before passing it.")
  }

As you can notice, there is a check to protect against duplicate keys in the initial state dataset. Below you can find an example of initializing the state:

  val defaultState = Seq(
    ("user1", 10), ("user2", 20), ("user3", 30)
  ).toDF("login", "points").as[(String, Int)].groupByKey(row => row._1).mapValues(_._2)

  val inputStream = new MemoryStream[(String, Int)](1, sparkSession.sqlContext)
  inputStream.addData(("user1", 5))
  inputStream.addData(("user4", 2))

  val statefulAggregation = inputStream.toDS().toDF("login", "points")
    .groupByKey(row => row.getAs[String]("login"))
    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout(), defaultState)(StatefulMapper.apply)

Misc

Among other changes you will find in Structured Streaming:

With this blog post I end the exploration of Apache Spark Structured Streaming 3.2.0 features. Next week you'll read other exciting features in the most recent version of the framework!