What's new in Apache Spark 3.3.0 - PySpark

Versions: Apache Spark 3.3.0 https://github.com/bartosz25/spark-playground/tree/master/spark-3.3.0-features/pyspark

It's time for the last "What's new in Apache Spark 3.3.0..." before a break. Today we'll see what changed in PySpark. Spoiler alert: Pandas users should find one feature very exciting!

Improved type hints

Type hints are the first feature. They're not new, since PySpark has supported them as stub files since the 3.1.1 release. In the most recent version, DCH Nguyen, Byron Hsu and Maciej Szymkiewicz inlined the types, that is, they moved them directly into the Python files! Take a look at the new dataframe.py:

class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
    # ...
    def colRegex(self, colName: str) -> Column:
        # ...
    def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
        # ...
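One practical consequence of inlining can be sketched with the standard library alone: annotations written in a .py file are visible at runtime, whereas annotations kept in a separate .pyi stub are only seen by type checkers. The DataFrameLike class below is a hypothetical stand-in that copies the show signature from the excerpt; it is not PySpark code.

```python
from typing import Union, get_type_hints


class DataFrameLike:
    """Hypothetical stand-in for a class with inlined annotations (not PySpark code)."""

    def show(self, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> None:
        print(f"showing {n} rows (truncate={truncate}, vertical={vertical})")


# Inlined annotations can be introspected at runtime, e.g. by IDEs or doc tools.
show_hints = get_type_hints(DataFrameLike.show)
print(show_hints["n"])
print(show_hints["truncate"])
print(show_hints["return"])
```

The same lookup on a function whose hints live only in a stub file would return an empty dictionary, because the interpreter never loads .pyi files.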

PyArrow batch interface

On the user-facing side, Hyukjin Kwon added a new mapping function called mapInArrow to process Apache Arrow's RecordBatch:

class PandasMapOpsMixin:
    def mapInArrow(self, func: "ArrowMapIterFunction", schema: Union[StructType, str]) -> "DataFrame":
        from pyspark.sql import DataFrame
        from pyspark.sql.pandas.functions import pandas_udf

        assert isinstance(self, DataFrame)

        # The usage of the pandas_udf is internal so type checking is disabled.
        udf = pandas_udf(
            func, returnType=schema, functionType=PythonEvalType.SQL_MAP_ARROW_ITER_UDF
        )  # type: ignore[call-overload]
        udf_column = udf(*[self[col] for col in self.columns])
        jdf = self._jdf.pythonMapInArrow(udf_column._jc.expr())
        return DataFrame(jdf, self.sparkSession)

Apache Arrow might be something you are unfamiliar with (I was). In short, Apache Arrow is an in-memory columnar format with a rich data type ecosystem. In PySpark, it facilitates the conversion between PySpark DataFrames and Pandas DataFrames. One example is Pandas UDFs (or Vectorized UDFs if you prefer), where Spark uses Arrow to transfer the data and Pandas to process it, which makes vectorized operations possible.

Before, if users wanted to use the Arrow format with other libraries within a single PySpark application, they had to convert the Pandas DataFrame to an Arrow batch inside the UDF. The new mapInArrow method removes that extra step because it hands Apache Arrow's RecordBatch objects directly to the user function.

Pandas index improvements

PySpark 3.2.0 already improved the Pandas default index. The sequentially increasing range was moved to Scala (SPARK-36338) to avoid unnecessary serialization and enhance the nullability constraints. Additionally, the implementation now lives in Catalyst to leverage built-in optimizations such as column pruning (SPARK-36559).

All this to say, the feature is not new, but Hyukjin Kwon has now made it the default index mode for the Pandas API on Apache Spark.
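To give an intuition for how a sequentially increasing index can be computed without moving all rows to one place, here is a plain-Python sketch. It is not Spark's code: it only assumes a zipWithIndex-like scheme (count the rows of each partition, derive a starting offset per partition, then number the rows locally), and the partitions are simulated as Python lists.

```python
from itertools import accumulate
from typing import List, Tuple


def attach_sequence_index(partitions: List[List[str]]) -> List[List[Tuple[int, str]]]:
    """Number rows 0..N-1 across simulated partitions (illustrative only)."""
    # Step 1: one pass to count the rows of every partition.
    counts = [len(partition) for partition in partitions]
    # Step 2: a partition's starting offset is the sum of the previous counts.
    offsets = [0] + list(accumulate(counts))[:-1]
    # Step 3: each partition numbers its own rows starting from its offset.
    return [
        [(offset + position, row) for position, row in enumerate(partition)]
        for offset, partition in zip(offsets, partitions)
    ]


indexed = attach_sequence_index([["a", "b"], [], ["c"], ["d", "e"]])
print(indexed)
```

Only the per-partition counts have to be gathered centrally in this scheme; the rows themselves stay where they are.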

And there were other index-related changes in PySpark 3.3.0! Hyukjin Kwon added support for defining the index type in the API and for printing the default index type in the explain output of Pandas plans.

Misc

Let's do a final review of the features that I couldn't fit into the dedicated sections:

If you need some examples, you will find them below:

from pyspark import Row
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.master("local[*]") \
    .appName("New features") \
    .getOrCreate()

# Catalog functions
assert not spark_session.catalog.functionExists("not_existing_fn", "default")
spark_session.sql("CREATE FUNCTION fn AS 'com.waitingforcode.fn'")
assert spark_session.catalog.functionExists("fn", "default")

# Schema inference
dataframe_as_dict = [{"nested": {"city": "Paris", "country": "France"}}]
cities_from_dict = spark_session.createDataFrame(dataframe_as_dict)
cities_from_dict.show(truncate=False)
"""
+----------------------------------+
|nested                            |
+----------------------------------+
|{country -> France, city -> Paris}|
+----------------------------------+
"""

# Python's standard string formatter
User = Row("name")

users_df = spark_session.createDataFrame([
    User("a"), User("b"), User("c"), User("d")
])

spark_session.sql("SELECT * FROM {table_from_dataframe}", table_from_dataframe=users_df).show(truncate=False)

Like its predecessors, PySpark 3.3.0 should be even more user-friendly and easier to use for Pandas users. And the effort to improve it is ongoing! During the last Data+AI Summit, Hyukjin Kwon and Xinrong Meng announced some exciting features planned for the framework, such as native NumPy support and coverage of the most recent Pandas API changes.