This short post complements the previous article describing the general mechanics of shuffle. It focuses in more detail on the writing part.
Technically, the task responsible for the shuffle operation is ShuffleMapTask. Its runTask() method contains the code that triggers saving the shuffled data to disk:
var writer: ShuffleWriter[Any, Any] = null
try {
  // resolve the shuffle manager configured for this application
  val manager = SparkEnv.get.shuffleManager
  // get a writer for this map task's shuffle output
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // write all records produced by the RDD's iterator to the shuffle files
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
  // ...
During my tests on shuffling, I was a little surprised by the number of files generated for an RDD with more than 1 partition. For example, a shuffle of 10 partitions generated 20 files:
blockmgr-2bde683f-4e64-44f1-af88-ac6fc6e5fa28/
├── 04
│   └── shuffle_0_8_0.data
├── 08
│   └── shuffle_0_8_0.index
├── 0a
│   └── shuffle_0_6_0.index
├── 0c
│   ├── shuffle_0_0_0.data
│   └── shuffle_0_4_0.index
├── 0d
│   └── shuffle_0_3_0.index
├── 0e
├── 0f
│   └── shuffle_0_1_0.index
├── 11
├── 13
├── 15
│   └── shuffle_0_1_0.data
├── 1b
│   └── shuffle_0_7_0.data
├── 1d
│   └── shuffle_0_9_0.data
├── 27
│   └── shuffle_0_5_0.data
├── 29
│   └── shuffle_0_3_0.data
├── 30
│   └── shuffle_0_0_0.index
├── 32
│   └── shuffle_0_2_0.index
├── 35
│   └── shuffle_0_5_0.index
├── 36
│   └── shuffle_0_2_0.data
├── 37
│   └── shuffle_0_7_0.index
├── 38
│   └── shuffle_0_4_0.data
├── 39
│   └── shuffle_0_9_0.index
└── 3a
    └── shuffle_0_6_0.data
22 directories, 20 files
The reason is that 1 ShuffleMapTask is generated for each initial partition, even in standalone mode with a single executor, and each map task produces one .data file and one .index file, hence 20 files for 10 partitions. It's possible that after the first mapping transformation these initial partitions change; during the shuffle, the data is placed into the shuffle files according to these new partitions and not the initial ones.
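To reproduce this layout locally, a minimal sketch could look like the one below (the object name and master URL are illustrative); a 10-partition RDD shuffled by reduceByKey leaves a blockmgr-* directory with one .data and one .index file per map task under spark.local.dir:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleFilesDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("shuffle-files-demo").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // 10 initial partitions => 10 ShuffleMapTasks
    val numbers = sc.parallelize(1 to 100, numSlices = 10)
    // reduceByKey introduces a shuffle dependency; each map task writes
    // one .data and one .index file, hence 20 files in total
    numbers.map(n => (n % 10, n)).reduceByKey(_ + _).count()
    sc.stop()
  }
}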
The effective writing is done by SortShuffleWriter. First, it creates an appropriate ExternalSorter instance (configured with ordering and aggregation). Then it inserts all of the partition's records into it. ExternalSorter adds these records to its in-memory buffer and eventually spills them to disk when the buffer is full (= spilling).
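The buffering idea can be summarized with the simplified sketch below (not Spark's actual ExternalSorter): records accumulate in memory and are flushed to a spill file once a threshold is reached:

import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the buffer-then-spill idea; "spill" is any function that
// persists the in-memory records, e.g. writes them to a temporary file
class NaiveSpillingBuffer[K, V](maxInMemory: Int, spill: Seq[(K, V)] => Unit) {
  private val buffer = ArrayBuffer.empty[(K, V)]

  def insertAll(records: Iterator[(K, V)]): Unit = {
    records.foreach { record =>
      buffer += record
      if (buffer.size >= maxInMemory) {
        // the buffer is full: move its content to disk and start over
        spill(buffer.toSeq)
        buffer.clear()
      }
    }
  }

  // records still held in memory after the last insert
  def inMemoryRecords: Seq[(K, V)] = buffer.toSeq
}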
Once all records are processed, the writer prepares 2 temporary files: one with the data and a second with the partition lengths. The task of saving the data goes to ExternalSorter's writePartitionedFile(BlockId, File) method. Inside it, ExternalSorter first checks if some spilling occurred. If so, it merges the spilled files here:
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
  // ...
}
Otherwise, ExternalSorter iterates over all partitions it holds in memory and writes their content to disk. The writing is done block by block, where each block represents 1 partition's data. At the end of each block, ExternalSorter appends the block's length to an array later used to construct the final .index file. The same operation happens when merging spilled files.
The result of writePartitionedFile(...) is an array containing the length of each partition. For example, if we have 3 partitions, each of length 50, the result will be [50, 50, 50]. The indices of the array correspond to the partition numbers.
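The following toy sketch (with a made-up serialization, not Spark's implementation) shows the principle: every partition's block is appended to a single data file and its length is recorded for the future index:

import java.io.FileOutputStream

// Toy version of the idea behind writePartitionedFile: append every partition's
// block to a single data file and remember each block's length for the index
def writeBlocks[K, V](partitions: Seq[Seq[(K, V)]], dataFile: String): Array[Long] = {
  val lengths = new Array[Long](partitions.size)
  val out = new FileOutputStream(dataFile)
  try {
    partitions.zipWithIndex.foreach { case (records, partitionId) =>
      val block = records.mkString("\n").getBytes("UTF-8") // toy serialization
      out.write(block)
      lengths(partitionId) = block.length.toLong // block length, in bytes
    }
  } finally {
    out.close()
  }
  lengths
}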
Afterwards, the final .index file is generated by this code:
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
  offset += length
  out.writeLong(offset)
}
For our 3 partitions of length 50 we should get a file looking like "0, 50, 100, 150", where 0 is the beginning and 50 the end of the 1st partition, 50 the beginning and 100 the end of the 2nd partition, and 100 the beginning and 150 the end of the 3rd partition.
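A reducer can then locate its block with 2 consecutive reads from the index file. The helper below is a hypothetical illustration (not Spark's code) of how the cumulative offsets translate into a byte range inside the .data file:

import java.io.{DataInputStream, FileInputStream}

// Hypothetical helper: read 2 consecutive offsets from the .index file to get
// the byte range of a given reduce partition inside the .data file
def partitionRange(indexFile: String, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8) // each offset is a long (8 bytes)
    val start = in.readLong()  // where the partition's block begins
    val end = in.readLong()    // where the next block begins = end of this one
    (start, end)
  } finally {
    in.close()
  }
}

// With the "0, 50, 100, 150" index above, partitionRange(indexFile, 1) is (50, 100)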
On the reducer's side, the reading is less complicated. ShuffleBlockFetcherIterator is in charge of fetching the records from these files. The final sorting happens afterwards: if the records need to be sorted, the reader creates, once all records of a partition are fetched, an instance of ExternalSorter and sorts them with the defined ordering.
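Conceptually, the read path can be summarized with this simplified sketch (plain Scala, not Spark's classes): the fetched records are returned as-is unless a key ordering is defined, in which case they are sorted at the end:

// Simplified read path: sort only when an ordering is defined
def readPartition[K, V](fetched: Iterator[(K, V)],
                        keyOrdering: Option[Ordering[K]]): Iterator[(K, V)] = {
  keyOrdering match {
    case Some(ordering) =>
      // Spark delegates this step to an ExternalSorter so it can spill to disk;
      // here an in-memory sort is enough to show the principle
      fetched.toSeq.sortBy(_._1)(ordering).iterator
    case None => fetched
  }
}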
This complementary note shows how the shuffle writing and reading process works in Spark. It describes the main classes participating in this process and the output files they generate.