What's new in Apache Spark 3 - Structured Streaming

Versions: Apache Spark 3.0.0

Apache Kafka changes in Apache Spark 3.0 was one of the first topics covered in the "what's new" series. Even though there were a lot of changes related to the Kafka source and sink, they're not the single ones in Structured Streaming.

A virtual conference at the intersection of Data and AI. This is not a conference for the hype. Its real users talking about real experiences.
- 40+ speakers with the likes of Hannes from Duck DB, Sol Rashidi, Joe Reis, Sadie St. Lawrence, Ryan Wolf from nvidia, Rebecca from lidl
- 12th September 2024
- Three simultaneous tracks
- Panels, Lighting Talks, Keynotes, Booth crawls, Roundtables and Entertainment.
- Topics include (ingestion, finops for data, data for inference (feature platforms), data for ML observability
- 100% virtual and 100% free

๐Ÿ‘‰ Register here

The other changes are related to the file source, continuous execution, internal API evolutions, and of course, the bug fixes. In the following 4 sections, you will discover the changes for each of these categories.

File integration

The first major evolution from this file source/sink category is the configuration for the cleanSource option. It defines what happens with the already processed source files and can be set to archive, delete oroff (default).

For the delete configuration, the source will remove already processed files. The removal is managed by SourceFileRemover created, alongside another cleaners, from this factory method:

  private[sql] object FileStreamSourceCleaner {
    def apply(
        fileSystem: FileSystem,
        sourcePath: Path,
        option: FileStreamOptions,
        hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
      case CleanSourceMode.ARCHIVE =>
        require(option.sourceArchiveDir.isDefined)
        val path = new Path(option.sourceArchiveDir.get)
        val archiveFs = path.getFileSystem(hadoopConf)
        val qualifiedArchivePath = archiveFs.makeQualified(path)
        Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))

      case CleanSourceMode.DELETE =>
        Some(new SourceFileRemover(fileSystem))

      case _ => None
    }
  }

The cleaner is called when the micro-batch correctly processed all files:

  override def commit(end: Offset): Unit = {
    val logOffset = FileStreamSourceOffset(end).logOffset

    sourceCleaner.foreach { cleaner =>
      val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
      val validFileEntities = files.filter(_.batchId == logOffset)
      logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
      validFileEntities.foreach(cleaner.clean)
    }
  }

As already mentioned, the delete strategy will physically remove every processed entry. The "off" configuration will do nothing whereas the "archive" will create an instance of SourceFileArchiver that will move every file to the archive directory specified in sourceArchiveDir property. Below you can find a short demo of this feature:

The second feature from this category is related to the monitoring. If your micro-batch spends more than 2 seconds on compacting the metadata files, you will see a warning coming from the following snippet instead of the debug message:

    if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
        s" write: $writeElapsedMs ms) for compact batch $batchId")
      logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " +
        s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId")
    } else {
      logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
        s" write: $writeElapsedMs ms) for compact batch $batchId")
    }

The third interesting evolution is the sorting improvement for the compaction process and last processed batches retrieval. Before, the operation consisted on sorting first and later reversing the results order:

val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter)
      .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX))
      .map(f => pathToBatchId(f.getPath))
      .sorted
      .reverse

Starting from 3.0, the reverse comparison is made directly with the reverse call:

val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter)
      .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX)) 
      .map(f => pathToBatchId(f.getPath)) 
      .sorted(Ordering.Long.reverse)

// reverse implementation

trait Ordering[T] extends Comparator[T] with PartialOrdering[T] with Serializable {
  outer =>
// ...
  override def reverse: Ordering[T] = new Ordering[T] {
    override def reverse = outer
    def compare(x: T, y: T) = outer.compare(y, x)
  }

Continuous execution

The second category of changes concerns the continuous execution mode. Apache Spark 3 introduced a configuration entry called spark.sql.streaming.continuous.epochBacklogQueueSize to control the number of not terminated epochs. Now, every time the engine receives a CommitPartitionEpoch or ReportPartitionOffset, it invokes the checkProcessingQueueBoundaries function to verify the number of the epochs still in processing, waiting for being committed:

