What's new in Apache Spark 3.1 - Project Zen

Versions: Apache Spark 3.1.1

I mentioned it briefly in my first blog post about PySpark. Thanks to the Project Zen initiative, the Python part of Apache Spark is becoming more Pythonic and user-friendly. How? Let's check that in this blog post!


Documentation

First and foremost, great news for the newcomers! You will not be lost in the documentation anymore. If you remember, the previous documentation looked a bit like Javadoc. Sure, you could find a lot of information about the API, often accompanied by code snippets. However, it was quite hard to get a clear picture of the framework. The new version looks more like the main Apache Spark documentation, where you will find the explanation of various concepts and a "getting started" guide. All this in addition to the already present API pages, now placed in the main menu under "API Reference":

Documentation is great to get started, but what if you already know PySpark? You'll certainly be happy with the hints feature!

Hints

Apache Spark 3.1 supports type hints, i.e. type suggestions in Python. They're not something new, though. You could already use them before 3.1, but through a side project called pyspark-stubs. Last year, Apache Spark contributors discussed integrating it into the main Apache Spark branch, and a few months later, Maciej Szymkiewicz, the author of pyspark-stubs, merged it into the main project repository!

What does it change? Thanks to type hints, you will have a better developer experience. Your IDE will provide more accurate autocompletion suggestions and detect type errors. In the following video, you can see the difference between PySpark 3.0 and PySpark 3.1:

How does it work? The new type hints use stub files. If you analyze the source code of the new PySpark, you will find them in the files ending with a .pyi extension. These files mirror the API of the corresponding .py files, except that the functions are not implemented; they only carry the type information. For example, if you take the context.py file defining the API of SparkContext, you will find something like (comments omitted for clarity):

class SparkContext(object):
    # ...
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                 gateway=None, jsc=None, profiler_cls=BasicProfiler):

As you can see, guessing the type of appName or pyFiles is not obvious. However, if your development tool can access the corresponding context.pyi file, it will get the correct types and hence provide a better autocomplete experience:

class SparkContext:
    master: str
    appName: str
    sparkHome: str
    PACKAGE_EXTENSIONS: Iterable[str]
    def __init__(
        self,
        master: Optional[str] = ...,
        appName: Optional[str] = ...,
        sparkHome: Optional[str] = ...,
        pyFiles: Optional[List[str]] = ...,
        environment: Optional[Dict[str, str]] = ...,
        batchSize: int = ...,
        serializer: Serializer = ...,
        conf: Optional[SparkConf] = ...,
        gateway: Optional[JavaGateway] = ...,
        jsc: Optional[JavaObject] = ...,
        profiler_cls: type = ...,
    ) -> None: ...
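
To make this more concrete, here is a minimal sketch of my own (not taken from the Spark docs) showing what a type checker can do with these stubs. It assumes your IDE or a tool like mypy picks up the .pyi files shipped with the pyspark package:

from pyspark import SparkContext

# Correct call: the stub declares master and appName as Optional[str].
sc = SparkContext(master="local[*]", appName="zen-demo")

# Uncommenting the line below should make a type checker complain with
# something like: Argument "appName" to "SparkContext" has incompatible
# type "int"; expected "Optional[str]".
# sc_bad = SparkContext(master="local[*]", appName=42)

sc.stop()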

The hints are great for the development process but, as you saw in the video, they won't prevent runtime errors caused by incorrect types. Fortunately, PySpark also got an improvement in exception handling.

Exception messages for UDFs

If you are using at least Python 3.7, the spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled property (enabled by default) keeps only the essential error information in the traceback. What does it mean concretely? Let's take an example shared by Hyukjin Kwon, the author of the change:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark_session = SparkSession.builder.master("local[*]").getOrCreate()

spark_session.range(10).select(udf(lambda x: x/0)("id")).collect()

If you execute it with the simplified traceback disabled, or in PySpark 3.0, you should see an error message like:

21/03/22 06:36:57 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/exception_message.py", line 8, in 
    spark_session.range(10).select(udf(lambda x: x/0)("id")).collect()
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 596, in collect
    sock_info = self._jdf.collectToPython()
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/sql/utils.py", line 134, in deco
    raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
    return lambda *a: f(*a)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/.venv2/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.0.1/exception_message.py", line 8, in 
    spark_session.range(10).select(udf(lambda x: x/0)("id")).collect()
ZeroDivisionError: division by zero

Thanks to the simplifiedTraceback property, it becomes:

21/03/22 06:55:18 ERROR TaskSetManager: Task 5 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.1.1/exception_message.py", line 8, in 
    spark_session.range(10).select(udf(lambda x: x/0)("id")).collect()
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.1.1/.venv3.8/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.1.1/.venv3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.1.1/.venv3.8/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/bartosz/workspace/spark-playground/spark-3.1.1-vs-3.0.1/pyspark-3.1.1/exception_message.py", line 8, in 
    spark_session.range(10).select(udf(lambda x: x/0)("id")).collect()
ZeroDivisionError: division by zero
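
By the way, if you ever need the full stack trace back, for instance to debug the PySpark internals, you can flip the flag explicitly when building the session. A minimal sketch of my own, built on the configuration property quoted above:

from pyspark.sql import SparkSession

# Setting the flag to "false" restores the full, verbose traceback;
# "true" (the default) keeps only the frames related to the UDF itself.
spark_session = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", "false") \
    .getOrCreate()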

Other changes

Those 3 changes come from the Project Zen epic. However, PySpark 3.1 also has other improvements that are not Project Zen-specific. The first of them fixes the synchronization between Python and JVM threads in the pinned thread mode. Before the fix, the termination of Python and JVM threads was not synchronized: when the Python thread finished, the corresponding JVM thread could still be running. It shouldn't happen anymore starting from PySpark 3.1.1.

Pinned threads in PySpark

Before adding the support for the pinned thread model, PVM and JVM threads in PySpark were independent. It made thread synchronization impossible, which led to some problems related to JVM threading, like the one with job groups described in SPARK-22340.

To solve the problem of associating Java threads with Python threads, SPARK-22340 added an environment variable called PYSPARK_PIN_THREAD. When enabled, it synchronizes threads between the Python and Java virtual machines.

The pinned thread mode is not PySpark-specific; it's provided by the Py4J library, where it's known as the Single Threading Model.
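
To illustrate the idea, here is a minimal sketch of my own (assuming local mode and PySpark 3.1, where the mode is still opt-in). It enables the pinned thread mode through the PYSPARK_PIN_THREAD environment variable and sets a different job group from two concurrent Python threads; in the pinned mode, each setJobGroup call lands on the JVM thread paired with the calling Python thread instead of leaking to the others:

import os
import threading

# Must be set before the JVM gateway is launched, i.e. before the
# SparkContext/SparkSession is created.
os.environ["PYSPARK_PIN_THREAD"] = "true"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

def run_job(group: str):
    # With pinned threads, the job group is attached to the dedicated
    # JVM thread of this Python thread only.
    sc.setJobGroup(group, f"jobs of {group}")
    sc.parallelize(range(100)).count()

threads = [threading.Thread(target=run_job, args=(f"group-{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()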

In addition to the pinned thread fix, you will find more user-facing changes like:

Finally, the support for the --archives option is now native (previously it was supported only in YARN mode), and you shouldn't get an NPE when casting data from or to a PythonUserDefinedType.

As you can see, the community has done a lot for PySpark. The changes should help newcomers (documentation, type hints) but also current users, thanks to the bug fixes and improved compatibility with the Scala API.