Spark shuffle - complementary notes

Versions: Spark 2.0.0

This short post complements the previous article, which described shuffle in broad strokes. It focuses in more detail on the writing part.

Technically, the task responsible for the shuffle operation is ShuffleMapTask. Its runTask(TaskContext) method (called by the generic Task.run()) contains the code that triggers writing the shuffled data to disk:

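var writer: ShuffleWriter[Any, Any] = null
try {
  // Ask the shuffle manager (SortShuffleManager by default) for a writer bound to this map partition
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // Compute this partition's records and hand them all to the writer
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  // Commit the output and return the MapStatus (output location and per-partition sizes)
  writer.stop(success = true).get
} catch {
  // ...
}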
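
To make the write/stop lifecycle of the excerpt concrete, here is a small self-contained sketch. SimpleShuffleWriter, CountingWriter and MapTaskSketch are hypothetical stand-ins, not Spark classes: Spark's ShuffleWriter has the same two methods, but its stop() returns an Option[MapStatus] rather than a record count. The failure branch (abort the writer, then rethrow) is an assumed, simplified cleanup.

```scala
// Hypothetical, simplified stand-in for Spark's ShuffleWriter contract.
trait SimpleShuffleWriter[K, V] {
  def write(records: Iterator[Product2[K, V]]): Unit
  // Returns a result only when the output is committed (success = true).
  def stop(success: Boolean): Option[Long]
}

// Toy writer that only counts records; it creates no files.
class CountingWriter[K, V] extends SimpleShuffleWriter[K, V] {
  private var count = 0L
  def write(records: Iterator[Product2[K, V]]): Unit =
    records.foreach(_ => count += 1)
  def stop(success: Boolean): Option[Long] =
    if (success) Some(count) else None
}

object MapTaskSketch {
  // Same shape as the ShuffleMapTask excerpt: write everything, then commit.
  // On failure the writer is stopped without committing and the error is rethrown.
  def runMapTask[K, V](writer: SimpleShuffleWriter[K, V],
                       records: Iterator[Product2[K, V]]): Long = {
    try {
      writer.write(records)
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        writer.stop(success = false)
        throw e
    }
  }
}
```

For example, MapTaskSketch.runMapTask(new CountingWriter[String, Int], Iterator("a" -> 1, "b" -> 2)) returns 2.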

During my shuffle tests, I was a little surprised by the number of files generated for an RDD with more than 1 partition. For example, a shuffle of 10 partitions generated 20 files:

blockmgr-2bde683f-4e64-44f1-af88-ac6fc6e5fa28/
├── 04
│   └── shuffle_0_8_0.data
├── 08
│   └── shuffle_0_8_0.index
├── 0a
│   └── shuffle_0_6_0.index
├── 0c
│   ├── shuffle_0_0_0.data
│   └── shuffle_0_4_0.index
├── 0d
│   └── shuffle_0_3_0.index
├── 0e
├── 0f
│   └── shuffle_0_1_0.index
├── 11
├── 13
├── 15
│   └── shuffle_0_1_0.data
├── 1b
│   └── shuffle_0_7_0.data
├── 1d
│   └── shuffle_0_9_0.data
├── 27
│   └── shuffle_0_5_0.data
├── 29
│   └── shuffle_0_3_0.data
├── 30
│   └── shuffle_0_0_0.index
├── 32
│   └── shuffle_0_2_0.index
├── 35
│   └── shuffle_0_5_0.index
├── 36
│   └── shuffle_0_2_0.data
├── 37
│   └── shuffle_0_7_0.index
├── 38
│   └── shuffle_0_4_0.data
├── 39
│   └── shuffle_0_9_0.index
└── 3a
    └── shuffle_0_6_0.data

22 directories, 20 files
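
The two-hex-digit directory names in the listing come from hashing each file name. The sketch below assumes the scheme of Spark's DiskBlockManager.getFile: take a non-negative String hashCode, pick a local directory with hash % numLocalDirs, then a sub-directory with (hash / numLocalDirs) % subDirsPerLocalDir, formatted as two hex digits. An assumed default of 64 sub-directories (spark.diskStore.subDirectories) is consistent with no name in the listing exceeding 3f. BlockDirSketch is a hypothetical name.

```scala
object BlockDirSketch {
  // Non-negative variant of String.hashCode (Int.MinValue has no positive abs).
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Returns (index of the local dir, two-hex-digit sub-directory name) for a block file.
  // Assumption: 1 local dir and 64 sub-directories unless told otherwise.
  def locate(fileName: String,
             numLocalDirs: Int = 1,
             subDirsPerLocalDir: Int = 64): (Int, String) = {
    val hash = nonNegativeHash(fileName)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, "%02x".format(subDirId))
  }
}
```

Since the .data and .index names hash differently, the two files written by one map task usually end up in different sub-directories, as shuffle_0_8_0.data (04) and shuffle_0_8_0.index (08) do above.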

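The reason is that 1 ShuffleMapTask is created for each initial partition (even in standalone mode with a single executor), and each of these tasks writes its own pair of files: a .data file holding the shuffled records and an .index file locating them inside it. 10 map tasks therefore produce 10 × 2 = 20 files, named shuffle_<shuffleId>_<mapId>_0. However, the shuffle can change the partitioning (for example, the number of partitions or the partitioner). Inside each .data file, records are grouped by their new, target partition and not by the initial one.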
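
Both points can be illustrated with a short sketch. targetPartition follows the logic of Spark's HashPartitioner (non-negative modulo of the key's hashCode, null keys to partition 0); expectedFiles assumes the naming seen in the listing, where the trailing 0 is a placeholder reduce id because one file serves all reduce partitions. ShuffleLayoutSketch is a hypothetical name.

```scala
object ShuffleLayoutSketch {
  // Target (reduce-side) partition of a record, HashPartitioner-style.
  def targetPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k =>
      val raw = k.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }

  // One .data and one .index file per map task: shuffle_<shuffleId>_<mapId>_0.<ext>
  def expectedFiles(shuffleId: Int, numMapTasks: Int): Seq[String] =
    for {
      mapId <- 0 until numMapTasks
      ext <- Seq("data", "index")
    } yield s"shuffle_${shuffleId}_${mapId}_0.$ext"
}
```

With 10 map tasks, expectedFiles(0, 10) lists the 20 names of the tree above, while targetPartition decides in which block of a given .data file each record lands.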

In the general case, the actual writing is done by SortShuffleWriter. First, it creates an ExternalSorter instance, configured with the shuffle's aggregator and key ordering when map-side combine is required. Then it inserts all the partition's records into it. ExternalSorter adds these records to its in-memory buffer and spills them to disk whenever the buffer becomes full.
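
The insert-then-spill cycle can be modeled in a few lines. This is a heavily simplified, hypothetical sketch (SpillingBufferSketch is not a Spark class): the spill threshold is a record count instead of an estimated memory size, and the "spills" stay in memory instead of going to disk files. What it keeps from the mechanism described above is that every spill is sorted by target partition id, so it can later be read back partition by partition.

```scala
import scala.collection.mutable.ArrayBuffer

// Records are buffered as (partitionId, key, value) and spilled once maxInMemory is reached.
class SpillingBufferSketch[K, V](partitioner: K => Int, maxInMemory: Int) {
  private val buffer = ArrayBuffer.empty[(Int, K, V)]
  val spills = ArrayBuffer.empty[Vector[(Int, K, V)]]

  def insertAll(records: Iterator[(K, V)]): Unit =
    records.foreach { case (k, v) =>
      buffer += ((partitioner(k), k, v))
      if (buffer.size >= maxInMemory) spill()
    }

  // Sort the buffered records by partition id, keep them as one "spill", empty the buffer.
  private def spill(): Unit = {
    spills += buffer.sortBy(_._1).toVector
    buffer.clear()
  }

  // Records still held in memory, sorted the same way.
  def inMemory: Vector[(Int, K, V)] = buffer.sortBy(_._1).toVector
}
```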

Once all records are processed, the writer prepares 2 temporary files: one for the data and another for the partition lengths (the future .index file). Saving the data is delegated to ExternalSorter's writePartitionedFile(BlockId, File) method. Inside it, ExternalSorter first checks whether any spilling occurred. If so, it merges the spilled files here:

// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
  // ... write the records of partition `id` as one block and store its length in lengths(id)
}
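
What partitionedIterator provides can be approximated as follows: for each partition id in order, an iterator over that partition's records taken from every spilled run. MergeSketch is hypothetical; it only concatenates records (no key ordering or aggregation), and it filters each run once per partition for readability, whereas a real implementation reads each spill sequentially.

```scala
object MergeSketch {
  // runs: spilled (and in-memory) record sets, each sorted by partition id.
  def partitionedIterator[K, V](numPartitions: Int,
                                runs: Seq[Seq[(Int, K, V)]]): Iterator[(Int, Iterator[(K, V)])] =
    (0 until numPartitions).iterator.map { id =>
      val elements = runs.iterator.flatMap { run =>
        run.iterator.filter(_._1 == id).map { case (_, k, v) => (k, v) }
      }
      (id, elements)
    }
}
```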

Otherwise, ExternalSorter iterates over the partitions it holds in memory and writes their content to disk. Writing is done block by block, each block holding the data of 1 partition. After writing a block, ExternalSorter stores its length in an array that is later used to build the final .index file. The same happens when spilled files are merged.

The result of writePartitionedFile(...) is an array containing the length of each partition. For example, with 3 partitions of 50 bytes each, the result is [50, 50, 50]. The array indices correspond to the partition numbers.
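
The block-by-block writing and the resulting lengths array can be sketched with an in-memory stream. PartitionedWriteSketch is a hypothetical name, and records are encoded as UTF-8 lines purely for illustration (Spark uses the shuffle's configured serializer). An empty partition simply gets a length of 0.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

object PartitionedWriteSketch {
  // Writes each partition as one contiguous block and returns the bytes written per partition.
  def writePartitioned(partitions: Seq[Seq[String]], out: ByteArrayOutputStream): Array[Long] =
    partitions.map { records =>
      val before = out.size()
      records.foreach(r => out.write((r + "\n").getBytes(StandardCharsets.UTF_8)))
      (out.size() - before).toLong
    }.toArray
}
```

Three partitions each holding one 49-character record (50 bytes with the newline) reproduce the [50, 50, 50] example.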

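Then, the final .index file is generated by the following code from IndexShuffleBlockResolver's writeIndexFileAndCommit method, where out is a DataOutputStream wrapping the temporary index file: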

// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
  offset += length
  out.writeLong(offset)
}

For our 50-byte partitions, the index file contains 4 longs, 0, 50, 100 and 150, stored in binary (8 bytes each) rather than as readable text. 0 is the beginning and 50 the end of the 1st partition, 50 the beginning and 100 the end of the 2nd partition, and 100 the beginning and 150 the end of the 3rd partition. In general, an index file holds numPartitions + 1 offsets.
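
The conversion above, together with the reverse lookup a reducer needs, can be sketched on byte arrays instead of files. IndexFileSketch is a hypothetical name; writeIndex reuses the offset loop of the excerpt, and segment reads the two longs stored at positions reduceId and reduceId + 1, which, as far as I know, is also how IndexShuffleBlockResolver.getBlockData locates a block.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object IndexFileSketch {
  // Converts block lengths into numPartitions + 1 offsets, 8 bytes each.
  def writeIndex(lengths: Array[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    var offset = 0L
    out.writeLong(offset)
    for (length <- lengths) {
      offset += length
      out.writeLong(offset)
    }
    out.close()
    bytes.toByteArray
  }

  // (start, end) byte range of reduce partition `reduceId` inside the .data file.
  def segment(index: Array[Byte], reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      in.skipBytes(reduceId * 8)
      (in.readLong(), in.readLong())
    } finally in.close()
  }
}
```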

The reducer's reading side is less complicated. BlockStoreShuffleReader uses ShuffleBlockFetcherIterator to fetch the blocks belonging to the reduce partition, locally or from other executors. The final sorting is handled by BlockStoreShuffleReader itself: if the records must be sorted, it creates an ExternalSorter with the defined key ordering, inserts all fetched records of the partition into it and reads them back sorted.
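
The reduce side can be modeled in memory as follows. ReduceSideSketch is hypothetical: each map output is represented as its list of per-partition blocks, the reducer takes block reduceId from every map output, and it sorts by key only when an ordering is defined. Aggregation and the spill-capable ExternalSorter are left out; a plain sortBy stands in for them.

```scala
object ReduceSideSketch {
  // mapOutputs(m)(r) = records written by map task m for reduce partition r.
  def read[K, V](mapOutputs: Seq[Seq[Seq[(K, V)]]],
                 reduceId: Int,
                 keyOrdering: Option[Ordering[K]]): Seq[(K, V)] = {
    val fetched = mapOutputs.flatMap(blocks => blocks(reduceId))
    keyOrdering match {
      case Some(ord) => fetched.sortBy(_._1)(ord)
      case None => fetched
    }
  }
}
```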

This complementary note showed how the shuffle writing and reading processes work in Spark. It presented the main classes involved in these processes and the output files they generate.