Probably the most popular configuration entry related to the shuffle is the number of shuffle partitions. But it's not the only one, as you will see in this new blog post series!
Buffering and spilling
The first family of properties is mostly involved in the writing process because it concerns buffering and spilling. To start, there is the spark.shuffle.spill.numElementsForceSpillThreshold property. Normally, spilling occurs when the shuffle writer cannot acquire more memory to buffer shuffle data. But spilling can also be forced based on the number of elements added to the buffer, and that's what numElementsForceSpillThreshold controls. By default, it's equal to Integer.MAX_VALUE.
The second property involved in spilling is spark.shuffle.spill.batchSize. Once the shuffle mechanism decides to spill the data to disk, it won't write each record individually. Instead, it iterates over the spillable data and flushes the in-memory records in batches of the size defined in the batchSize property. By default, its value is 10 000, and changing it leads to fewer or more disk flushes:
class ExternalAppendOnlyMap[K, V, C](
  // ...
  private val serializerBatchSize = sparkConf.get(config.SHUFFLE_SPILL_BATCH_SIZE)

  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
      : DiskMapIterator = {
    // ...
    try {
      while (inMemoryIterator.hasNext) {
        val kv = inMemoryIterator.next()
        writer.write(kv._1, kv._2)
        objectsWritten += 1

        if (objectsWritten == serializerBatchSize) {
          flush()
        }
      }
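To give an idea of how these 2 spill-related properties could be tuned, here is a minimal sketch that is not part of the Spark sources quoted above; the values are arbitrary illustrations of the syntax, not recommendations:

import org.apache.spark.sql.SparkSession

// Hypothetical tuning example: force a spill after 2 million buffered records
// and flush the spilled data in batches of 5 000 records instead of 10 000.
val spark = SparkSession.builder()
  .appName("shuffle-spill-tuning-sketch")
  .master("local[*]")
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "2000000")
  .config("spark.shuffle.spill.batchSize", "5000")
  .getOrCreate()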
The shuffle also uses buffers to accumulate the data in memory before writing it to disk. Depending on where the buffering happens, this behavior can be configured with one of the following 3 properties (a combined configuration sketch follows the list):
- spark.shuffle.file.buffer is used to buffer data for the spill files. Under the hood, shuffle writers pass the property to BlockManager#getDiskWriter, which returns a new DiskBlockObjectWriter instance. This instance uses the buffer size in the underlying BufferedOutputStream, which flushes the data each time the amount of buffered bytes exceeds the configured buffer size:
private[spark] class BlockManager
  // ...
  def getDiskWriter(
      blockId: BlockId,
      file: File,
      serializerInstance: SerializerInstance,
      bufferSize: Int,
      writeMetrics: ShuffleWriteMetricsReporter): DiskBlockObjectWriter = {
    val syncWrites = conf.get(config.SHUFFLE_SYNC)
    new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize,
      syncWrites, writeMetrics, blockId)
  }

private[spark] class DiskBlockObjectWriter
  // ...
  private def initialize(): Unit = {
    // ...
    class ManualCloseBufferedOutputStream
      extends BufferedOutputStream(if (checksumEnabled) checksumOutputStream else ts, bufferSize)
      with ManualCloseOutputStream
    mcs = new ManualCloseBufferedOutputStream
- spark.shuffle.spill.diskWriteBufferSize is present in the UnsafeShuffleWriter writing path. This shuffle writer uses ShuffleExternalSorter to generate the spill files. Unlike the 2 other writers, it can't use the DiskBlockObjectWriter directly because the data is backed by raw memory instead of Java objects, so the sorter must use an intermediary byte array to transfer the data from managed memory:
final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleChecksumSupport {

  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
  private final int fileBufferSizeBytes;

  /** The buffer size to use when writing the sorted records to an on-disk file */
  private final int diskWriteBufferSize;

  ShuffleExternalSorter(
      TaskMemoryManager memoryManager,
      BlockManager blockManager,
      TaskContext taskContext,
      int initialSize,
      int numPartitions,
      SparkConf conf,
      ShuffleWriteMetricsReporter writeMetrics) throws SparkException {
    //...
    this.fileBufferSizeBytes =
        (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
    this.diskWriteBufferSize =
        (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
    // ...
  }

  private void writeSortedFile(boolean isLastFile) {
    // ...
    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record;
    final byte[] writeBuffer = new byte[diskWriteBufferSize];

    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
        blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();

    try (DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {
      // …
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
- spark.shuffle.unsafe.file.output.buffer defines the buffer size in the LocalDiskShuffleMapOutputWriter class. This class generates the final shuffle output, i.e. it merges the spill files and writes the index file. The buffer property is used in the BufferedOutputStream supporting the write operations:
public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {

  public LocalDiskShuffleMapOutputWriter(
      int shuffleId,
      long mapId,
      int numPartitions,
      IndexShuffleBlockResolver blockResolver,
      SparkConf sparkConf) {
    // ...
    this.bufferSize = (int) (long) sparkConf.get(
        package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
  }

  private void initStream() throws IOException {
    // ..
    outputBufferedFileStream = new BufferedOutputStream(outputFileStream, bufferSize);
  }

  private class PartitionWriterStream extends OutputStream {
    private final int partitionId;
    private long count = 0;
    private boolean isClosed = false;

    @Override
    public void write(int b) throws IOException {
      verifyNotClosed();
      outputBufferedFileStream.write(b);
      count++;
    }

    @Override
    public void write(byte[] buf, int pos, int length) throws IOException {
      verifyNotClosed();
      outputBufferedFileStream.write(buf, pos, length);
      count += length;
    }
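As announced before the list, here is a minimal sketch combining the 3 buffer properties; the sizes are arbitrary examples used only to show where each buffer applies:

import org.apache.spark.SparkConf

// Hypothetical example: enlarge the spill file buffer and the other 2 buffers.
val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64k")               // spill file buffer in DiskBlockObjectWriter
  .set("spark.shuffle.spill.diskWriteBufferSize", "2m")  // ShuffleExternalSorter transfer buffer
  .set("spark.shuffle.unsafe.file.output.buffer", "64k") // LocalDiskShuffleMapOutputWriter buffer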
Among other properties in this category, you will find spark.shuffle.sort.initialBufferSize, which defines the initial size of the array used by ShuffleInMemorySorter in the UnsafeShuffleWriter path. The value only sets the initial size of the array because the array can grow as long as there is enough memory available. Otherwise, a spill is triggered to free the memory.
Finally, there is also spark.shuffle.spill.initialMemoryThreshold. It defaults to 5MB and means that the Spillable implementations only start tracking memory usage and considering a spill once the in-memory collection grows beyond that size. It's not used by the UnsafeShuffleWriter, though.
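Both are internal properties, but they can still be overridden. A minimal sketch, with purely illustrative values:

import org.apache.spark.SparkConf

// Hypothetical example: start the in-memory sort array bigger than the default
// and let the Spillable start tracking memory usage only above 10MB.
val conf = new SparkConf()
  .set("spark.shuffle.sort.initialBufferSize", "8192")
  .set("spark.shuffle.spill.initialMemoryThreshold", "10485760") // in bytes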
Data compression
The second family of properties applies to the compression. To start, spark.shuffle.compress enables or disables the compression of the shuffle output files. The codec used to compress them is the one defined in the spark.io.compression.codec configuration. Spill files use the same codec configuration, but their compression must be enabled with spark.shuffle.spill.compress. How does Apache Spark know which flag to use? It simply checks the id of the written block:
private[spark] class SerializerManager(
  // ...
  ) {
  private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS)
  private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)
  private[this] val compressRdds = conf.get(config.RDD_COMPRESS)
  private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS)

  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: ShuffleBlockChunkId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _: ShuffleBlockBatchId => compressShuffle
      case _ => false
    }
  }
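For illustration, here is a sketch (not from the snippet above) of a configuration keeping both compressions enabled and switching the shared codec; zstd is just one example of a supported codec:

import org.apache.spark.SparkConf

// Hypothetical example: compress both shuffle output and spill files with zstd.
val conf = new SparkConf()
  .set("spark.shuffle.compress", "true")        // shuffle output files
  .set("spark.shuffle.spill.compress", "true")  // spill files
  .set("spark.io.compression.codec", "zstd")    // codec shared by both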
When it comes to other properties, spark.shuffle.mapStatus.compression.codec defines the compression codec for the MapStatus data, i.e. the information about the shuffle files generated on the mapper side:
/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task has shuffle files stored on as well as the sizes of outputs for each reducer, for passing
 * on to the reduce tasks.
 */
private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
  /** Location where this task output is. */
  def location: BlockManagerId

  def updateLocation(newLoc: BlockManagerId): Unit

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long

  /**
   * The unique ID of this shuffle map task, if spark.shuffle.useOldFetchProtocol enabled we use
   * partitionId of the task or taskContext.taskAttemptId is used.
   */
  def mapId: Long
}
MapStatus is only a trait, and the implementation used depends on spark.shuffle.minNumPartitionsToHighlyCompress. Depending on the number of shuffle partitions and this configuration property, Apache Spark creates either a CompressedMapStatus or a HighlyCompressedMapStatus. The former is the exact representation of the shuffle output, which in the past led to some fatal errors (SPARK-4909). The HighlyCompressedMapStatus addresses these issues by storing only the average size of the blocks.
Which blocks get only an approximated size depends on spark.shuffle.accurateBlockThreshold. All blocks bigger than this value (100MB by default) have their exact size recorded, while the smaller ones are represented by the average.
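A minimal sketch of how these MapStatus-related properties could be adjusted; the values are arbitrary illustrations, not tuning advice:

import org.apache.spark.SparkConf

// Hypothetical example: switch to HighlyCompressedMapStatus already at 1000
// shuffle partitions and record exact sizes for blocks bigger than 50MB.
val conf = new SparkConf()
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")
  .set("spark.shuffle.accurateBlockThreshold", "52428800") // in bytes
  .set("spark.shuffle.mapStatus.compression.codec", "zstd")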
That's all for this first of 3 blog posts about shuffle configuration. You saw here the properties responsible for spilling, buffering and compression. In the next part, you'll see the rest!