After previous blog posts focusing on 2 specific Structured Streaming features, it's time to complete them with a list of other changes made in the 3.2.0 version!
Apache Kafka
Let's begin with some exciting news about Apache Kafka. Among the new features, you can now define the start and end timestamps for all partitions of a topic at once. This setting is useful in data reprocessing scenarios, where the backfilling often has to start at the same point in time for all partitions. The new options are called startingTimestamp and endingTimestamp.
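To illustrate the two options, here is a minimal sketch that builds the source options for a backfill window. The object name, the broker address and the topic are hypothetical placeholders. The sketch assumes that both options expect the timestamp as a string of epoch milliseconds, and that endingTimestamp is only meaningful for batch queries.

```scala
import java.time.Instant

object KafkaBackfillOptions {
  // Builds Kafka source options for a backfill bounded by two instants.
  def forWindow(start: Instant, end: Instant): Map[String, String] = {
    require(!end.isBefore(start), "end must not be before start")
    Map(
      "kafka.bootstrap.servers" -> "localhost:9092", // hypothetical broker
      "subscribe" -> "events",                       // hypothetical topic
      // Assumption: timestamps are passed as epoch milliseconds in a string
      // and apply to every partition of the subscribed topic.
      "startingTimestamp" -> start.toEpochMilli.toString,
      "endingTimestamp" -> end.toEpochMilli.toString
    )
  }
}

// Possible usage with an existing SparkSession called `spark` (batch query):
// val backfill = spark.read.format("kafka")
//   .options(KafkaBackfillOptions.forWindow(
//     Instant.parse("2021-10-01T00:00:00Z"),
//     Instant.parse("2021-10-02T00:00:00Z")))
//   .load()
```

Keeping the options in a plain Map makes the window easy to verify before any job is started.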
The author of the previous change, Jungtaek Lim, is also the author of the second feature, which fixes the behavior on mismatched offsets for the start offset timestamp. Previously, when Apache Spark didn't find a record matching the user-defined starting timestamp, the job failed. Starting from 3.2.0, it's possible to configure this behavior and use the latest available offset instead of stopping the execution. Failing with an error remains the default behavior, though.
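As a sketch of how this could be configured: to my knowledge the option is named startingOffsetsByTimestampStrategy and accepts "error" (the default) or "latest", but double-check the name against the Kafka integration guide of your Spark version. The object and method names below are hypothetical.

```scala
object StartTimestampStrategy {
  val Error = "error"   // fail the query when no offset matches the timestamp
  val Latest = "latest" // fall back to the latest offset of the partition

  // Returns the options to merge into the Kafka source configuration.
  def options(startingTimestampMs: Long, useLatestOnMismatch: Boolean): Map[String, String] =
    Map(
      "startingTimestamp" -> startingTimestampMs.toString,
      // Assumed option name, see the note above.
      "startingOffsetsByTimestampStrategy" -> (if (useLatestOnMismatch) Latest else Error)
    )
}

// Possible usage: spark.readStream.format("kafka")
//   .options(StartTimestampStrategy.options(1633046400000L, useLatestOnMismatch = true))
```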
The third Apache Kafka feature was added by Satish Gopalani. Its goal is to skip the execution of micro-batches that don't have enough data to process, thanks to 2 new configuration options: minOffsetsPerTrigger and maxTriggerDelay. When both are set, Apache Spark will postpone the micro-batch execution for at most maxTriggerDelay if the number of records to process is lower than minOffsetsPerTrigger.
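The decision rule can be modeled with a few lines of plain Scala. This is a simplified model of the behavior described above, not the code used by Apache Spark, and all names in it are made up for the illustration.

```scala
object MinOffsetsTrigger {
  // A micro-batch runs when enough offsets are available, or when the
  // maximum delay since the previous batch has been reached.
  def shouldRunBatch(
      availableOffsets: Long,
      minOffsetsPerTrigger: Long,
      millisSinceLastBatch: Long,
      maxTriggerDelayMs: Long): Boolean = {
    val enoughData = availableOffsets >= minOffsetsPerTrigger
    val delayExpired = millisSinceLastBatch >= maxTriggerDelayMs
    enoughData || delayExpired
  }
}

// On the source side the options could be set like this (values are examples):
// spark.readStream.format("kafka")
//   .option("minOffsetsPerTrigger", "1000")
//   .option("maxTriggerDelay", "5m")
```

With these example values, a micro-batch with only 10 new records waits, but it is no longer delayed once 5 minutes have passed since the previous one.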
Finally, some news on the monitoring side. To start, Liang-Chi Hsieh added extra metrics that help track data loss. The first of them is called offsetOutOfRange and tracks the number of offsets out of range. The second one is dataLoss and it stores the number of data loss events. To complete the work, Yijia Cui added information about the processing latency, expressed as the number of offsets the query is behind the latest one (minOffsetsBehindLatest, maxOffsetsBehindLatest, avgOffsetsBehindLatest).
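The latency metrics can be read programmatically. The sketch below assumes they are exposed as a java.util.Map[String, String] on each source of the progress report (the metrics field of the source progress); the KafkaLag and KafkaLagReader names are hypothetical helpers.

```scala
import java.util.{Map => JMap}
import scala.util.Try

final case class KafkaLag(min: Long, max: Long, avg: Double)

object KafkaLagReader {
  // Returns None when one of the three metrics is missing or not numeric.
  def fromMetrics(metrics: JMap[String, String]): Option[KafkaLag] =
    for {
      min <- Option(metrics.get("minOffsetsBehindLatest")).flatMap(v => Try(v.toLong).toOption)
      max <- Option(metrics.get("maxOffsetsBehindLatest")).flatMap(v => Try(v.toLong).toOption)
      avg <- Option(metrics.get("avgOffsetsBehindLatest")).flatMap(v => Try(v.toDouble).toOption)
    } yield KafkaLag(min, max, avg)
}

// Possible usage on a running query (assumption: one metrics map per source):
// query.lastProgress.sources.flatMap(s => KafkaLagReader.fromMetrics(s.metrics))
```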
You can find all these features in the demo just below:
Initial state in arbitrary stateful processing
The first non-Kafka related feature from the list concerns the arbitrary stateful processing and was brought to the community by Rahul Shivu Mahadev. Thanks to this shiny new feature it is now possible to define an initial state that will be loaded by Apache Spark while running the first micro-batch. The check is made here, while planning the physical execution:
    class IncrementalExecution
      // ...
      case m: FlatMapGroupsWithStateExec =>
        // We set this to true only for the first batch of the streaming query.
        val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
        m.copy(
          stateInfo = Some(nextStatefulOperationStateInfo),
          batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
          eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
          hasInitialState = hasInitialState
        )
Under the hood, the operation uses the StateStoreAwareZipPartitionsRDD, which joins the partitions of the two RDDs it is composed of. Inside, the join consists of using CoGroupedIterator to return either combined rows (initial state + input rows) or separate ones (initial state OR input rows). That's the reason why you will see your stateful function called even for the keys without any input data. The iteration also contains a guard against duplicate keys in the initial state:
    case class FlatMapGroupsWithStateExec
      // ...
      new CoGroupedIterator(
          groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
        case (keyRow, valueRowIter, initialStateRowIter) =>
          val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
          var foundInitialStateForKey = false
          initialStateRowIter.foreach { initialStateRow =>
            if (foundInitialStateForKey) {
              FlatMapGroupsWithStateExec.foundDuplicateInitialKeyException()
            }
            foundInitialStateForKey = true
            val initStateObj = getStateObj.get(initialStateRow)
            stateManager.putState(store, keyUnsafeRow, initStateObj, NO_TIMESTAMP)
          }
          // We apply the values for the key after applying the initial state.
          callFunctionAndUpdateState(
            stateManager.getState(store, keyUnsafeRow),
            valueRowIter,
            hasTimedOut = false
          )
      }

      def foundDuplicateInitialKeyException(): Exception = {
        throw new IllegalArgumentException("The initial state provided contained " +
          "multiple rows(state) with the same key. Make sure to de-duplicate the " +
          "initial state before passing it.")
      }
As you can notice, there is a check to protect against duplicate keys in the initial state dataset. Below you can find an example of initializing the state:
    val defaultState = Seq(
      ("user1", 10), ("user2", 20), ("user3", 30)
    ).toDF("login", "points").as[(String, Int)].groupByKey(row => row._1).mapValues(_._2)

    val inputStream = new MemoryStream[(String, Int)](1, sparkSession.sqlContext)
    inputStream.addData(("user1", 5))
    inputStream.addData(("user4", 2))

    val statefulAggregation = inputStream.toDS().toDF("login", "points")
      .groupByKey(row => row.getAs[String]("login"))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout(), defaultState)(StatefulMapper.apply)
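To make the co-grouping semantics more concrete, here is a plain Scala simulation of the first micro-batch for the data used above. It doesn't call any Apache Spark API. The InitialStateSimulation object and its summing function are hypothetical stand-ins for the stateful mapper, and the duplicate check is performed upfront instead of lazily during the iteration.

```scala
object InitialStateSimulation {
  // Every key present in the initial state OR in the input gets exactly one
  // call of the (hypothetical) stateful function, which sums the points.
  def firstBatch(
      initialState: Seq[(String, Int)],
      input: Seq[(String, Int)]): Map[String, Int] = {
    val stateByKey = initialState.groupBy(_._1)
    // Equivalent in spirit to the duplicate key guard: one state row per key.
    stateByKey.foreach { case (key, rows) =>
      if (rows.size > 1) {
        throw new IllegalArgumentException(s"Multiple initial state rows for key $key")
      }
    }
    val inputByKey = input.groupBy(_._1)
    val allKeys = stateByKey.keySet ++ inputByKey.keySet
    allKeys.map { key =>
      val state = stateByKey.get(key).map(_.head._2) // None for new keys
      val values = inputByKey.getOrElse(key, Seq.empty).map(_._2)
      key -> (state.getOrElse(0) + values.sum)
    }.toMap
  }
}

val result = InitialStateSimulation.firstBatch(
  initialState = Seq(("user1", 10), ("user2", 20), ("user3", 30)),
  input = Seq(("user1", 5), ("user4", 2)))
```

In this model user1 ends with 15 points (state and input), user2 and user3 keep 20 and 30 (state only), and user4 starts from 2 (input only).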
Misc
Among other changes you will find in Structured Streaming:
- an optimized Write-Ahead-Log commit phase - the optimization keeps 2 pieces of information in memory. The first one is the id of the last purged micro-batch, which avoids a heavy list operation to find out what can be purged. The second one is the offset information of the previous micro-batch, which no longer has to be read back from the checkpointed files.
- a new metric added by Venki Korukanti to track the number of dropped duplicates in dropDuplicates method. The new metric is called numDroppedDuplicateRows.
- spark.sql.streaming.fileStreamSink.ignoreMetadata - this new configuration property added by Liang-Chi Hsieh skips using
- support for interval ${delay} threshold expression of the watermark added by Kousuke Saruta.
- fixed an issue with min partitions distribution for Apache Kafka - Liang-Chi Hsieh also fixed the min partitions condition. It was previously expressed as < and led to unnecessary partition splits.
- changed logical planning for streaming writes - Yuanjian Li reworked how the streaming writes are planned. The rework added new logical nodes called WriteToStreamStatement and WriteToStream. The former is transformed to the latter with a logical rule called ResolveWriteToStream. The rule contains all the code required to create the checkpoint location and manage the recovery from it. Previously all this logic was included in the StreamingQueryManager class, which made it hard to extend.
With this blog post I end the exploration of Apache Spark Structured Streaming 3.2.0 features. Next week you'll read about other exciting features in the most recent version of the framework!