The asynchronous progress tracking and correctness issue fixes presented in the previous blog posts are not the only new features in Apache Spark Structured Streaming 3.4.0. There are many others, but to keep the blog post readable, I'll focus here on only 3 of them.
Arbitrary Stateful Processing in PySpark
The first feature is an important addition to the PySpark Structured Streaming API. Jungtaek Lim implemented an arbitrary stateful processing function! From now on, if you need to implement arbitrary stateful logic, you can use applyInPandasWithState.
Although the feature looks simple, that's only the high-level view. The implementation was driven by several interesting design decisions:
- Why Pandas instead of arbitrary Python objects? First, for the user experience. PySpark users can already define Pandas-based transformations with functions like applyInPandas. Second, the Pandas abstraction enables several optimizations, such as vectorization and batching.
- Duck typing. The state and output schemas must be provided explicitly to the function to mitigate the lack of strong typing in Python.
- Iterator of Pandas DataFrames as the input and the output. The user-defined arbitrary stateful processing function consumes and returns an iterator of Pandas DataFrames. This decision addresses the scalability issue of fitting the whole DataFrame in memory. For the stateful part, Jungtaek used generators to lazily produce the DataFrames and optimize memory usage. As long as the user doesn't materialize many DataFrames at the same time, the memory shouldn't suffer that much.
- Bin-packing. To optimize the data exchanged between the JVM and the Python VM, the feature relies on Apache Arrow batching. However, the batches for individual state groups may be small, which would result in a lot of serialization overhead. To mitigate that risk, the implementation collects different state groups in the same batch.
Below is an example of arbitrary stateful processing that you can also find in my GitHub repo:
from typing import Any, Iterator

import pandas

from pyspark.sql.streaming.state import GroupState
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


def count_labeled_numbers(key: Any, input_rows: Iterator[pandas.DataFrame],
                          state: GroupState) -> Iterator[pandas.DataFrame]:
    session_state = "active"
    if state.hasTimedOut:
        # The group expired: read the last count and clean the state up
        count = state.get
        session_state = "timed_out"
        state.remove()
    else:
        # Count the rows of the current micro-batch...
        count = 0
        for inp in input_rows:
            count += len(inp.index)
        # ...and add the count accumulated in the state so far
        if state.exists:
            old_count = state.get
            count += old_count[0]
        print(f'Updating state {state}')
        state.update((count,))
        state.setTimeoutDuration(1000)
    yield pandas.DataFrame({
        "label": key,
        "count": count,
        "state": session_state
    })


even_odd_counts_from_stateful = input_data_enriched.withWatermark("timestamp", "5 seconds") \
    .groupBy("even_odd_label") \
    .applyInPandasWithState(
        func=count_labeled_numbers,
        outputStructType=StructType([
            StructField("label", StringType()),
            StructField("count", IntegerType()),
            StructField("state", StringType())
        ]),
        stateStructType=StructType([StructField("count", IntegerType())]),
        outputMode="update",
        timeoutConf="ProcessingTimeTimeout"
    )
Protobuf support
Apache Avro has been a first-class citizen in Apache Spark for a long time already. But it's not the only popular serialization format. Another one is Protobuf, and support for it was added by Raghu Angadi and Sandish Kumar HN in the recent release.
Among the supported features you'll find the conversion functions (from_protobuf and to_protobuf) and support for both descriptor files and generated classes. During the implementation, there was a question of adding support for Schema Registry and schema evolution. Since neither is supported for Avro either, they should eventually be implemented for both formats in future releases.
In the meantime, you can find below a code snippet using Protobuf from a descriptor file:
import java.util.UUID

import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

val descriptorPath = s"${mainPath}/protobuf.pb"
// To generate the descriptor file, you must install the `protoc` library
// and call:
// protoc --descriptor_set_out=/tmp/spark/3.4.0/protobuf/protobuf.pb --proto_path=/tmp/spark/3.4.0/protobuf protobuf.schema

import sparkSession.implicits._

val datasetWithProtobuf = Seq(
  (UUID.randomUUID().toString, """{some event data}"""),
  (UUID.randomUUID().toString, """{other event data}""")
).toDF("id", "content")
  // Serialize the id and content columns into a single Protobuf binary column
  .select(to_protobuf(
    data = struct($"id", $"content"),
    messageName = "TestEvent",
    descFilePath = descriptorPath).as("value"))

datasetWithProtobuf.show(false)

// Deserialize the Protobuf binary column back into a struct
datasetWithProtobuf.select(
  from_protobuf(
    data = $"value",
    messageName = "TestEvent",
    descFilePath = descriptorPath
  )
).show(false)
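The descriptor file is not the only option. If your Protobuf classes are already compiled with protoc's Java output and available on the classpath, you can pass the class name instead of the descriptor path. Below is a minimal sketch of that variant; the com.waitingforcode.proto.TestEvent class name is purely hypothetical and stands for whatever class protoc generated for you:

import sparkSession.implicits._
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

// Hypothetical class generated by protoc (--java_out) and present on the classpath
val generatedClassName = "com.waitingforcode.proto.TestEvent"

val protobufFromGeneratedClass = Seq(("id-1", """{some event data}""")).toDF("id", "content")
  // Same serialization as before, but the schema comes from the compiled class
  .select(to_protobuf(struct($"id", $"content"), generatedClassName).as("value"))

protobufFromGeneratedClass
  .select(from_protobuf($"value", generatedClassName).as("event"))
  .show(false)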
Metrics from sinks
For the last feature, Boyang Jerry Peng added support for custom metrics in sinks. Any sink that needs to expose metrics must implement the ReportsSinkMetrics interface.
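Here is a minimal sketch of what the sink side could look like. It only shows the metrics-reporting part; in a real connector this logic would live in the Table implementation of your data source, and both the package of ReportsSinkMetrics and the metrics() signature used below are assumptions to verify against the Spark 3.4.0 Javadoc:

import java.util.concurrent.atomic.AtomicLong

// Package assumed here; double-check it in the Spark 3.4.0 Javadoc
import org.apache.spark.sql.connector.metric.ReportsSinkMetrics

// Minimal sketch: a counter incremented by the (omitted) write path and exposed
// through the metrics() callback reported after each micro-batch
class CountingSinkMetrics extends ReportsSinkMetrics {
  private val writtenRows = new AtomicLong(0L)

  def recordWrittenRows(count: Long): Unit = {
    writtenRows.addAndGet(count)
  }

  override def metrics(): java.util.Map[String, String] = {
    val sinkMetrics = new java.util.HashMap[String, String]()
    sinkMetrics.put("writtenRows", writtenRows.get().toString)
    sinkMetrics
  }
}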
To read the metrics you can access the sink.metrics property like in the following snippet:
new StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    // The `s` interpolator is required, otherwise the metrics expression is printed literally
    println(s"Got metrics for the sink ${event.progress.sink.metrics}")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
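Remember that the listener only receives these events once it's registered on the session. Assuming the anonymous listener above is assigned to a sinkMetricsListener value and sparkSession is your active SparkSession, the wiring looks like this:

// Register the listener so that onQueryProgress is called after every micro-batch
sparkSession.streams.addListener(sinkMetricsListener)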
I'm expecting the Project Lightspeed to be a huge improvement for Apache Spark Structured Streaming. Although it's not fully implemented yet, we can already see its first components in Apache Spark with the arbitrary stateful processing for Python users and asynchronous progress tracking (presented in the previous blog post). Alongside the continuously improving core parts (watermark, metrics), the streaming part of Apache Spark will only get better!