The asynchronous progress tracking and the correctness fixes presented in the previous blog posts are not the only new features in Apache Spark Structured Streaming 3.4.0. There are many others but, to keep the blog post readable, I'll focus here on only 3 of them.
Arbitrary Stateful Processing in PySpark
The first feature is an important addition to the PySpark Structured Streaming API. Jungtaek Lim implemented an arbitrary stateful processing function! From now on, if you need to write arbitrary stateful code in Python, you can use applyInPandasWithState.
The feature looks simple, but only from a high-level view. The implementation was driven by several interesting design decisions:
- Why Pandas instead of arbitrary Python objects? First, for the user experience. PySpark users can already define Pandas-based transformations with functions like applyInPandas. Second, the Pandas abstraction enables several optimizations, such as vectorization and batching.
- Duck typing. The function must be given the state and output schemas to mitigate the lack of static typing in Python.
- Iterator of Pandas DataFrames as the input and as the output. The user-defined arbitrary stateful processing function consumes and returns an iterator of Pandas DataFrames. This decision addresses the scalability issue of fitting the whole DataFrame in memory. For the stateful part, Jungtaek used generators to lazily produce the DataFrames and optimize the memory usage. As long as the user doesn't materialize many DataFrames at the same time, the memory shouldn't suffer that much.
- Bin-packing. To optimize the data exchanged between the JVM and the Python VM (PVM), the feature relies on Apache Arrow batching. However, the batches for individual state groups may be small, which would result in a lot of serialization overhead. To mitigate that risk, the implementation collects different state groups in the same batch.
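The bin-packing idea from the last point can be sketched in plain Python. This is only an illustration under my own assumptions, not the code used by Apache Spark: the `pack_groups` and `unpack_groups` helpers, the row shape and the `max_rows_per_batch` parameter are all hypothetical. Small groups are appended to a shared batch, and a small index of (key, start offset, row count) entries lets the receiving side split the batch back into groups.

```python
from typing import Dict, Iterator, List, Tuple

Row = Tuple[str, int]  # hypothetical row shape: (key, value)
GroupIndex = List[Tuple[str, int, int]]  # (key, start_offset, row_count)


def pack_groups(groups: Dict[str, List[Row]],
                max_rows_per_batch: int) -> Iterator[Tuple[List[Row], GroupIndex]]:
    """Pack the rows of several state groups into shared batches."""
    batch_rows: List[Row] = []
    batch_index: GroupIndex = []
    for key, rows in groups.items():
        # Close the current batch when the next group would not fit in it.
        # A group bigger than the limit ends up alone in an oversized batch.
        if batch_rows and len(batch_rows) + len(rows) > max_rows_per_batch:
            yield batch_rows, batch_index
            batch_rows, batch_index = [], []
        batch_index.append((key, len(batch_rows), len(rows)))
        batch_rows.extend(rows)
    if batch_rows:
        yield batch_rows, batch_index


def unpack_groups(rows: List[Row], index: GroupIndex) -> Iterator[Tuple[str, List[Row]]]:
    """Split a shared batch back into per-group row lists."""
    for key, start, count in index:
        yield key, rows[start:start + count]


groups = {
    "even": [("even", 2), ("even", 4)],
    "odd": [("odd", 1)],
    "zero": [("zero", 0), ("zero", 0), ("zero", 0)],
}
batches = list(pack_groups(groups, max_rows_per_batch=3))
```

With a limit of 3 rows, the "even" and "odd" groups share one batch while "zero" gets its own, so three groups cost two serialization round trips instead of three.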
Below is an example of arbitrary stateful processing that you can also find in my GitHub repo:

    from typing import Any, Iterator

    import pandas
    from pyspark.sql.streaming.state import GroupState
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType


    def count_labeled_numbers(key: Any, input_rows: Iterator[pandas.DataFrame],
                              state: GroupState) -> Iterator[pandas.DataFrame]:
        session_state = "active"
        if state.hasTimedOut:
            # The group expired: emit its last state and remove it
            count = state.get
            session_state = "timed_out"
            state.remove()
        else:
            count = 0
            for inp in input_rows:
                count += len(inp.index)
            if state.exists:
                old_count = state.get
                count += old_count[0]
            print(f'Updating state {state}')
            state.update((count,))
            state.setTimeoutDuration(1000)
        yield pandas.DataFrame({
            "label": key,
            "count": count,
            "state": session_state
        })


    # input_data_enriched is the streaming DataFrame defined earlier in the job
    even_odd_counts_from_stateful = input_data_enriched.withWatermark("timestamp", "5 seconds") \
        .groupBy("even_odd_label") \
        .applyInPandasWithState(
            func=count_labeled_numbers,
            outputStructType=StructType([
                StructField("label", StringType()),
                StructField("count", IntegerType()),
                StructField("state", StringType())
            ]),
            stateStructType=StructType([StructField("count", IntegerType())]),
            outputMode="update",
            timeoutConf="ProcessingTimeTimeout"
        )
Protobuf support
Apache Avro has been a first-class citizen in Apache Spark for a long time already. But it's not the only popular serialization format. Another one is Protobuf, and the support for it was added by Raghu Angadi and Sandish Kumar HN in the recent release.
Among the supported features you'll find the conversion functions (from_protobuf and to_protobuf) and support for descriptor files and generated classes. During the implementation, the question of supporting a Schema Registry and schema evolution came up. Since neither is supported for Avro either, they may be implemented in future releases for both formats.
In the meantime, here is a code snippet using Protobuf with a descriptor file:
    import java.util.UUID

    import org.apache.spark.sql.functions.struct
    import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

    val descriptorPath = s"${mainPath}/protobuf.pb"
    // To generate the descriptor file, you must install the `protoc` compiler
    // and call:
    // protoc --descriptor_set_out=/tmp/spark/3.4.0/protobuf/protobuf.pb --proto_path=/tmp/spark/3.4.0/protobuf protobuf.schema
    import sparkSession.implicits._

    // Serialize the (id, content) struct into a binary Protobuf column
    val datasetWithProtobuf = Seq(
      (UUID.randomUUID().toString, """{some event data}"""),
      (UUID.randomUUID().toString, """{other event data}""")
    ).toDF("id", "content")
      .select(to_protobuf(
        data = struct($"id", $"content"),
        messageName = "TestEvent",
        descFilePath = descriptorPath).as("value"))
    datasetWithProtobuf.show(false)

    // Deserialize the binary column back into a struct
    datasetWithProtobuf.select(
      from_protobuf(
        data = $"value",
        messageName = "TestEvent",
        descFilePath = descriptorPath
      )
    ).show(false)
Metrics from sinks
For the last feature, Boyang Jerry Peng added support for custom metrics in sinks. Any sink that needs to expose some metrics must implement the ReportsSinkMetrics interface.
To read the metrics, you can access the sink.metrics property like in the following snippet:
    import org.apache.spark.sql.streaming.StreamingQueryListener

    new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        // The s interpolator is required to resolve the ${...} expression
        println(s"Got metrics for the sink ${event.progress.sink.metrics}")
      }

      override def onQueryTerminated(
        event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    }
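If you only have the JSON form of the progress event at hand, for example from a log line, the sink metrics can be extracted with a few lines of Python. It's a minimal sketch, assuming the payload carries the custom metrics as a string map under `sink.metrics`; the `sink_metrics_from_progress` helper, the sample payload and the `rowsWritten` metric name are made up for the illustration.

```python
import json
from typing import Dict


def sink_metrics_from_progress(progress_json: str) -> Dict[str, str]:
    """Return the custom sink metrics from a progress event serialized as JSON.

    Assumes the metrics live under "sink" -> "metrics"; returns an empty
    dict when the sink does not report any.
    """
    progress = json.loads(progress_json)
    sink = progress.get("sink") or {}
    return dict(sink.get("metrics") or {})


# Hypothetical payload, trimmed down to the sink part
sample_progress = json.dumps({
    "batchId": 3,
    "sink": {
        "description": "MyCustomSink",
        "numOutputRows": 42,
        "metrics": {"rowsWritten": "42"},
    },
})

metrics = sink_metrics_from_progress(sample_progress)
```

Falling back to an empty dictionary keeps the caller simple, since sinks that don't implement ReportsSinkMetrics have nothing to report.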
I expect Project Lightspeed to be a huge improvement for Apache Spark Structured Streaming. Although it's not fully implemented yet, we can already see its first components in Apache Spark with the arbitrary stateful processing for Python users and the asynchronous progress tracking (presented in the previous blog post). Alongside the continuously improving core parts (watermark, metrics), the streaming part of Apache Spark will only get better!