As mentioned in one of the previous posts about Spark, shuffle is the process that moves data between nodes. It's orchestrated by a dedicated manager, which is the topic of this post.
This post presents the main points about shuffle. The first part describes the configuration of the process. The second section presents the sort-based shuffle manager, which Spark uses by default. The last part digs into the code and shows how the sort-based shuffle manager works internally.
Shuffle general information
As already mentioned, the goal of the shuffle process is to bring all data with the same key to the same executor. Thanks to that, each executor can correctly compute the result of key-based transformations such as reduceByKey, groupByKey or join. The presence of a shuffle operation can be detected in the output of the toDebugString() method. If it contains a ShuffledRDD, a shuffle occurs somewhere in the processing.
To illustrate the shuffle process, let's imagine a job working on a pair RDD. First, we want to apply a transformation, for example changing the value to another type (map). Then, we want to apply an operation on the RDD's keys, for example reducing records by key. The shuffle takes place between these 2 operations. It consists of moving the prepared data (with values already converted to the other type) to a common place for each key. To make the distinction clearer, the part preparing the data will be called the mapper and the part receiving it the reducer.
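The mapper, shuffle and reducer roles described above can be simulated without Spark. The sketch below is only an illustration in plain Python: the function names (map_phase, shuffle, reduce_phase) and the hash-modulo routing are assumptions made for the example, not Spark's API.

```python
from collections import defaultdict


def map_phase(records):
    """Mapper side: change each value to another type (here str -> int)."""
    return [(key, int(value)) for key, value in records]


def shuffle(mapper_outputs, num_reducers):
    """Move every (key, value) pair to the reducer chosen from the key's hash."""
    reducer_inputs = [defaultdict(list) for _ in range(num_reducers)]
    for output in mapper_outputs:
        for key, value in output:
            reducer_inputs[hash(key) % num_reducers][key].append(value)
    return reducer_inputs


def reduce_phase(reducer_input):
    """Reducer side: all values of a key are now local, so they can be summed."""
    return {key: sum(values) for key, values in reducer_input.items()}


# Two mappers, each holding a part of the data set.
mapper_outputs = [
    map_phase([("a", "1"), ("b", "2")]),
    map_phase([("a", "3"), ("c", "4")]),
]

results = {}
for reducer_input in shuffle(mapper_outputs, num_reducers=2):
    results.update(reduce_phase(reducer_input))
```

Whatever reducer a key lands on, all of its values end up in the same place, so results maps "a" to 4, "b" to 2 and "c" to 4.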
However, shuffle is not an uncontrolled process. It's handled by an implementation of ShuffleManager. The implementation is chosen with the spark.shuffle.manager configuration property. It accepts the values "sort" and "tungsten-sort", both backed internally by the same class. It can also hold a fully qualified class name if we decide to use a custom implementation.
Additionally, we can specify whether shuffled data is compressed (spark.shuffle.compress entry). Its value is true by default and the compression codec used is the one defined in spark.io.compression.codec. Compression helps to reduce the amount of data sent across the network, but it inevitably adds a decompression step at the reading stage.
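As a rough illustration, the properties mentioned above could be collected in one place and turned into --conf arguments for spark-submit. The helper name to_submit_args is hypothetical, and the values only spell out what are believed to be the defaults (lz4 for the codec is an assumption to verify against your Spark version).

```python
# Shuffle-related properties discussed in this section.
# The values are assumed defaults, written out explicitly for readability.
shuffle_conf = {
    "spark.shuffle.manager": "sort",
    "spark.shuffle.compress": "true",
    "spark.io.compression.codec": "lz4",
}


def to_submit_args(conf):
    """Turn a property dict into a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(shuffle_conf)
```

The resulting list can be appended to a spark-submit command line, one --conf key=value pair per property.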
Sort shuffle manager
The release of Spark 2.0 brought some changes to shuffle management. It completely removed the hash shuffle manager and left only the sort-based shuffle manager. That's why we focus here only on the second one. But for a bit of history and a better understanding of the sort-based shuffle manager, you should know that one of the biggest drawbacks of the hash-based manager was the number of intermediate files it created. Each mapper created 1 file for each reducer. For example, for 5 mappers and 5 reducers, the hash-based manager operated on 25 files (and 5*5 is really an unrepresentative example for big data environments!). The main problem with that was random writes. Unlike sequential writes, random writes consist not only of writes but also of seeks, and this seeking step naturally makes them slower. Since it happens on a lot of files, performance decreases seriously.
The sort-based manager changes that. Each mapper writes the records of all partitions to a single file. To understand how this shuffle works, let's list its important steps:
- At the beginning, the mapper accumulates all records in memory in a PartitionedAppendOnlyMap. The records are grouped together by partition. When there is no more space in memory, records are saved to disk. In Spark's nomenclature this action is often called spilling. To check whether spilling occurred, you can search for the following entries in the logs:
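The file-count arithmetic is easy to check with a few lines of Python. The function names are hypothetical; the sort-based count assumes one data file plus one index file per mapper and ignores temporary spill files.

```python
def hash_shuffle_files(mappers, reducers):
    """Hash-based shuffle: every mapper writes one file per reducer."""
    return mappers * reducers


def sort_shuffle_files(mappers):
    """Sort-based shuffle: one data file and one index file per mapper
    (assumption: temporary spill files are not counted)."""
    return 2 * mappers


for mappers, reducers in [(5, 5), (1000, 1000)]:
    print(mappers, reducers,
          hash_shuffle_files(mappers, reducers),
          sort_shuffle_files(mappers))
```

With 1000 mappers and 1000 reducers the hash-based approach already reaches a million files, while the sort-based count grows only with the number of mappers.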
INFO ExternalSorter: Task 1 force spilling in-memory map to disk it will release 352.2 MB memory
- Once all records are processed, Spark saves them on disk. It generates 2 files: a .data file holding the records and an .index file. The data file contains records ordered by partition. The index file stores the offsets at which each partition begins and ends in the data file.
- During reading, reducers use the index file to find where the records they need are located. Once they know that, they fetch the data and iterate over it to construct the expected output.
If the files weren't merged during the mapping phase, they're merged before the iteration in the reading step.
However, there is a special behavior when the number of partitions does not exceed the value of the spark.shuffle.sort.bypassMergeThreshold configuration entry and neither ordering nor aggregation is specified. In this situation, the sort-based manager writes records to separate files, one file for each reduce partition. Only at the end are these files concatenated into 1 common file.
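The relationship between the data file and the index file can be sketched in plain Python. This is a simplified model, not Spark's writer: the function names are hypothetical, and the index layout (one 8-byte big-endian offset per partition boundary) is an assumption made for the illustration.

```python
import io
import struct


def write_map_output(records, num_partitions):
    """Return (data, index) bytes for one mapper's output.

    records: iterable of (partition_id, payload_bytes) pairs.
    """
    # Group payloads by partition so the data file is ordered by partition.
    by_partition = [[] for _ in range(num_partitions)]
    for partition_id, payload in records:
        by_partition[partition_id].append(payload)

    data = io.BytesIO()
    offsets = [0]  # offsets[i] = start of partition i, offsets[i + 1] = its end
    for payloads in by_partition:
        for payload in payloads:
            data.write(payload)
        offsets.append(data.tell())

    index = struct.pack(f">{len(offsets)}q", *offsets)
    return data.getvalue(), index


def read_partition(data, index, partition_id):
    """Reducer side: look up the byte range in the index, then slice the data."""
    start, end = struct.unpack_from(">2q", index, partition_id * 8)
    return data[start:end]


data, index = write_map_output([(1, b"bb"), (0, b"a"), (1, b"cc")], 2)
partition_1 = read_partition(data, index, 1)
```

A reducer responsible for partition 1 reads only the two offsets it needs from the index and then fetches a single contiguous slice of the data.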
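A minimal sketch of this bypass path, with in-memory buffers standing in for the per-partition files, could look as follows. The function name bypass_merge_write is hypothetical and the code only models the idea (append without sorting, then concatenate), not Spark's actual writer.

```python
def bypass_merge_write(records, num_partitions):
    """records: iterable of (partition_id, payload_bytes), in arrival order."""
    # One buffer stands in for one temporary file per reduce partition.
    per_partition = [bytearray() for _ in range(num_partitions)]
    for partition_id, payload in records:
        per_partition[partition_id] += payload  # append only, no sorting

    # Final step: concatenate the per-partition buffers into one output
    # and remember how long each segment is.
    lengths = [len(buffer) for buffer in per_partition]
    data = b"".join(bytes(buffer) for buffer in per_partition)
    return data, lengths


data_out, segment_lengths = bypass_merge_write(
    [(1, b"x"), (0, b"yy"), (1, b"z")], num_partitions=3
)
```

Since no record is sorted or aggregated, each write is a plain append, which is why this path only pays off when the number of partitions (and thus of simultaneously open files) stays small.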
Shuffle manager under-the-hood
Under the hood, the shuffle manager is created at the same time as org.apache.spark.SparkEnv. It can be initialized with one of Spark's built-in managers, tungsten-sort or sort. The 'sort' value is the default. If either of them is specified, the shuffle manager is an instance of SortShuffleManager. By looking at the source of this class, we can find the implementation of the previously described actions.
For the reducer part, we can see a method called getReader(...). It returns an instance of ShuffleReader used to read shuffled data. Internally, the reader uses its read(...) method to fetch records through an iterator. There we can find an instance of SerializerInstance. This object converts the shuffled records back to Java objects.
From the mapper's perspective, data is serialized and sent by a ShuffleWriter. The implementation of this interface is resolved through a ShuffleHandle, which represents the type of shuffle. It covers, among others, the situation mentioned previously, when the number of partitions is lower than or equal to the threshold specified in spark.shuffle.sort.bypassMergeThreshold. In this case, the handle is a BypassMergeSortShuffleHandle. Apart from that, the 2 other handles are SerializedShuffleHandle (output in serialized form) and BaseShuffleHandle (output not serialized).
Once the appropriate writer is resolved, 2 temporary files destined for the reducers are created: one storing the data, the other storing the index with the offsets of each block. Sample content of a compressed data file can look like below:
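The choice between the three handles can be approximated by a small decision function. This is only a sketch: the function and parameter names are hypothetical, the default threshold of 200 and the conditions for the serialized handle (a serializer able to relocate serialized objects, no map-side aggregation, a bounded number of partitions) are assumptions that should be checked against the SortShuffleManager source of your Spark version.

```python
# Assumed upper bound on partitions for the serialized mode.
MAX_SERIALIZED_PARTITIONS = 1 << 24


def choose_handle(num_partitions, map_side_combine, serializer_relocatable,
                  bypass_threshold=200):
    """Return the name of the handle a shuffle would get under these assumptions."""
    # Few partitions and no map-side aggregation: write one file per partition.
    if not map_side_combine and num_partitions <= bypass_threshold:
        return "BypassMergeSortShuffleHandle"
    # Records can be kept and moved in serialized form.
    if (serializer_relocatable and not map_side_combine
            and num_partitions <= MAX_SERIALIZED_PARTITIONS):
        return "SerializedShuffleHandle"
    # Fallback: records are handled as deserialized objects.
    return "BaseShuffleHandle"


print(choose_handle(100, False, True))
print(choose_handle(1000, False, True))
print(choose_handle(1000, True, True))
```

The order of the checks matters: the bypass path is tested first, and the non-serialized handle is used only when neither of the other two applies.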
LZ4Block%\8D\00\00\00\A5\00\00\00\E0\87\D6\F6!\AC\ED\00sr\00java.lang.Integer⠤\F7\81\878\00I\00valuexr\00(\00\F4Number\86\AC\95\94\E0\8B\00\00xp\00\00\00t\00SMALL (101); \00\8011)sq\00~\00\00Dt\00\003%\00wBIG (15#\00\D03t\00 BIG (171)LZ4Block\00\00\00\00\00\00\00\00\00\00\00\00
And below is a sample directory structure after the mapping phase:
# when 1 partition is used
.
├── 0c
│   └── shuffle_0_0_0.data
├── 0d
├── 0e
├── 11
├── 13
└── 30
    └── shuffle_0_0_0.index

# when 2 partitions are used
.
├── 0c
│   └── shuffle_0_0_0.data
├── 0d
├── 0e
├── 0f
│   └── shuffle_0_1_0.index
├── 11
├── 13
├── 15
│   └── shuffle_0_1_0.data
└── 30
    └── shuffle_0_0_0.index
The numbers after shuffle_ represent, consecutively, shuffle id, map id and reduce id.
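To make the naming scheme concrete, here is a small parser for such file names. The ShuffleFile type and the parse_shuffle_file function are hypothetical helpers written for this post; the pattern is derived only from the file names listed in the directory structure.

```python
import re
from typing import NamedTuple


class ShuffleFile(NamedTuple):
    shuffle_id: int
    map_id: int
    reduce_id: int
    kind: str  # "data" or "index"


_SHUFFLE_FILE = re.compile(r"^shuffle_(\d+)_(\d+)_(\d+)\.(data|index)$")


def parse_shuffle_file(name):
    """Split a shuffle file name into its shuffle, map and reduce ids."""
    match = _SHUFFLE_FILE.match(name)
    if match is None:
        raise ValueError(f"not a shuffle file name: {name!r}")
    shuffle_id, map_id, reduce_id, kind = match.groups()
    return ShuffleFile(int(shuffle_id), int(map_id), int(reduce_id), kind)


parsed = parse_shuffle_file("shuffle_0_1_0.index")
```

For shuffle_0_1_0.index the parser returns shuffle id 0, map id 1 and reduce id 0, which matches the second mapper in the 2-partition example.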
As we saw in the first part, shuffle is not an autonomous process controlled only by Spark. We can configure it through several properties. By default, a sort-based manager is used, as presented in the second part. The last part showed some internals of the sort-based shuffle manager, looking both at the Scala objects involved and at the generated output files.