What's new in Apache Spark 3.4.0 - Structured Streaming

Versions: Apache Spark 3.4.0

The asynchronous progress tracking and the correctness issue fixes presented in the previous blog posts are not the only new features in Apache Spark Structured Streaming 3.4.0. There are many others, but to keep the blog post readable, I'll focus here on only 3 of them.

Arbitrary Stateful Processing in PySpark

The first feature is an important addition to the PySpark Structured Streaming API. Jungtaek Lim implemented an arbitrary stateful processing function! From now on, if you need to write arbitrary stateful code in Python, you can use applyInPandasWithState.

Although the feature looks simple from this high-level view, the implementation was driven by several interesting design decisions.

Below is an example of arbitrary stateful processing that you can also find in my GitHub repo:

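from typing import Any, Iterator

import pandas
from pyspark.sql.streaming.state import GroupState
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


def count_labeled_numbers(key: Any, input_rows: Iterator[pandas.DataFrame], state: GroupState) -> Iterator[pandas.DataFrame]:
    session_state = "active"
    if state.hasTimedOut:
        # The state is a tuple matching stateStructType, so the counter is its first field
        count = state.get[0]
        session_state = "timed_out"
        state.remove()
    else:
        # Count the rows of all the pandas DataFrames received for this key
        count = 0
        for inp in input_rows:
            count += len(inp.index)
        if state.exists:
            old_count = state.get
            count += old_count[0]
        print(f'Updating state {state}')
        state.update((count,))
        # Processing time timeout, expressed in milliseconds
        state.setTimeoutDuration(1000)
    # The grouping key is a tuple too; wrap the values in lists to build a one-row DataFrame
    yield pandas.DataFrame({
        "label": [key[0]],
        "count": [count],
        "state": [session_state]
    })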


even_odd_counts_from_stateful = input_data_enriched.withWatermark("timestamp", "5 seconds") \
    .groupBy("even_odd_label") \
    .applyInPandasWithState(
        func=count_labeled_numbers,
        outputStructType=StructType([
            StructField("label", StringType()),
            StructField("count", IntegerType()),
            StructField("state", StringType())
        ]),
        stateStructType=StructType([StructField("count", IntegerType())]),
        outputMode="update",
        timeoutConf="ProcessingTimeTimeout"
    )
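
To see how the per-key state evolves across micro-batches without starting a Spark session, the counting logic above can be replayed in plain Python. This is only a sketch under assumptions: FakeGroupState, count_rows and simulate are hypothetical names that are not part of PySpark, the stand-in class mimics only the few GroupState members used above, and plain lists replace the pandas DataFrames.

```python
from typing import Iterator, List, Optional, Tuple


class FakeGroupState:
    """Hypothetical stand-in for PySpark's GroupState, reduced to what this sketch uses."""

    def __init__(self) -> None:
        self._value: Optional[Tuple[int]] = None
        self.hasTimedOut = False
        self.timeout_ms: Optional[int] = None

    @property
    def exists(self) -> bool:
        return self._value is not None

    @property
    def get(self) -> Tuple[int]:
        if self._value is None:
            raise ValueError("State is not defined")
        return self._value

    def update(self, new_value: Tuple[int]) -> None:
        self._value = new_value

    def remove(self) -> None:
        self._value = None

    def setTimeoutDuration(self, duration_ms: int) -> None:
        self.timeout_ms = duration_ms


def count_rows(key: Tuple[str], input_batches: Iterator[List[int]],
               state: FakeGroupState) -> Iterator[dict]:
    """Same branching as count_labeled_numbers, with lists instead of DataFrames."""
    session_state = "active"
    if state.hasTimedOut:
        # On timeout there is no input; emit the last known count and drop the state
        count = state.get[0]
        session_state = "timed_out"
        state.remove()
    else:
        # Add the rows of the current micro-batch to the previously stored count
        count = sum(len(batch) for batch in input_batches)
        if state.exists:
            count += state.get[0]
        state.update((count,))
        state.setTimeoutDuration(1000)
    yield {"label": key[0], "count": count, "state": session_state}


def simulate() -> List[dict]:
    """Replays two micro-batches and one timeout for a single grouping key."""
    state = FakeGroupState()
    key = ("even",)
    outputs: List[dict] = []
    # Micro-batch 1: two rows for the key
    outputs.extend(count_rows(key, iter([[2, 4]]), state))
    # Micro-batch 2: three rows delivered in two chunks
    outputs.extend(count_rows(key, iter([[6], [8, 10]]), state))
    # Timeout: the function is called without input rows and with hasTimedOut set
    state.hasTimedOut = True
    outputs.extend(count_rows(key, iter([]), state))
    return outputs


if __name__ == "__main__":
    for row in simulate():
        print(row)
```

In this replay the counts are 2, then 5, then 5 again with the "timed_out" marker, after which the state no longer exists. In a real query, Spark itself decides when the timeout fires; the sketch flips the flag by hand.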

Protobuf support

Apache Avro has been a first-class citizen in Apache Spark for a long time already. But it's not the only popular serialization format. Another one is Protobuf, and the support for it was added by Raghu Angadi and Sandish Kumar HN in the recent release.

Among the supported features you'll find the conversion functions (from_protobuf and to_protobuf) and the support for descriptor files and generated classes. During the implementation there was a question of adding the support for Schema Registry and schema evolution. Since neither is supported for Avro, they will eventually be implemented in further releases for both formats.

In the meantime, here is a code snippet using Protobuf from a descriptor file:

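import java.util.UUID

import org.apache.spark.sql.functions.struct
// from_protobuf and to_protobuf come from the separate spark-protobuf module
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

val descriptorPath = s"${mainPath}/protobuf.pb"
// To generate the descriptor file, you must install the `protoc` compiler
// and call:
// protoc --descriptor_set_out=/tmp/spark/3.4.0/protobuf/protobuf.pb --proto_path=/tmp/spark/3.4.0/protobuf protobuf.schema

import sparkSession.implicits._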
val datasetWithProtobuf = Seq(
 (UUID.randomUUID().toString, """{some event data}"""),
 (UUID.randomUUID().toString, """{other event data}""")
).toDF("id", "content")
  .select(to_protobuf(
    data = struct($"id", $"content"),
    messageName = "TestEvent",
    descFilePath = descriptorPath).as("value"))
datasetWithProtobuf.show(false)

datasetWithProtobuf.select(
  from_protobuf(
    data = $"value",
    messageName = "TestEvent",
    descFilePath = descriptorPath
  )
).show(false)

Metrics from sinks

The last feature covered here comes from Boyang Jerry Peng, who added support for custom metrics in sinks. Any sink that needs to expose metrics must implement the ReportsSinkMetrics interface.

To read the metrics, you can access the sink.metrics property, as in the following snippet:

import org.apache.spark.sql.streaming.StreamingQueryListener

new StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // The s prefix is required to interpolate the expression in the string
    println(s"Got metrics for the sink ${event.progress.sink.metrics}")
  }

  override def onQueryTerminated(
    event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

I expect Project Lightspeed to be a huge improvement for Apache Spark Structured Streaming. Although it's not fully implemented yet, we can already see its first components in Apache Spark, with the arbitrary stateful processing for Python users and the asynchronous progress tracking (presented in the previous blog post). Alongside the continuously improving core parts (watermark, metrics), the streaming part of Apache Spark will only get better!