What's new in Apache Spark 3.3.0 - Structured Streaming

Versions: Apache Spark 3.3.0

Even though Project Lightspeed is not there yet, Apache Spark Structured Streaming 3.3.0 brings several interesting features that should make your daily life easier.

Trigger.Once in multiple batches

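The first important feature is a new trigger called AvailableNow. It sounds mysterious, but a comparison with the Once trigger makes the purpose clear. Both process the data available when the query starts and then stop. The difference is that Trigger.Once does it in a single micro-batch and ignores the rate limits, whereas Trigger.AvailableNow splits the work into multiple micro-batches that respect them.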

How does it work? Besides the new trigger, Apache Spark 3.3.0 has a new interface called SupportsTriggerAvailableNow. Any source supporting the trigger must implement it to return the data available for processing within the trigger scope. Apache Kafka returns the last available offset:

private[kafka010] class KafkaMicroBatchStream
// ...
override def prepareForTriggerAvailableNow(): Unit = {
  allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
    Some(getOrCreateInitialPartitionOffsets()))
}

Next, the MicroBatchExecution running the streaming job uses a MultiBatchExecutor that remains active as long as there is data left to process:

case class MultiBatchExecutor() extends TriggerExecutor {
  override def execute(batchRunner: () => Boolean): Unit = while (batchRunner()) {}
}

The difference from Trigger.Once becomes clear if you compare the MultiBatchExecutor with the SingleBatchExecutor:

case class SingleBatchExecutor() extends TriggerExecutor {
  override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}
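
To make the contrast concrete, here is a minimal standalone model of both executors, with no Spark dependency. The two executor classes mirror the snippets above; the Backlog class, its runBatch method, and the row counts are hypothetical names invented for the illustration, and the real batchRunner in MicroBatchExecution does much more than this.

```scala
// Standalone model of the two executors; no Spark dependency.
// Backlog, runBatch and ExecutorDemo are hypothetical, for illustration only.
trait TriggerExecutor {
  def execute(batchRunner: () => Boolean): Unit
}

// Trigger.Once: runs the batch function exactly once.
case class SingleBatchExecutor() extends TriggerExecutor {
  override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}

// Trigger.AvailableNow: loops until the batch function returns false.
case class MultiBatchExecutor() extends TriggerExecutor {
  override def execute(batchRunner: () => Boolean): Unit = while (batchRunner()) {}
}

// Hypothetical source holding `pending` rows to read.
final class Backlog(var pending: Int) {
  // Size of every non-empty batch, in execution order.
  var batchSizes: Vector[Int] = Vector.empty

  // Reads at most `limit` rows; returns true when rows remain afterwards.
  def runBatch(limit: Int): Boolean = {
    val taken = math.min(limit, pending)
    pending -= taken
    if (taken > 0) batchSizes :+= taken
    pending > 0
  }
}

object ExecutorDemo {
  // Runs the executor over `rows` rows with a per-batch read limit.
  def run(executor: TriggerExecutor, rows: Int, limit: Int): Vector[Int] = {
    val backlog = new Backlog(rows)
    executor.execute(() => backlog.runBatch(limit))
    backlog.batchSizes
  }
}
```

With 10 rows and a limit of 4, ExecutorDemo.run(MultiBatchExecutor(), 10, 4) returns Vector(4, 4, 2), while ExecutorDemo.run(SingleBatchExecutor(), 10, 4) returns Vector(4) and leaves 6 rows unread. A single-batch executor can only drain the backlog if the limit is lifted, as in ExecutorDemo.run(SingleBatchExecutor(), 10, Int.MaxValue), which returns Vector(10).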

And why does Trigger.Once ignore the rate limits? Because of this condition applied while planning the Structured Streaming job:

class MicroBatchExecution(
// ...
override lazy val logicalPlan: LogicalPlan = {
// ...
uniqueSources = triggerExecutor match {
  case _: SingleBatchExecutor =>
    sources.distinct.map {
      case s: SupportsAdmissionControl =>
        val limit = s.getDefaultReadLimit
        if (limit != ReadLimit.allAvailable()) {
          logWarning(
            s"The read limit $limit for $s is ignored when Trigger.Once is used.")
        }
        s -> ReadLimit.allAvailable()
      case s =>
        s -> ReadLimit.allAvailable()
    }.toMap
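
On the user side, a job that keeps its read limit could look like the sketch below. It is only an illustration: the broker address, topic name, checkpoint path, and the limit of 1000 offsets are assumptions, and running it requires the spark-sql-kafka-0-10 package and a reachable Kafka broker.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object AvailableNowJob {
  // Assumed values: adapt the broker, the topic and the limit to your setup.
  val readOptions: Map[String, String] = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "events",
    // Rate limit that Trigger.AvailableNow should respect in every micro-batch.
    "maxOffsetsPerTrigger" -> "1000"
  )

  def start(spark: SparkSession): StreamingQuery = {
    val input = spark.readStream
      .format("kafka")
      .options(readOptions)
      .load()

    input.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/available-now-demo/checkpoint")
      // Process the data available at start in several micro-batches, then stop.
      .trigger(Trigger.AvailableNow())
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("available-now-demo")
      .master("local[*]")
      .getOrCreate()

    // Returns once the offsets captured at start have been processed.
    start(spark).awaitTermination()
  }
}
```

Replacing Trigger.AvailableNow() with Trigger.Once() in this sketch should trigger the warning from the snippet above, since the configured limit would be replaced by ReadLimit.allAvailable().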

Rate per micro-batch source

Another high-level change concerns a new data source called rate micro-batch. Why add a new input that, at first glance, looks so similar to the rate data source? Jungtaek Lim, the author of the feature, explained it in the JIRA ticket:

The "rate" data source has been known to be used as a benchmark for streaming query.

While this helps to put the query to the limit (how many rows the query could process per second), the rate data source doesn't provide consistent rows per batch into stream, which leads two environments be hard to compare with.

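To solve the issue, Jungtaek added a new data source that generates a consistent number of rows in every micro-batch, which makes runs in different environments comparable.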

Do you want to see the difference with the classical rate source? Watch the fragment of the video below:
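
In code, the difference shows up in the options each source accepts. The sketch below declares both sources side by side; the object name and the option values are arbitrary choices for the illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateSourcesDemo {
  // Classic source: the target is a number of rows per second,
  // so the size of a micro-batch depends on how long the batch takes.
  def rate(spark: SparkSession): DataFrame = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "10")
    .load()

  // New source: the target is a number of rows per micro-batch.
  def rateMicroBatch(spark: SparkSession): DataFrame = spark.readStream
    .format("rate-micro-batch")
    .option("rowsPerBatch", "10")
    .option("numPartitions", "2")
    // Event time generated for the rows moves forward by 1 second per batch.
    .option("advanceMillisPerBatch", "1000")
    .load()
}
```

Both functions return a streaming DataFrame, so the rest of a benchmark query can stay the same when switching from one source to the other.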

RocksDB improvements

RocksDB, the state store added in the 3.2.0 release, was also improved in the most recent version of Apache Spark. First, Jungtaek Lim added a new configuration entry that controls whether the state store tracks the total number of rows. The tracking requires an extra lookup on the key for every write operation made by RocksDB. If you prefer performance over observability, you can set spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows to false. Unfortunately, you can't have both.
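
As a configuration sketch, the entry can be set when building the session, together with the RocksDB state store provider. The object and application names are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

object RocksDbStateStoreConfig {
  def buildSession(): SparkSession = SparkSession.builder()
    .appName("rocksdb-state-store-demo")
    // Use RocksDB instead of the default HDFS-backed state store.
    .config("spark.sql.streaming.stateStore.providerClass",
      "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    // Skip the row tracking: faster writes, but no row count in the metrics.
    .config("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false")
    .getOrCreate()
}
```

With the tracking disabled, expect the number of state rows reported in the query progress to stop being meaningful, which is the observability cost mentioned above.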

Another monitoring-related change is a bug fix for the RocksDB memory usage reported in the state store's metrics. Previously, the metric missed the block-cache-usage data, which Yun Tang included in the 3.3.0 release.

Lastly, Alexander Balikov and Jungtaek Lim solved a correctness issue for stream-to-stream joins and arbitrary stateful processing. Why? In the previous version, the state store used the same iterator under the hood for both the update and delete steps. As a consequence, the rows updated in the micro-batch that were about to expire were removed in the same micro-batch. Now, the delete phase works on the updated iterator. The following picture shows an example of the issue:

Distribution and ordering for DataSource V2 writes

In addition to the RocksDB improvements, Apache Spark 3.3.0 also includes some catch-up for the RequiresDistributionAndOrdering interface I described in the What's new in Apache Spark 3.2.0 - Data Source V2 article.

As a reminder, a data source implements this interface when it requires a specific ordering or partitioning on the writing side. Until now, it had been supported only for batch workloads. The 3.3.0 release extends the scope to the streaming writers with this addition to the V2Writes logical rule:

object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// ...
case WriteToMicroBatchDataSource(
    relation, table, query, queryId, writeOptions, outputMode, Some(batchId)) =>

  val writeBuilder = newWriteBuilder(table, writeOptions, query.schema, queryId)
  val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
  val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
  val customMetrics = write.supportedCustomMetrics.toSeq
  val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
  WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)
// ...

The DistributionAndOrderingUtils.prepareQuery call is where Apache Spark adds an extra RepartitionByExpression or Sort node, or both, if the data source requires them.
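
From the data source side, the requirements are declared on the Write object. Below is a minimal sketch of such a declaration; the trait name and the user_id and event_time columns are hypothetical, and a real streaming writer would still have to override toStreaming to return its StreamingWrite.

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering

// Hypothetical mix-in for a Write that needs clustered and sorted input.
trait ClusteredSortedWrite extends RequiresDistributionAndOrdering {
  // Rows sharing the same user_id must land in the same partition.
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.column("user_id")))

  // Inside each partition, rows must be sorted by event_time.
  override def requiredOrdering(): Array[SortOrder] =
    Array(Expressions.sort(Expressions.column("event_time"), SortDirection.ASCENDING))
}
```

With a write like this one, the rule shown above should plan the repartitioning on user_id and the sort on event_time before the rows reach the writer.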

Structured Streaming has several interesting changes in the 3.3.0 release. But so do other Apache Spark modules, and we'll look at them in the next parts of the series!