Serializers in PySpark

In the previous PySpark blog posts we learned about the serialization overhead between the Python application and the JVM. An intrinsic actor in this overhead is the Python serializers, which are the topic of this article and which will hopefully give you a more complete picture of the Python <=> JVM serialization.

Serializer API

A typical PySpark serializer supports 2 operations, dump and load. The former serializes Python objects to an output stream while the latter does the opposite and returns deserialized objects from an input stream.

PySpark implements the serializers with the Serializer class that defines the aforementioned dump and load methods:

class Serializer:
    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        """
        Return an iterator of deserialized batches (iterable) of objects from the input stream.
        If the serializer does not operate on batches the default implementation returns an
        iterator of single element lists.
        """
        return map(lambda x: [x], self.load_stream(stream))
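
To make this contract more tangible, here is a toy serializer that is not part of PySpark (the JsonLineSerializer name and the JSON-per-line format are purely illustrative): it only shows how dump_stream and load_stream mirror each other on top of a binary stream.

import io
import json

from pyspark.serializers import Serializer


class JsonLineSerializer(Serializer):
    """Toy serializer (not part of PySpark): one JSON document per line."""

    def dump_stream(self, iterator, stream):
        # Serialize every object of the iterator to the binary output stream.
        for obj in iterator:
            stream.write(json.dumps(obj).encode("utf-8") + b"\n")

    def load_stream(self, stream):
        # Return an iterator of deserialized objects from the input stream.
        for line in stream:
            yield json.loads(line)


buffer = io.BytesIO()
serializer = JsonLineSerializer()
serializer.dump_stream([{"id": 1}, {"id": 2}], buffer)

buffer.seek(0)
print(list(serializer.load_stream(buffer)))  # [{'id': 1}, {'id': 2}]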

Types and purposes

The Serializer is an abstract class with multiple implementations. By "multiple" I mean really a lot, and that's why I prefer to give you a general object tree hierarchy before focusing more deeply on the serializers in the code.

As you can notice, there are 3 main groups of serializers, and I'll detail each of them in the next sections.

Deserializers

It's the easiest group to understand because it contains only 3 classes (CartesianDeserializer, PairDeserializer, UTF8Deserializer), all responsible for deserializing objects from the input stream. To be more precise, the CartesianDeserializer rebuilds the cartesian product of the batches read from two streams (it backs RDD.cartesian), the PairDeserializer zips together the batches read from two streams (it backs RDD.zip), and the UTF8Deserializer decodes UTF-8 strings written by the JVM side.
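
As an example, here is a small sketch of the UTF8Deserializer at work; the (big-endian 4-byte length, UTF-8 bytes) framing built below is my reading of how the JVM side writes the strings:

import io
import struct

from pyspark.serializers import UTF8Deserializer

# Two strings framed as (big-endian 4-byte length, UTF-8 bytes) pairs.
payload = b"".join(
    struct.pack("!i", len(word)) + word for word in (b"first", b"second")
)

print(list(UTF8Deserializer().load_stream(io.BytesIO(payload))))  # ['first', 'second']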

PySpark serializers - BatchedSerializer

It's the most popular group, where you can find 2 main serializers. The first of them is BatchedSerializer, which works on batched streams. It's a wrapper around another serializer and its sole added value is grouping the iterator into batches, as shown by the _batched function called in the load and dump operations:

class BatchedSerializer(Serializer):
    # ...
    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
            n = len(iterator)
            for i in range(0, n, self.batchSize):
                yield iterator[i : i + self.batchSize]
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items
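
To see the batching in action, here is a small sketch assuming the internal pyspark.serializers module (including its private _load_stream_without_unbatching method); it wraps a CPickleSerializer into batches of 2 elements:

import io

from pyspark.serializers import BatchedSerializer, CPickleSerializer

serializer = BatchedSerializer(CPickleSerializer(), 2)  # batches of 2 elements

buffer = io.BytesIO()
serializer.dump_stream(iter(range(5)), buffer)

buffer.seek(0)
# The wrapped serializer sees the batches created by _batched...
print(list(serializer._load_stream_without_unbatching(buffer)))  # [[0, 1], [2, 3], [4]]

buffer.seek(0)
# ...while load_stream transparently unbatches them back to single elements.
print(list(serializer.load_stream(buffer)))  # [0, 1, 2, 3, 4]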

The BatchedSerializer has 2 implementations: the AutoBatchedSerializer, which sizes the batches dynamically based on the size of the already serialized objects, and the FlattenedValuesSerializer, which splits large lists of grouped values into smaller batches and is used by the external group-by operations.

PySpark serializers - FramedSerializer

The second PySpark serializers family is represented by FramedSerializer. As its name suggests, it works on top of frames and writes every object as a (length, data) pair. Among its implementations you'll find the pickle-based PickleSerializer, the CloudPickleSerializer relying on the cloudpickle library, and the MarshalSerializer built on top of Python's marshal module.

Both pickle-based serializers are the most widely used in the data exchange, and the default choice is the CloudPickleSerializer as soon as the Python version is 3.8 or later:

# pyspark/serializers.py
if sys.version_info < (3, 8):
    CPickleSerializer = PickleSerializer
else:
    CPickleSerializer = CloudPickleSerializer
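
To visualize the (length, data) framing mentioned earlier, here is a small sketch that dumps two strings with the CPickleSerializer and decodes the first frame manually; the big-endian 4-byte length prefix is my reading of the FramedSerializer write path:

import io
import struct

from pyspark.serializers import CPickleSerializer

buffer = io.BytesIO()
CPickleSerializer().dump_stream(["first", "second"], buffer)

raw = buffer.getvalue()
# Each object is preceded by a 4-byte big-endian length...
(frame_length,) = struct.unpack("!i", raw[:4])
# ...followed by the serialized (pickled) payload itself.
print(frame_length, raw[4:4 + frame_length])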

The pickle-like serializers are the default serializers for transferring data, but if for whatever reason you need to change that, you can do it with the serializer attribute of the SparkContext. It's worth noticing that in this scenario the SparkContext must be created before the SparkSession:

from pyspark import SparkContext, MarshalSerializer
from pyspark.sql import SparkSession

spark_context = SparkContext(serializer=MarshalSerializer())
spark_session = SparkSession.builder.master("local[1]").getOrCreate()
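
As a side note, and assuming my reading of the SparkContext constructor is correct, the serializer configured this way gets additionally wrapped into one of the batched serializers presented before (an AutoBatchedSerializer with the default batchSize of 0), which you can check by printing the context's serializer attribute:

# Expected to print something like AutoBatchedSerializer(MarshalSerializer())
print(spark_context.serializer)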

Pandas serializers

Finally, PySpark also has a family of Arrow-based serializers dedicated to the pandas exchanges (pandas UDFs, toPandas(), createDataFrame called on a pandas DataFrame), living in the pyspark.sql.pandas package, with classes such as ArrowStreamSerializer and ArrowStreamPandasSerializer.
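
To give a feel of this family, here is a minimal sketch that round-trips a single Arrow record batch through the ArrowStreamSerializer; the pyspark.sql.pandas.serializers import path and this direct pyarrow-based usage are my assumptions about the internals rather than a public API:

import io

import pyarrow as pa
from pyspark.sql.pandas.serializers import ArrowStreamSerializer

serializer = ArrowStreamSerializer()
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["id"])

buffer = io.BytesIO()
# dump_stream writes the record batches as an Arrow IPC stream...
serializer.dump_stream(iter([batch]), buffer)

buffer.seek(0)
# ...and load_stream reads them back as Arrow record batches.
print([b.to_pydict() for b in serializer.load_stream(buffer)])  # [{'id': [1, 2, 3]}]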