  private def checkProcessingQueueBoundaries() = {
    if (partitionOffsets.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
        "exceeded its maximum"))
    }
    if (partitionCommits.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
        "exceeded its maximum"))
    }
    if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException("Size of the epoch queue has " +
        "exceeded its maximum"))
    }
  }

Let's see this configuration in action:

Apart from this configuration entry, another important change is a quite important bug fix for Python UDF in continuous execution mode. The issue was related to the threads used in the epoch tracking. Starting from 3.0, an instance of the InheritableThreadLocal class is used instead of ThreadLocal to prevent from losing epoch number when the new thread is created in PySpark:

class ContinuousCoalesceRDD(
    context: SparkContext,
    numPartitions: Int,
    readerQueueSize: Int,
    epochIntervalMs: Long,
    prev: RDD[InternalRow])
  extends RDD[InternalRow](context, Nil) {
// ...
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val part = split.asInstanceOf[ContinuousCoalesceRDDPartition]
// ...
            while (!context.isInterrupted() && !context.isCompleted()) {
              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
              // Note that current epoch is a inheritable thread local but makes another instance,
              // so each writer thread can properly increment its own epoch without affecting
              // the main task thread.
              EpochTracker.incrementCurrentEpoch()
            }
// ...

API changes

In addition to the user-facing changes from the previous sections, Apache Spark 3 also brought some hidden evolutions related to the internal API. If you're a recurrent visitor of the source code, the first change will be very impactful because a lot of @Experimental annotations added at or before the 2.3.0 release, disappeared. Even though it's not a dedicated Structured Streaming change, it's worth mentioning here since there will not be a dedicated post to the annotations changes.

Among other changes, you will have to refactor your code a bit if you're using the processing time trigger. The ProcessingTime class was deleted in the 3.0 release and therefore from now, you should use the Trigger.ProcessingTime.

In addition to these 2 changes, Apache Spark 3 also has a new interface responsible for controlling the number of elements read in every micro-batch execution. This interface is called SupportsAdmissionControl and it exposes 2 methods, getDefaultReadLimit() to return the default limit to be applied on the data source, and latestOffset(Offset startOffset, ReadLimit limit) to return the most recent offset available for processing. You can also notice a presence of ReadLimit interface in the signature of the second method. The role of this interface is to provide the read limit control mechanism, either with the data size (ReadMaxRows for the number of rows, ReadMaxFiles for the number of files in the file source) or all the data available at the ending offset resolution time (ReadAllAvailable).

Bug fixes

In the bug fixes category, there is one major bug fix for the left outer stream-to-stream join. In the 2.4.5 version, the rows from the left side were sometimes returned twice, as you can see in the following snippet where the Batch 1 returns the row "2" when it matches and the Batch 12 when its watermark expires:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+--------+-------+---------+
|leftId|leftTime|rightId|rightTime|
+------+--------+-------+---------+
+------+--------+-------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----------------------+-------+-----------------------+
|leftId|leftTime               |rightId|rightTime              |
+------+-----------------------+-------+-----------------------+
|2     |2020-09-06 07:14:47.934|2      |2020-09-06 07:14:47.934|
|4     |2020-09-06 07:14:49.934|4      |2020-09-06 07:14:49.934|
|5     |2020-09-06 07:14:50.934|5      |2020-09-06 07:14:50.934|
|0     |2020-09-06 07:14:45.934|0      |2020-09-06 07:14:45.934|
|1     |2020-09-06 07:14:46.934|1      |2020-09-06 07:14:46.934|
|3     |2020-09-06 07:14:48.934|3      |2020-09-06 07:14:48.934|
+------+-----------------------+-------+-----------------------+

-------------------------------------------
Batch: 7
-------------------------------------------
+------+-----------------------+-------+-----------------------+
|leftId|leftTime               |rightId|rightTime              |
+------+-----------------------+-------+-----------------------+
|18    |2020-09-06 07:15:03.934|18     |2020-09-06 07:15:03.934|
|1     |2020-09-06 07:14:46.934|null   |null                   |
|0     |2020-09-06 07:14:45.934|null   |null                   |
+------+-----------------------+-------+-----------------------+

-------------------------------------------
Batch: 8
-------------------------------------------
+------+-----------------------+-------+-----------------------+
|leftId|leftTime               |rightId|rightTime              |
+------+-----------------------+-------+-----------------------+
|19    |2020-09-06 07:15:04.934|19     |2020-09-06 07:15:04.934|
|2     |2020-09-06 07:14:47.934|null   |null                   |
+------+-----------------------+-------+-----------------------+

The reason for that was the join detection which was only based on the keys matching. To fix the issue, a new attribute called matched was added to the join row in the state store. Thanks to it, only the rows not matched previously will be returned at the expiration time:

// StreamingSymmetricHashJoinExec
    val innerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
      (leftOutputIter ++ rightOutputIter), onInnerOutputCompletion)

    val outputIter: Iterator[InternalRow] = joinType match {
      case LeftOuter =>
        def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
          rightSideJoiner.get(leftKeyValue.key).exists { rightValue =>
            postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
          }
        }
        val removedRowIter = leftSideJoiner.removeOldState()
        val outerOutputIter = removedRowIter.filterNot { kv =>
          stateFormatVersion match {
            case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
            case 2 => kv.matched
            case _ =>
              throw new IllegalStateException("Unexpected state format version! " +
                s"version $stateFormatVersion")
          }
        }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))

        innerOutputIter ++ outerOutputIter

