What's new in Apache Spark 3.0 - shuffle service changes

Versions: Apache Spark 3.0.0

Shuffle is one of the Apache Spark components that makes it hard to scale. Fortunately, the community is well on its way to overcoming this limitation, and the new release of the framework brings some important improvements in this field.

I will start this blog post by recalling some basics about the external shuffle service and explaining why it's a big impediment to implementing easily scalable pipelines with modern containerized orchestrators like Kubernetes. In the following 2 sections, I will describe two important evolutions, including a partial one, implemented in Apache Spark 3.0.

External shuffle service recall

To recall, the external shuffle service is a process running on the same nodes as the executors, responsible for serving the files generated by shuffle stages so that executors can be deallocated when they're not needed anymore. This deallocation occurs when the Dynamic Resource Allocation feature is enabled, for example when a given executor doesn't process any new task during some specific period. Thanks to the external shuffle service process, the resource manager can deallocate the executor and the shuffle data can still be retrieved from the service. This service is currently implemented for the Mesos, YARN and Standalone modes.
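
To make this recall concrete, here is a minimal configuration sketch of the classic setup where Dynamic Resource Allocation relies on the external shuffle service; the application name and property values are purely illustrative:

import org.apache.spark.sql.SparkSession

// Minimal sketch of the "classic" setup: Dynamic Resource Allocation backed by
// the external shuffle service (values below are illustrative).
val spark = SparkSession.builder()
  .appName("external-shuffle-service-recall")
  .config("spark.dynamicAllocation.enabled", "true")
  // requires the shuffle service process to run on every worker node
  .config("spark.shuffle.service.enabled", "true")
  // an executor idle for 60s becomes a candidate for deallocation
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()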

Unfortunately, it's not all rosy: the external shuffle service has some important drawbacks. First, it's not well adapted to the new workloads built on top of containerized environments like Kubernetes, where workloads are supposed to be isolated, stateless processes. The current external shuffle service implementation breaks this isolation since the disk is always shared by the service and the executor, both running on the same worker node.

Also, this implementation makes scaling the pipelines much harder because you cannot simply deallocate a whole node, even if it only serves the shuffle service and the remaining resources are idle (= the node was sized for compute).

Finally, the Kubernetes resource manager didn't have an external shuffle service implementation, so it wasn't possible to use the already mentioned Dynamic Resource Allocation. The community addressed these major issues in 2 different stories, one for the remote storage of shuffle files and another for the shuffle files tracking.

Remote storage for shuffle files

The first evolution of Apache Spark 3.0 related to the shuffle service is called Use remote storage for persisting shuffle data. The goal of this feature is to provide a pluggable interface to read and write shuffle files and, even though the final availability targets the 3.1.0 release, some of the changes were already implemented in 3.0.0.

But before we look at the code, a few words about the expectations. The new API should satisfy 4 success criteria defined by the community.

These criteria come from the community effort made to extend the shuffle service, and I already plan to write a blog post covering them in more detail after the end of the Apache Spark 3.0 series. In the meantime, let's move on.

The community planned to implement this remote storage support in 5 different parts: driver lifecycle, executor lifecycle, shuffle writer, shuffle reader, and shuffle locations metadata. The goal of this refactoring is to propose a pluggable component, so technically speaking, to expose the shuffle behavior through an interface that each storage backend can implement.

As I mentioned, the effort has already started and that's why in the codebase you will find a ShuffleDataIO interface that currently exposes executor() and driver() methods, responsible for, respectively, the data and metadata storage modules on the executor and the driver. On this occasion, a new configuration entry called spark.shuffle.sort.io.plugin.class was added to make it possible to use the shuffle strategy corresponding to the user's needs. The 3.0 release contains only the strategy for local disk storage (LocalDiskShuffleDataIO).
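
As an illustration, the new entry can be set explicitly; a minimal sketch pointing it at the local disk strategy shipped with 3.0 (the application name is made up):

import org.apache.spark.sql.SparkSession

// Minimal sketch: explicitly selecting the only shuffle storage strategy
// available in 3.0, the local disk one.
val spark = SparkSession.builder()
  .appName("shuffle-data-io-plugin")
  .config("spark.shuffle.sort.io.plugin.class",
    "org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO")
  .getOrCreate()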

Regarding the components created on the driver and the executor, they're represented by, respectively, ShuffleDriverComponents and ShuffleExecutorComponents implementations. This also gives us the possibility to extend the shuffle writing with an extra action. Let's take a look at the APIs of the latter class:

  /**
   * Called once per map task to create a writer that will be responsible for persisting all the
   * partitioned bytes written by that map task.
   * ...
   */
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      long mapTaskId,
      int numPartitions) throws IOException;

  /**
   * An optional extension for creating a map output writer that can optimize the transfer of a
   * single partition file, as the entire result of a map task, to the backing store.
   * <p>
   * Most implementations should return the default {@link Optional#empty()} to indicate that
   * they do not support this optimization. This primarily is for backwards-compatibility in
   * preserving an optimization in the local disk shuffle storage implementation.
   * ...
   */
  default Optional<SingleSpillShuffleMapOutputWriter> createSingleFileMapOutputWriter(
      int shuffleId,
      long mapId) throws IOException {
    return Optional.empty();
  }

As you can see, the first method creates an implementation of ShuffleMapOutputWriter (also new in 3.0) that will be responsible for persisting the output of the map task and committing all the writes as a single atomic operation. The commit is used to indicate the availability of the map files to the reduce tasks. The second method, createSingleFileMapOutputWriter, creates an optional optimized writer that, for the single current implementation (local disk), writes the shuffle data block alongside an index file storing the offsets of all partitions.
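
To give an idea of how these writers are supposed to be driven, here is a conceptual sketch of a map task using the new API. The helper function and its parameters are hypothetical and the byte production part is obviously simplified; only the ShuffleExecutorComponents, ShuffleMapOutputWriter and ShufflePartitionWriter calls come from the 3.0 interfaces:

import org.apache.spark.shuffle.api.ShuffleExecutorComponents

// Hypothetical helper illustrating the writer lifecycle: one writer per map task,
// one partition writer per reduce partition (requested in partition id order),
// and a single commit making the whole output visible to the reducers.
def writeMapOutput(
    executorComponents: ShuffleExecutorComponents,
    shuffleId: Int,
    mapTaskId: Long,
    partitionedBytes: IndexedSeq[Array[Byte]]): Unit = {
  val mapWriter = executorComponents.createMapOutputWriter(
    shuffleId, mapTaskId, partitionedBytes.size)
  try {
    partitionedBytes.zipWithIndex.foreach { case (bytes, reducePartitionId) =>
      val stream = mapWriter.getPartitionWriter(reducePartitionId).openStream()
      try {
        stream.write(bytes)
      } finally {
        stream.close()
      }
    }
    // atomic commit: the map output becomes available to the reduce tasks
    mapWriter.commitAllPartitions()
  } catch {
    case error: Throwable =>
      mapWriter.abort(error)
      throw error
  }
}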

For the driver's lifecycle (ShuffleDriverComponents), the goal is mostly to orchestrate the shuffle. Its first method, initializeApplication(), initializes the shuffle part and returns extra configuration entries that are set on the SparkContext:

// SparkContext
    _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
    _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
      _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
    }

The method is intended to prepare the shuffle components for physical use. The preparation can be, for example, the creation of the appropriate writing context (directories on a file system, tables in a database, ... - the remote shuffle storage use case) or registering with external servers (e.g. an external shuffle server outside the executors). All shuffle state and setup can later be cleaned up with cleanupApplication. That's the first part, related to the shuffle component itself. Another part managed by the driver concerns shuffle stages. Every time a shuffle is created, it has an associated id. Starting from Apache Spark 3.0, you can add a behavior to register this id:

// ShuffleDependency
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)

// ShuffleDriverComponents
default void registerShuffle(int shuffleId) {}

The last function of ShuffleDriverComponents is removeShuffle(int shuffleId, boolean blocking), which removes all data associated with the given shuffle id.
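
To picture the driver side of the plugin, here is an illustrative, do-nothing skeleton of these hooks. The class itself is made up; only the overridden methods come from the interface described above:

import java.util.Collections
import org.apache.spark.shuffle.api.ShuffleDriverComponents

// Made-up skeleton of the driver-side hooks; a real remote-storage plugin would
// create its directories or tables in initializeApplication and drop them in
// cleanupApplication.
class LoggingShuffleDriverComponents extends ShuffleDriverComponents {

  // the returned entries are added to the Spark configuration under the shuffle plugin prefix
  override def initializeApplication(): java.util.Map[String, String] = {
    println("initializing the shuffle storage for the application")
    Collections.emptyMap[String, String]()
  }

  override def cleanupApplication(): Unit =
    println("cleaning up all shuffle state of the application")

  override def registerShuffle(shuffleId: Int): Unit =
    println(s"registering shuffle ${shuffleId}")

  override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit =
    println(s"removing the data of shuffle ${shuffleId} (blocking=${blocking})")
}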

Shuffle cleanup

How does Apache Spark know that shuffle files can be removed? When the shuffle dependency is created, the framework registers it for cleanup (_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) in ShuffleDependency):

  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
  }

Internally, the registered object is wrapped in an instance of Java's WeakReference. When the referenced object becomes unreachable, the Garbage Collector clears the weak reference and puts it into the associated reference queue, from which Apache Spark's cleanup background thread reads what should be cleaned up.

A weak reference in the JVM is the opposite of a strong reference, i.e. the situation where an instance of a particular class is referenced by another object, like in this snippet: myClass.fieldA = new FieldAClass(). Once we set myClass.fieldA = null, the Garbage Collector can consider the previously initialized FieldAClass instance as unreachable, so it can be garbage collected and removed from memory.
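
If the mechanism sounds abstract, this small standalone snippet (unrelated to Spark's code, all names are made up) shows the WeakReference plus ReferenceQueue duo in action:

import java.lang.ref.{ReferenceQueue, WeakReference}

// Standalone illustration: once the only strong reference is dropped, the GC clears
// the weak reference and enqueues it - the same signal Spark's cleaner waits for.
object WeakReferenceDemo extends App {
  val queue = new ReferenceQueue[Object]()
  var payload: Object = new Object()
  val weakRef = new WeakReference[Object](payload, queue)

  payload = null // drop the only strong reference

  // System.gc() is only a hint, hence the small retry loop
  var enqueued: java.lang.ref.Reference[_] = queue.poll()
  var attempts = 0
  while (enqueued == null && attempts < 10) {
    System.gc()
    Thread.sleep(100)
    enqueued = queue.poll()
    attempts += 1
  }
  println(s"enqueued reference: ${enqueued}, referent cleared: ${weakRef.get() == null}")
}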

As you can see then, the shuffle reader and the shuffle locations metadata are not implemented yet, but the already implemented features show pretty well what we should be able to do with the shuffle storage starting from, let's hope, Apache Spark 3.1.0 🤞.

Shuffle files tracking

The second issue addressed in Apache Spark 3.0 is related to the Dynamic Resource Allocation feature, which is one of the enablers for auto-scaling in the framework. To use this feature with the previous versions, we had to install the external shuffle service, which was not obvious to do with the Kubernetes resource manager. To overcome the issue, the community decided to work on shuffle files tracking to control when a given executor can be reclaimed during a scaling operation.

To enable this possibility, 2 new configuration entries were added. The first of them, spark.dynamicAllocation.shuffleTracking.enabled, activates or deactivates the shuffle tracking feature. The second one, spark.dynamicAllocation.shuffleTracking.timeout, specifies the TTL of the shuffle files, in other words how long an executor holding shuffle data can remain considered as in use. After this delay it becomes deallocatable, even though it still stores some used shuffle data. You can see that in the snippet below (a small configuration sketch follows it). An important point, which is not easy to catch immediately, is that shuffleIds will be null when the shuffle tracking is disabled:

    def updateTimeout(): Unit = {
      val oldDeadline = timeoutAt
      val newDeadline = if (idleStart >= 0) {
        val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
          val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
            shuffleTimeoutNs
          } else {
            Long.MaxValue
          }
          math.min(_cacheTimeout, _shuffleTimeout)
        } else {
          idleTimeoutNs
        }
        val deadline = idleStart + timeout
        if (deadline >= 0) deadline else Long.MaxValue
      } else {
        Long.MaxValue
      }

      timeoutAt = newDeadline
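
As announced above, here is a minimal configuration sketch enabling the tracking-based Dynamic Resource Allocation, i.e. without the external shuffle service; the application name and timeout value are purely illustrative:

import org.apache.spark.sql.SparkSession

// Minimal sketch of Dynamic Resource Allocation relying on shuffle files
// tracking instead of the external shuffle service (values are illustrative).
val spark = SparkSession.builder()
  .appName("shuffle-tracking-demo")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  // an executor holding tracked shuffle files becomes deallocatable after 30 minutes
  .config("spark.dynamicAllocation.shuffleTracking.timeout", "30min")
  .getOrCreate()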

Regarding the tracking itself, the shuffleTracking flag is used in the job- and task-related methods of ExecutorMonitor, which is a SparkListener implementation:

  override def onJobStart(event: SparkListenerJobStart): Unit = {
    if (!shuffleTrackingEnabled) {
      return
    }
    // ...

  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    if (!shuffleTrackingEnabled) {
      return
    }
    // ...

  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    // ...
      if (shuffleTrackingEnabled && event.reason == Success) {

When the job starts, ExecutorMonitor retrieves the shuffle stages of the active job and puts them into the state tracking variables:

// shuffle and the associated active jobs
private val shuffleToActiveJobs = new mutable.HashMap[Int, mutable.ArrayBuffer[Int]]()

// stage and associated shuffle ids
private val stageToShuffleID = new mutable.HashMap[Int, Int]()

// job and associated stage ids
private val jobToStageIDs = new mutable.HashMap[Int, Seq[Int]]()

The variables are cleaned up at the job's termination:


    shuffleToActiveJobs.foreach { case (shuffleId, jobs) =>
      jobs -= event.jobId
      // ...
    }
    jobToStageIDs.remove(event.jobId).foreach { stages =>
      stages.foreach { id => stageToShuffleID -= id }
    }

And what is the link with the executor deallocation? When the executor doesn't have any pending shuffle references, the hasActiveShuffle field of its Tracker is set to false. Later, this flag is used by the timedOutExecutors() method of ExecutorMonitor to decide whether the executor can be removed or not:

      timedOutExecs = executors.asScala
        .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
        .filter { case (_, exec) =>
          val deadline = exec.timeoutAt
          if (deadline > now) {
            newNextTimeout = math.min(newNextTimeout, deadline)
            exec.timedOut = false
            false
          } else {
            exec.timedOut = true
            true
          }
        }
        .keys
        .toSeq
      updateNextTimeout(newNextTimeout)
    }
    timedOutExecs

This method is called by org.apache.spark.ExecutorAllocationManager and the retrieved information is later used to remove expired executors:

  private def schedule(): Unit = synchronized {
    val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
    if (executorIdsToBeRemoved.nonEmpty) {
      initializing = false
    }

    // Update executor target number only after initializing flag is unset
    updateAndSyncNumExecutorsTarget(clock.nanoTime())
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }

As you can see then, there is an ongoing effort to make Apache Spark more scalable and to get rid of - or at least reduce - the scaling limitations related to the shuffle files. Fingers crossed that it will help to implement a very fascinating feature called Serverless Spark in the Cloud 🤩