Shuffle configuration demystified - part 1

Versions: Apache Spark 3.2.1

Probably the most popular configuration entry related to the shuffle is the number of shuffle partitions. But it's not the only one, as you will see in this new blog post series!

Buffering and spilling

You'll find the first family of properties mostly involved in the writing process because it concerns buffering and spilling. The first of them is spark.shuffle.spill.numElementsForceSpillThreshold. Normally, spilling occurs when the shuffle writer cannot acquire more memory to buffer shuffle data. But this behavior can also be triggered by the number of elements added to the buffer, and that's what the numElementsForceSpillThreshold property controls. By default, it's equal to Integer.MAX_VALUE.
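For example, the threshold could be lowered to force earlier spills. A minimal sketch, where the value and the application name are purely illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative only: force a spill after 5 million buffered records,
// regardless of the available execution memory.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("force-spill-threshold-demo")
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
  .getOrCreate()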

The second property involved in spilling is spark.shuffle.spill.batchSize. Once the shuffle mechanism decides to spill the data on disk, it won't write each record individually. Instead, it'll iterate over the spillable data and flush the in-memory records in batches of the size defined in the batchSize property. By default, its value is 10 000, and changing it leads to fewer or more disk flushes, as you can see in the ExternalAppendOnlyMap snippet below:

class ExternalAppendOnlyMap[K, V, C]( // ...

  private val serializerBatchSize = sparkConf.get(config.SHUFFLE_SPILL_BATCH_SIZE)

  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
      : DiskMapIterator = {
// ...
    try {
      while (inMemoryIterator.hasNext) {
        val kv = inMemoryIterator.next()
        writer.write(kv._1, kv._2)
        objectsWritten += 1

        if (objectsWritten == serializerBatchSize) {
          flush()
        }
      }

The shuffle also uses buffers to accumulate the data in memory before writing it to disk. Depending on the place, this behavior can be configured with one of several dedicated buffer properties, some of which are shown in the sketch below:
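A minimal sketch of tuning these write-side buffers; the property names are real Spark 3.2 settings, but the values are arbitrary:

import org.apache.spark.sql.SparkSession

// Illustrative values only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("shuffle-buffers-demo")
  // in-memory buffer for each shuffle file output stream
  .config("spark.shuffle.file.buffer", "64k")
  // output stream buffer used in the UnsafeShuffleWriter path
  .config("spark.shuffle.unsafe.file.output.buffer", "64k")
  // buffer used when writing the sorted records to an on-disk file
  .config("spark.shuffle.spill.diskWriteBufferSize", "2m")
  .getOrCreate()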

Among other properties in this category, you will find spark.shuffle.sort.initialBufferSize, which defines the initial size of the array used by ShuffleInMemorySorter in the UnsafeShuffleWriter path. The value is only the initial size of the array because it can grow if there is enough memory available. Otherwise, a spill will be triggered to free the memory.

Finally, there is also spark.shuffle.spill.initialMemoryThreshold. It defaults to 5MB and is the initial threshold for the size of a collection before the Spillable implementation starts tracking its memory usage. It's not used by the UnsafeShuffleWriter, though.
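To see how the two spill thresholds work together, here is a simplified, self-contained paraphrase of the decision made by the Spillable implementation (condensed and stubbed, not the literal Spark source):

// Sketch of the spill decision in org.apache.spark.util.collection.Spillable.
// The execution memory acquisition is stubbed out.
object SpillDecisionSketch {
  // starts at spark.shuffle.spill.initialMemoryThreshold (5MB by default)
  var myMemoryThreshold: Long = 5L * 1024 * 1024
  // spark.shuffle.spill.numElementsForceSpillThreshold (Integer.MAX_VALUE by default)
  val numElementsForceSpillThreshold: Long = Int.MaxValue

  // stub standing in for the memory manager
  def acquireMemory(requested: Long): Long = requested / 2

  def maybeSpill(currentMemory: Long, elementsRead: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // try to grow the threshold by acquiring more execution memory...
      val granted = acquireMemory(2 * currentMemory - myMemoryThreshold)
      myMemoryThreshold += granted
      // ...and spill only if the request wasn't fully satisfied
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    // the element-count threshold can force a spill regardless of the memory state
    shouldSpill || elementsRead > numElementsForceSpillThreshold
  }
}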

Data compression

The second family of properties applies to compression. To start, spark.shuffle.compress enables or disables the compression of the shuffle output. The codec used to compress the files is the one defined in the spark.io.compression.codec configuration. Spill files share the same codec configuration, but their compression is controlled separately by spark.shuffle.spill.compress. How does Apache Spark know which flag to apply? It simply checks the type of the written block:

private[spark] class SerializerManager(
// ...
) {
  private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS) 
  private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS) 
  private[this] val compressRdds = conf.get(config.RDD_COMPRESS) 
  private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS)

  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: ShuffleBlockChunkId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _: ShuffleBlockBatchId => compressShuffle
      case _ => false
    }
  }
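On the configuration side, these flags could be set explicitly as follows (the values shown are the Spark 3.2 defaults, written out only for illustration):

import org.apache.spark.sql.SparkSession

// Defaults made explicit for illustration.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("shuffle-compression-demo")
  .config("spark.shuffle.compress", "true")       // compress shuffle output files
  .config("spark.shuffle.spill.compress", "true") // compress spill files
  .config("spark.io.compression.codec", "lz4")    // codec shared by both
  .getOrCreate()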

When it comes to other properties, spark.shuffle.mapStatus.compression.codec defines the compression codec for the MapStatus data, i.e. the information about the shuffle files generated on the mapper side:

/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task has shuffle files stored on as well as the sizes of outputs for each reducer, for passing
 * on to the reduce tasks.
 */
private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
  /** Location where this task output is. */
  def location: BlockManagerId

  def updateLocation(newLoc: BlockManagerId): Unit

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size.  This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long

  /**
   * The unique ID of this shuffle map task, if spark.shuffle.useOldFetchProtocol enabled we use
   * partitionId of the task or taskContext.taskAttemptId is used.
   */
  def mapId: Long
}

The MapStatus is only a trait and the implementation used depends on spark.shuffle.minNumPartitionsToHighlyCompress. Depending on the number of shuffle partitions and this configuration property, Apache Spark creates either a CompressedMapStatus or a HighlyCompressedMapStatus. The former is an exact representation of the shuffle output, which in the past led to some fatal errors (SPARK-4909). The HighlyCompressedMapStatus addresses these issues by storing only the average size for most of the blocks.
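The choice is made in the MapStatus companion object; a simplified, self-contained paraphrase of that logic (not the literal source) could look like this:

// Sketch only: the number of shuffle partitions (the length of the per-reducer
// size array) is compared with spark.shuffle.minNumPartitionsToHighlyCompress
// (2000 by default).
def chooseMapStatus(uncompressedSizes: Array[Long],
    minNumPartitionsToHighlyCompress: Int = 2000): String = {
  if (uncompressedSizes.length > minNumPartitionsToHighlyCompress) {
    "HighlyCompressedMapStatus" // average size + exact sizes for the huge blocks
  } else {
    "CompressedMapStatus" // one compressed size per reduce block
  }
}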

Which blocks get only the approximate size is controlled by spark.shuffle.accurateBlockThreshold. All blocks larger than this value (100MB by default) have their exact size stored, while the smaller ones are represented only by the average.
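A tiny illustration of that split, with hypothetical block sizes (not Spark source code):

// Hypothetical sizes; only illustrates how HighlyCompressedMapStatus splits the
// blocks around spark.shuffle.accurateBlockThreshold (100MB by default).
val accurateBlockThreshold = 100L * 1024 * 1024
val blockSizes = Array(512L * 1024, 80L * 1024 * 1024, 300L * 1024 * 1024)

// blocks above the threshold keep their exact sizes...
val (hugeBlocks, smallBlocks) = blockSizes.partition(_ > accurateBlockThreshold)
// ...while the non-empty smaller ones are all reported with their average size
val nonEmptySmall = smallBlocks.filter(_ > 0)
val avgSize = if (nonEmptySmall.nonEmpty) nonEmptySmall.sum / nonEmptySmall.length else 0L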

That's all for the first of three blog posts about shuffle configuration. You saw here the properties responsible for spilling, buffering and compression. In the next part you'll see the rest!