One thing to notice though is that this is a breaking change. It's one of the points included in the Apache Spark Structured Streaming 2.4 โ†’ 3.0 migration guide:


Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (See SPARK-26154 for more details). If you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs.

The video below demonstrates the issue and the fix:

The second bug fix was for the limit clause in an aggregation query. In the streaming query involving a limit operation followed by an aggregation (eg. inputDataFrame.limit(5).groupBy().count()), the used limit node in 2.4.5 was an instance of GlobalLimit node whereas in 3.0.0 it was replaced by a StreamingGlobalLimit instance:

# Spark 2.4.5
            +- *(1) HashAggregate(keys=[nr#5L], functions=[merge_count(1)], output=[nr#5L, count#20L])
               +- *(1) HashAggregate(keys=[nr#5L], functions=[partial_count(1)], output=[nr#5L, count#20L])
                  +- *(1) GlobalLimit 2
                     +- *(1) LocalLimit 2
                        +- *(1) Project [_1#17L AS nr#5L]
                           +- *(1) Project [_1#17L, _2#18]
                              +- *(1) ScanV2 MemoryStreamDataSource$[_1#17L, _2#18]


# Spark 3.0.0
            +- *(2) HashAggregate(keys=[nr#7L], functions=[merge_count(1)], output=[nr#7L, count#20L])
               +- *(2) HashAggregate(keys=[nr#7L], functions=[partial_count(1)], output=[nr#7L, count#20L])
                  +- StreamingGlobalLimit 2, state info [ checkpoint = file:/tmp/temporary-eff3850b-9d4f-45ab-bde0-cfd595110fb4/state, runId = 83307711-cd00-4684-b0ab-c2ff1f8168a7, opId = 1, ver = 0, numPartitions = 1], Complete
                     +- *(1) LocalLimit 2
                        +- *(1) Project [_1#2L AS nr#7L]
                           +- *(1) Project [_1#2L, _2#3]
                              +- MicroBatchScan[_1#2L, _2#3] MemoryStreamDataSource

For that specific use case, the StreamingGlobalLimit will rely on the state store to return only 2 of the rows to the parent aggregation whereas the GlobalLimit will simply do that in every partition:

  protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
    iter.take(limit)
  }

You can see the difference in the following video:

Even though the Structured Streaming in Apache Spark 3.0 brings less revolutionary changes than the previous releases (continuous execution, streaming joins, ...), it still provides several major improvements, like promoting experimental features to the core ones or fixing errors related to the continuous execution mode and streaming joins.


If you liked it, you should read:

๐Ÿ“š Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!