Shuffle configuration demystified - part 3

Versions: Apache Spark 3.2.0

It's time for the last part of the shuffle configuration overview. This time you'll see the properties related to the shuffle service, reducer, I/O, and a few others.

Shuffle service

The shuffle service is the first of the remaining categories. This component helps scale Apache Spark clusters by storing shuffle data outside the executors. It's optional, though, so one of the first configuration entries you'll meet is spark.shuffle.service.enabled, which turns it on. Once it's enabled, you can also set its name (spark.shuffle.service.name). An important point: the name must match the name of the shuffle service declared in YARN (yarn.nodemanager.aux-services).

In addition to the name, the configuration also supports the port (spark.shuffle.service.port). The property is useful when the YARN cluster runs several shuffle service instances, since the port helps pick the right one. The port is also used in the standalone mode and with the Mesos scheduler.

When it comes to the shuffle connection, there are two related properties. The first one, spark.shuffle.registration.maxAttempts, defines the maximum number of attempts in case of registration failures. The second one, spark.shuffle.registration.timeout, controls how long the shuffle client can wait to register with the external shuffle service.

In addition, the shuffle service can cache the information stored in the shuffle index files. In Apache Spark 1.x, each fetch of the mapper's output involved opening the shuffle index file. In later versions, the component serving the data caches this information, up to spark.shuffle.service.index.cache.size bytes.

To finish this part, the standalone worker has spark.shuffle.service.db.enabled. When it's enabled, the shuffle service persists its state on the local disk. After a restart, the service uses this information to reload the details of the executors registered so far.
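To make the client-side properties more concrete, here is a minimal sketch of an application configuration. It assumes spark-core is on the classpath, and the values are only illustrative (they should correspond to the documented defaults, but check them against your version). The object name is hypothetical.

```scala
import org.apache.spark.SparkConf

object ShuffleServiceConfSketch {
  // Illustrative values only, assumed to match the documented defaults.
  val conf: SparkConf = new SparkConf()
    .set("spark.shuffle.service.enabled", "true")
    // Must match the entry declared in yarn.nodemanager.aux-services
    .set("spark.shuffle.service.name", "spark_shuffle")
    .set("spark.shuffle.service.port", "7337")
    .set("spark.shuffle.registration.maxAttempts", "3")
    // Expressed in milliseconds
    .set("spark.shuffle.registration.timeout", "5000")
}
```

The spark.shuffle.service.index.cache.size and spark.shuffle.service.db.enabled entries are read by the shuffle service process itself, so they belong to the service configuration rather than to the application one.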

Reducer

That was for the shuffle service. Let's now see the shuffle reading part.

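The reducers have 3 properties to control the shuffle block fetch backpressure. Two of them are global:
- spark.reducer.maxSizeInFlight - the maximum size of the shuffle blocks fetched simultaneously by a reduce task
- spark.reducer.maxReqsInFlight - the maximum number of remote fetch requests in flight at any given point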

In addition to them, the reducer also has a property called spark.reducer.maxBlocksInFlightPerAddress. It limits the number of remote blocks a reduce task fetches concurrently from a given host. Each host can serve multiple reducer tasks, and this configuration helps avoid overloading it with too many requests, which could lead to a crash.

Apart from the remote fetches, reducers can also load shuffle blocks directly from the local disk. The property responsible for this behavior is spark.shuffle.readHostLocalDisk. It's turned on by default, so the executor reads the shuffle blocks stored on the same node with the file system API. When it's disabled, these blocks are fetched over the network like any remote block.
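The reader-side properties can be put together as in the sketch below. It assumes spark-core on the classpath. The limits are arbitrary examples for a cluster where the serving nodes get overloaded, not recommendations, and the object name is hypothetical.

```scala
import org.apache.spark.SparkConf

object ReducerConfSketch {
  val conf: SparkConf = new SparkConf()
    // Global limit on the size of the blocks fetched at the same time per reduce task
    .set("spark.reducer.maxSizeInFlight", "48m")
    // Global limit on the number of fetch requests in flight (example value)
    .set("spark.reducer.maxReqsInFlight", "64")
    // Per-host limit on the blocks fetched concurrently (example value)
    .set("spark.reducer.maxBlocksInFlightPerAddress", "128")
    // Read the blocks stored on the same host directly from disk
    .set("spark.shuffle.readHostLocalDisk", "true")
}
```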

I/O

The shuffle I/O configuration was a tricky one to find! On the one hand, it's explicitly listed in the documentation, but on the other hand, it's missing from the usual configuration Scala files. It turns out that this group is built dynamically by a class called TransportConf, created for the shuffle module in many places:

private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
  extends Logging {

  private val transportConf =
    SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
  // ...
}

private[spark] class IndexShuffleBlockResolver(
    conf: SparkConf,
    // var for testing
    var _blockManager: BlockManager = null)
  extends ShuffleBlockResolver
  with Logging with MigratableResolver {

  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
  // ...
}

private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
  private[shuffle] def initiateBlockPush(
      dataFile: File,
      partitionLengths: Array[Long],
      dep: ShuffleDependency[_, _, _],
      mapIndex: Int): Unit = {
    val numPartitions = dep.partitioner.numPartitions
    val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
    // ...
  }
}

In this configuration you'll find the entries responsible for the communication over the network, for example when a reducer reads shuffle data from a shuffle service. The properties should be pretty clear since they operate at the request level.
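The reason why these entries don't appear as constants can be sketched as follows. The key of each network property is assembled from the module name passed to the transport configuration and a suffix, following the spark.<module>.<suffix> scheme. The object and the method names below are hypothetical, and the suffixes are the ones I know from the shuffle section of the configuration documentation, so treat the list as an assumption rather than an exhaustive reference.

```scala
object ShuffleIoKeysSketch {
  // Simplified model of the naming scheme: "spark." + module + "." + suffix
  def confKey(module: String, suffix: String): String = s"spark.$module.$suffix"

  // Suffixes assumed from the documented shuffle I/O entries
  val documentedSuffixes: Seq[String] = Seq(
    "io.maxRetries",
    "io.retryWait",
    "io.numConnectionsPerPeer",
    "io.preferDirectBufs",
    "io.backLog",
    "io.connectionTimeout"
  )

  // Full property names resolved for the "shuffle" module
  def shuffleIoKeys: Seq[String] = documentedSuffixes.map(confKey("shuffle", _))
}
```

Calling ShuffleIoKeysSketch.shuffleIoKeys gives the names you can set like any other property, for example spark.shuffle.io.maxRetries.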

Others

I don't like calling them "others" because they're as interesting and important as the above ones. However, I didn't find a better classification name due to the lack of common characteristics. Anyway.

To start, the spark.shuffle.sort.bypassMergeThreshold. It's the property you're likely to use to enable the BypassMergeSortShuffleWriter. The shuffle handler will use the BypassMergeSortShuffleWriter if the number of shuffle partitions is lower than or equal to this value. There is also a transformation-specific condition, but you'll discover it in a blog post dedicated to the BypassMergeSortShuffleWriter.
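The decision can be modeled with a few lines of code. This is a simplified sketch and not the Spark source: the object name is hypothetical, 200 is the documented default of the threshold, and the mapSideCombine flag stands for the transformation-specific condition (to my knowledge, the bypass path is skipped when the shuffle needs a map-side aggregation).

```scala
object BypassDecisionSketch {
  // Documented default of spark.shuffle.sort.bypassMergeThreshold
  val DefaultThreshold = 200

  // True when the bypass writer can be used in this simplified model
  def shouldBypassMergeSort(
      numPartitions: Int,
      mapSideCombine: Boolean,
      threshold: Int = DefaultThreshold): Boolean = {
    !mapSideCombine && numPartitions <= threshold
  }
}
```

With the default threshold, a shuffle with 200 partitions and no map-side aggregation qualifies, whereas 201 partitions fall back to another writer.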

Next, you might also customize the sorting algorithm of the UnsafeShuffleWriter with spark.shuffle.sort.useRadixSort. When the property is enabled, which is the default in Apache Spark 3.2.0, the writer sorts with the Radix sort. When it's disabled, it falls back to TimSort. The consequence of using the Radix sort? Probably faster sorting before writing the shuffle data, but also more memory used.

The Radix algorithm is not the only configuration related to the UnsafeShuffleWriter. Another one is spark.shuffle.unsafe.fastMergeEnabled. What does it do? It's one of the properties conditioning a fast spill merge. Under the hood, this merge uses NIO's transferTo method instead of the slower merge based on file streams.
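To give an idea of what a transferTo-based merge looks like, here is a small standalone sketch. It only concatenates whole files with the JDK API, while the real merge works partition by partition, so take it as an illustration of the technique under that simplification. The object and the method names are hypothetical.

```scala
import java.io.{File, FileInputStream, FileOutputStream}

object TransferToMergeSketch {
  // Concatenates the spill files into the target file and returns the number of copied bytes
  def mergeWithTransferTo(spills: Seq[File], target: File): Long = {
    val out = new FileOutputStream(target).getChannel
    var total = 0L
    try {
      spills.foreach { spill =>
        val in = new FileInputStream(spill).getChannel
        try {
          val size = in.size()
          var position = 0L
          // transferTo can copy fewer bytes than requested, hence the loop
          while (position < size) {
            position += in.transferTo(position, size - position, out)
          }
          total += size
        } finally {
          in.close()
        }
      }
    } finally {
      out.close()
    }
    total
  }
}
```

The bytes move between the channels without going through a buffer managed by the application code, which is where the expected gain over the stream-based copy comes from.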

To write data on disk, shuffle writers use a DiskBlockObjectWriter instance. It's created by the block manager and uses another shuffle-related property called spark.shuffle.sync. If enabled, it ensures that the data buffered by the OS is immediately written to disk when the shuffle file is committed. Since it has some performance impact, it defaults to false. An interesting fact: the property is quite old (Apache Spark 0.8.0) and was initially added to measure how the absence of the OS cache impacts the generation of the shuffle files.
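The effect of the flag can be illustrated with the plain JDK API. The sketch below is not the DiskBlockObjectWriter code. It only shows the kind of call involved, under the assumption that the synchronization relies on the file descriptor's sync method. The object and the method names are hypothetical.

```scala
import java.io.{File, FileOutputStream}

object SyncWriteSketch {
  // Writes the payload and, if requested, forces the OS buffers to the storage device
  def writeAndCommit(file: File, payload: Array[Byte], syncWrites: Boolean): Long = {
    val out = new FileOutputStream(file)
    try {
      out.write(payload)
      out.flush()
      if (syncWrites) {
        // Blocks until the OS has pushed the buffered data of this file to the device
        out.getFD.sync()
      }
      file.length()
    } finally {
      out.close()
    }
  }
}
```

Without the sync call, the write returns as soon as the data reaches the OS cache, which explains both the better performance and the weaker durability guarantee of the default setting.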

Finally, there is a configuration property called spark.shuffle.sort.io.plugin.class that defines the class responsible for storing the shuffle data files. By default it's set to LocalDiskShuffleDataIO, which operates on the local disk, but the property is a part of the refactoring meant to support remote storage for shuffle files.

That's the last of the 3 blog posts dedicated to the shuffle configuration. I omitted the Adaptive Query Execution part and a few other properties. If you have found something missing, I will be happy to cover it in a follow-up article. In the meantime, I have other topics in the backlog, so see you in the coming weeks!