What's new in Apache Spark 3.1 - Kubernetes Generally Available!

Versions: Apache Spark 3.1.1

After several months spent as an "experimental" feature in Apache Spark, Kubernetes was officially promoted to a Generally Available scheduler in the 3.1 release! In this blog post, we'll discover the last changes made before this promotion.

Scaling

Dynamic Resource Allocation

Let's start with the changes impacting Dynamic Resource Allocation. To introduce the first one, I have to quickly recall the terms you will meet very often here. The first of them is the snapshot, represented by the ExecutorPodsSnapshot class. To know what happens in the Kubernetes cluster, Apache Spark uses the listener pattern in the ExecutorPodsWatcher:

  private class ExecutorPodsWatcher extends Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = {
      val podName = pod.getMetadata.getName
      logDebug(s"Received executor pod update for pod named $podName, action $action")
      snapshotsStore.updatePod(pod)
    }
    // ...
  }

Whenever something changes, Apache Spark immediately updates the snapshots store and creates a new version of the ExecutorPodsSnapshot, a kind of picture of the cluster state at a given moment:

  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
    new ExecutorPodsSnapshot(newExecutorPods, fullSnapshotTs)
  }

You can also think of the snapshot as a kind of message state passed to the subscribers. Marcelo Masiero Vanzin discovered that with many executor state changes, the information about the same executor could be logged multiple times in a short interval. Moreover, he also added handling for executors that are still alive but no longer used, and a protection against out-of-order snapshot processing by the subscribers.

Also from the category of concurrency updates, Holden Karau added a new configuration property called spark.kubernetes.allocation.executor.timeout. It's used in the ExecutorPodsAllocator class as a second factor in determining the executor pod creation timeout (the maximum time an executor pod can take to appear after its creation was requested). The previous timeout strategy, based only on the allocation delay, didn't cover all cluster use cases, like clusters under a heavy load:

  private val podCreationTimeout = math.max(
    podAllocationDelay * 5,
    conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))

  private def onNewSnapshots(
      applicationId: String,
      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
    // ...
    val currentTime = clock.getTimeMillis()
    val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
      if (currentTime - timeCreated > podCreationTimeout) {
        Some(execId)
      } else {
        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
          s" was created ${currentTime - timeCreated} milliseconds ago.")
        None
      }
    }
    if (timedOut.nonEmpty) {
      logWarning(s"Executors with ids ${timedOut.mkString(",")} were not detected in the" +
        s" Kubernetes cluster after $podCreationTimeout ms despite the fact that a previous" +
        " allocation attempt tried to create them. The executors may have been deleted but the" +
        " application missed the deletion event.")
      // ...
    }
  }

To close this category of changes, Dongjoon Hyun added support for the spark.dynamicAllocation.executorIdleTimeout property in the idle executors detection.

Fallback storage

Dongjoon Hyun is also the author of the fallback storage feature! I briefly introduced it in my previous blog post about node decommissioning. In a nutshell, the idea is to use an alternative storage for the shuffle and RDD blocks if there are no executor peers left to accept the replicated files before the node shuts down. Ideally, this fallback storage should have native data expiration support (TTL), and a good candidate is cloud object storage. However, it's still possible to use something else as long as you can provide a clean-up mechanism or don't care about accumulating data.

To set the fallback storage, use the spark.storage.decommission.fallbackStorage.path property.
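
As an illustration, here is a minimal sketch of a configuration enabling decommissioning with a fallback storage, as I understand the 3.1 properties; the bucket path is only a placeholder:

  import org.apache.spark.sql.SparkSession

  // decommissioning and block migration must be enabled for the fallback storage to be used
  val sparkSession = SparkSession.builder()
    .config("spark.decommission.enabled", "true")
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .config("spark.storage.decommission.rddBlocks.enabled", "true")
    // an object storage location, ideally with a TTL policy on this prefix
    .config("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
    .getOrCreate()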

Downscaling

A topic related to the aforementioned node decommissioning is downscaling, i.e., the situation when a pod goes away because the job doesn't need it anymore. To start this category, I'd like to share an interesting bug fix added by Shiqi Sun and Holden Karau. But some background first. In Kubernetes, you can deploy your executor pods in a sidecar pattern, meaning that the pod runs the executor alongside some other containers, like a logging system. Before Apache Spark 3.0.2 (the change was backported to that version), the cluster manager only checked the state of the pod. Because of that, a partially failed pod was still reported in a "running" state. To fix this issue in the new version, you can set the spark.kubernetes.executor.checkAllContainers property to true.
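
A minimal sketch of enabling it, assuming an executor pod that runs a sidecar next to the executor container:

  import org.apache.spark.SparkConf

  // with this flag enabled, Spark inspects every container of the executor pod,
  // so a failed sidecar (e.g. a logging agent) marks the whole executor as failed
  val conf = new SparkConf()
    .set("spark.kubernetes.executor.checkAllContainers", "true")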

Holden Karau also added the possibility to clean shuffle block files directly at the RDD level. Even though it's tagged as "experimental" and "developer", the method is not new. It comes from Spark MLlib's ALS algorithm. The change improves downscaling when the shuffle service is missing, which is the case on Kubernetes. Since the executor won't store any job-specific files, it can be freely decommissioned. To use it, call RDD's cleanShuffleDependencies(blocking: Boolean = false).
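
Below is a minimal sketch of how a call could look in a job; the dataset and the caching step are only there to illustrate when the shuffle files become safe to drop:

  import org.apache.spark.sql.SparkSession

  val sparkSession = SparkSession.builder().getOrCreate()
  val aggregated = sparkSession.sparkContext.parallelize(1 to 1000)
    .map(nr => (nr % 10, nr))
    .reduceByKey(_ + _)

  aggregated.cache()
  aggregated.count() // materializes both the shuffle files and the cached blocks
  // once the result is cached, the shuffle files are not needed anymore;
  // blocking = true waits until the cleanup completes
  aggregated.cleanShuffleDependencies(blocking = true)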

Another scaling change added by Holden Karau was a new definition of pod deletion detection. Previously, a pod was considered deleted when it had an associated deletion timestamp metadata attribute. However, this timestamp was also present for pods in the terminating state, meaning that Apache Spark was removing still-alive executors. Holden extended the previous single condition with extra checks: the pod status or status phase being empty, or the phase being neither "terminating" nor "running":

  private def isDeleted(pod: Pod): Boolean = {
    (pod.getMetadata.getDeletionTimestamp != null &&
      (
        pod.getStatus == null ||
        pod.getStatus.getPhase == null ||
          (pod.getStatus.getPhase.toLowerCase(Locale.ROOT) != "terminating" &&
           pod.getStatus.getPhase.toLowerCase(Locale.ROOT) != "running")
      ))
  }

Actually, the condition on the "running" phase was added in another task, fixing a risk of getting an inconsistent snapshot view of the pod state. Attila Zsolt Piros added another inconsistency fix. If you check the list of new configuration entries, you will notice a new property called spark.kubernetes.executor.missingPodDetectDelta. It defines an interval during which an executor known by the scheduler backend on Apache Spark's side can be missing from the snapshot information coming from Kubernetes. Past that time, the executor will be reported as missing if it's still not found in the snapshot information. You should then see a debug message like "The executor with ID ... (registered at ...) was not found in the cluster at the polling time (... ms) which is after the accepted detect delta time…". In the previous version, such an executor was considered missing without this grace period.
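
To make it concrete, a minimal sketch with an arbitrary delta value:

  import org.apache.spark.SparkConf

  // tolerate up to 30 seconds of mismatch between the scheduler backend's view
  // and the Kubernetes snapshots before reporting an executor as missing
  val conf = new SparkConf()
    .set("spark.kubernetes.executor.missingPodDetectDelta", "30s")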

Mounts

A few things also changed for the resources you can use in your Apache Spark on Kubernetes application. The first is the support for Network File System (NFS) volume mounts, added by Seongjin Cho. In other words, you can now mount a pre-populated volume that can be shared between pods and that won't be erased after the pod terminates. It changes the previous workflow based on Persistent Volume Claims (PVC), where you could only use empty storage and therefore had to provision it before starting the job.
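
As a sketch, the NFS mount follows the same volume configuration pattern as the other volume types; the volume name, server address and export path below are hypothetical:

  import org.apache.spark.SparkConf

  // mount the NFS share "checkpoints" into every executor pod under /shared
  val conf = new SparkConf()
    .set("spark.kubernetes.executor.volumes.nfs.checkpoints.mount.path", "/shared")
    .set("spark.kubernetes.executor.volumes.nfs.checkpoints.mount.readOnly", "false")
    .set("spark.kubernetes.executor.volumes.nfs.checkpoints.options.server", "nfs.example.com")
    .set("spark.kubernetes.executor.volumes.nfs.checkpoints.options.path", "/exports/spark")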

But it doesn't mean the PVCs are bad or not supported anymore. On the contrary! They also got some important changes in Apache Spark 3.1.1. To start, Dongjoon Hyun implemented the support for dynamic PVC creation and deletion for the driver and executors! The feature is very interesting for workloads benefiting from Dynamic Resource Allocation, where Apache Spark will create new executor pods with an associated PVC. Before that, you had to pre-provision the claims for a number of executors that is hard to predict exactly in dynamic workloads. Now Apache Spark not only takes care of that but also lets you use application id and executor id placeholders in the spark.executorEnv.SPARK_EXECUTOR_DIRS property. For example, a configuration like /efs/spark-apps/SPARK_APPLICATION_ID/SPARK_EXECUTOR_ID could create directories like /efs/spark-apps/spark-app-1/1, /efs/spark-apps/spark-app-1/2, /efs/spark-apps/spark-app-2/1, ...
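
Here is a minimal sketch of the on-demand PVC configuration, as documented for the volumes support; the volume name, storage class and size are only examples:

  import org.apache.spark.SparkConf

  // the special "OnDemand" claim name asks Spark to create (and delete)
  // one PVC per executor pod instead of reusing a pre-provisioned claim
  val conf = new SparkConf()
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass", "gp2")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit", "100Gi")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/data")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly", "false")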

Stage-level scheduling

Regarding the next feature, I was unaware of it, but apparently we could use it long before Kubernetes! Stage-level scheduling was already there for the YARN scheduler. In a nutshell, it lets you use different cluster resources for every stage of your pipeline. It means that you can use resources adapted to the workload type, like CPU- and memory-intensive instances for an ETL job, and GPU instances for a Deep Learning job. And guess what? Thanks to Thomas Graves, it's now available for Kubernetes! With a few caveats, though.

The feature requires Dynamic Resource Allocation to be enabled, and since Spark on Kubernetes doesn't support the external shuffle service, you will need to set spark.dynamicAllocation.shuffleTracking.enabled to true. Also, be aware that because of this limitation, you may use more cluster resources: executors holding tracked shuffle files may never be deallocated. You can control this with spark.dynamicAllocation.shuffleTracking.timeout, but lowering it increases the risk of recomputing the shuffle files if they are needed again.
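
To summarize the prerequisites, a minimal sketch could look like this (the timeout value is arbitrary):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    // no external shuffle service on Kubernetes, so the shuffle files must be tracked
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    // limits how long an executor is kept alive only because of its shuffle files
    .set("spark.dynamicAllocation.shuffleTracking.timeout", "30min")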

How to use it? Call the RDD's withResources(rp: ResourceProfile) method and pass the resource profile specification. The code for Kubernetes was almost already there, because the scheduler backend extends CoarseGrainedSchedulerBackend and overrides its doRequestTotalExecutors. The difference is that previously the method ignored the custom resource profile and always used the default one. Starting from the 3.1.1 release, it looks like this:

  override def doRequestTotalExecutors(
      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
    podAllocator.setTotalExpectedExecutors(resourceProfileToTotalExecs)
    Future.successful(true)
  }
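
From the user's perspective, requesting a custom profile could look like the sketch below; the resource amounts and the input path are only examples:

  import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}
  import org.apache.spark.sql.SparkSession

  val sparkSession = SparkSession.builder().getOrCreate()

  // a profile for a memory-hungry stage: bigger executors, 2 CPUs per task
  val executorRequests = new ExecutorResourceRequests().cores(4).memory("8g")
  val taskRequests = new TaskResourceRequests().cpus(2)
  val heavyStageProfile = new ResourceProfileBuilder()
    .require(executorRequests)
    .require(taskRequests)
    .build

  val processed = sparkSession.sparkContext.textFile("/data/input")
    .withResources(heavyStageProfile) // stages computing this RDD use the custom profile
    .map(line => line.length)
  processed.count()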

Please also note that this stage-level scheduling feature added the previously missing support for the off-heap memory request, so far ignored when Apache Spark was defining the executor's memory. As Luca Canali pointed out in SPARK-32661, the workaround was to put the off-heap memory size in spark.executor.memoryOverhead. Fortunately, the off-heap support was in the scope of Thomas Graves' work, and he included it in the stage-level scheduling implementation.

Configuration changes

Apart from the resource-related changes, there are also some configuration evolutions. To start, the spark.kubernetes.executor.podNamePrefix property became public. Previously, it was annotated as an "internal" configuration. You can use it to add a prefix to the executor pod names.

Also, Artsiom Yudovin added the spark.kubernetes.authenticate.executor.serviceAccountName property to define the service account associated with the executors. So far, you could only configure this property for the driver, but now it's possible to dissociate the two and use different sets of permissions for the driver and executor identities.

Onur Satici identified that the driver cores were coupled to the container resources instead of the spark.driver.cores configuration property. Thanks to this evolution, you can now use spark.driver.cores to define the parallelism and set spark.kubernetes.driver.request.cores to limit the driver's resource requests.

Finally, Pedro Rossi added support for proxy users with the --proxy-user argument, which so far wasn't working correctly on Kubernetes.

New storage level

The next feature is tagged "Spark Core" but is also present in the list of changes making Kubernetes GA. So you can use it everywhere! Thanks to the PR authored by Dongjoon Hyun, you can now use a DISK_ONLY_3 storage level! In other words, you get native 3x disk replication. Technically it was possible before with a custom StorageLevel, but having it supported natively gives extra guarantees (unit test coverage, code readability).

However, it took me a while to understand the "why" of this evolution. Why wasn't the DISK_ONLY_2 level enough? What was so special about DISK_ONLY_3? I found an answer in the PR's discussion, and I invite you to read it since it's very insightful. In a nutshell, DISK_ONLY_3 will be useful to avoid data recomputation for jobs with a long lineage and a high risk of executor loss due to, for example, maintenance or preemption. It should also improve throughput since the data can now be served from 3 places.
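
Using it is as simple as with any other storage level; here is a minimal sketch with a hypothetical input path:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.storage.StorageLevel

  val sparkSession = SparkSession.builder().getOrCreate()
  // a stand-in for a long and costly lineage worth protecting against executor loss
  val expensiveRdd = sparkSession.sparkContext.textFile("/data/events")
    .map(line => line.toUpperCase)

  expensiveRdd.persist(StorageLevel.DISK_ONLY_3) // each block written to disk on 3 executors
  expensiveRdd.count()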

Python support

You have certainly noticed it already: the 3.1.1 release has a lot of improvements regarding Python. But they're not only due to the Project Zen initiative! Kubernetes also has some Python-related changes, and maybe the most important one is the removal of Python 2 from the PySpark Docker image, made by William Hyun in SPARK-32882. After getting rid of the Python 2 dependencies, the built image lost almost 100MB!

Among other changes, Hyukjin Kwon added the support for the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables, and Stavros Kontopoulos implemented the possibility to upload local dependencies with the --py-files argument. Finally, Hyukjin Kwon also fixed the --archives argument on Kubernetes, which previously didn't resolve the local path on the driver correctly.

As you can see, the Apache Spark on Kubernetes component improved a lot in this last pre-GA iteration. The community put a lot of effort into the scaling features and extended the component with support for extra mounts and stage-level scheduling.