Apache Spark 2.4.0 features - barrier execution mode

Versions: Apache Spark 2.4.0 https://github.com/bartosz25/spark-...om/waitingforcode/BarrierTest.scala

Data-driven systems continuously change. We moved from static, batch-oriented daily processing jobs to real-time streaming-based pipelines running all the time. Nowadays, the workflows have more and more AI components. Apache Spark tries to keep up with this movement and in the new release proposes the implementation of the barrier execution mode as a new way to schedule tasks.

This post gives an overview of this feature added in the recent 2.4.0 release. The first part explains the motivations behind this new execution mode, developed under the code name "Project Hydrogen". The second one shows the concept of gang scheduling implemented by the Barrier Execution Mode. The third part gives some use case examples while the last one focuses on some of the implementation details.

Barrier Execution Mode and Project Hydrogen

First and foremost, let's recall some basics about the tasks in Apache Spark. The pipeline executed by the framework is composed of jobs which in their turn are built of stages. Each stage is further divided into smaller units of work called tasks. And it's this smallest unit of work which is impacted by the barrier execution mode. How? We'll see it further in this post.
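
As a quick illustration, consider the following pipeline: the action triggers a job, the shuffle introduced by groupBy splits it into two stages, and each stage runs one task per partition (the snippet assumes an already created sparkContext):

  // One action = one job; the shuffle introduced by groupBy splits the job into
  // 2 stages and each stage executes 1 task per partition (here: 3)
  val numbers = sparkContext.parallelize(1 to 100, 3)
  numbers.map(number => number * 2)    // stage 1: 3 map tasks
    .groupBy(number => number % 10)    // shuffle boundary between the 2 stages
    .mapValues(group => group.size)    // stage 2: 3 reduce tasks
    .collect()                         // the action triggering the job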

To understand the new execution mode, we need to discover another definition, Project Hydrogen. This initiative was officially announced in the summer of 2018 and aims to create one common platform for doing data science. Even though Apache Spark already provides an interesting module with several Machine Learning algorithms, it's not adapted to work with other data science approaches, such as the Deep Learning proposed by frameworks like TensorFlow or Keras. Therefore, the goal of Project Hydrogen is to bring these worlds together into Apache Spark and let the users run not only the built-in ML algorithms but also the ones coming from external tools.

But what makes Deep Learning (DL) so special with regard to Apache Spark's execution mode? In fact, it changes the way of thinking about the tasks. Normally, when one of the tasks fails, the framework restarts it and tries to complete the computation. But that's not valid for DL computation where the tasks are dependent on each other. Because of that, one failed task cannot be restarted alone - all tasks must be recomputed once again. And that's the feature brought by the Barrier Execution Mode.

Gang scheduling

Barrier Execution Mode is an implementation of gang scheduling. It addresses the main issue of the original Apache Spark scheduling, namely it ensures that either all tasks are scheduled or none of them is. The name of this approach comes from the fact that all tasks are "members" of a gang because of their parallel execution requirement.

Aside from the all-or-nothing guarantee, gang scheduling brings the concept of synchronized communication. The tasks are able to communicate directly and, therefore, get the response in a synchronous manner.
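
Apache Spark 2.4.0 exposes this idea through BarrierTaskContext's getTaskInfos() method which lets every task discover the addresses of all the other tasks of its barrier stage. A hypothetical use, for instance to bootstrap an external Deep Learning framework on top of the tasks, could look like this:

  sparkContext.parallelize(1 to 10, 3).barrier().mapPartitions(partition => {
    val context = BarrierTaskContext.get()
    // every task of the barrier stage sees the addresses of all its "gang members"
    val workerAddresses = context.getTaskInfos().map(taskInfo => taskInfo.address)
    println(s"Gang members: ${workerAddresses.mkString(", ")}")
    partition
  }).collect()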

The gang scheduling algorithm is often represented as an Ousterhout matrix. The rows denote the execution time slots whereas the columns represent the processors - here, the executors. The group of tasks scheduled within a row composes the famous gang:

Time | Executor1 | Executor2 | Executor3
t1   | task1     | task2     | task3
t2   | task4     | task5     | task6
t3   | task7     | task8     | task9

Barrier Execution Mode example

To use the Barrier Execution Mode we must invoke the RDD's new barrier() method. It returns an instance of RDDBarrier which currently supports only the mapPartitions transformation. To see that all tasks must start at the same time (in the same scheduling round), let's use a cluster much smaller than the number of tasks to process:

  it should "get stuck because of insufficient resources" in {
    this.sparkContext.stop()
    // local[1] provides a single execution slot whereas the barrier stage needs 3
    val conf = new SparkConf().setAppName("Barrier Execution Mode with insufficient resources").setMaster("local[1]")
      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "1")
    val sparkContext:SparkContext = SparkContext.getOrCreate(conf)
    val numbersRdd = sparkContext.parallelize(1 to 100).repartition(3)

    val barrierException = intercept[SparkException] {
      val mappedNumbers: RDDBarrier[Int] = numbersRdd.map(number => {
        number
      }).barrier()

      mappedNumbers.mapPartitions(numbers => {
        println(s"All numbers are ${numbers.mkString(", ")}")
        Iterator("a", "b")
      }).collect()
    }

    barrierException.getMessage shouldEqual "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires more slots than the total number of slots in the cluster currently. Please init a new cluster with more CPU cores or repartition the input RDD(s) to reduce the number of slots required to run this barrier stage."
  }

As you can see, the stage doesn't even start. The problem disappears when we create a cluster with a sufficient number of resources to launch all tasks in parallel:

  it should "run all tasks in parallel when there are enough resources" in {
    this.sparkContext.stop()
    val conf = new SparkConf().setAppName("Barrier Execution Mode with sufficient resources").setMaster("local[3]")
    val sparkContext:SparkContext = SparkContext.getOrCreate(conf)
    val numbersRdd = sparkContext.parallelize(1 to 100).repartition(3)

    val mappedNumbers: RDDBarrier[Int] = numbersRdd.map(number => {
      number
    }).barrier()

    val collectedNumbers = mappedNumbers.mapPartitions(numbers => {
      numbers
    }).collect()

    collectedNumbers should have size 100
    collectedNumbers should contain allElementsOf (1 to 100)
  }

Another important property of the Barrier Execution Mode is the all-or-nothing semantics. As stated before, when one task fails, the other tasks fail too:
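
The failure injection in the following tests relies on a small mutable helper shared between the task attempts. It's not shown in the post's snippets, so here is a minimal assumed version:

  // assumed helper: a mutable flag making sure the failure is injected only once,
  // so that the retried stage attempt can complete successfully
  object FailureFlags {
    var wasFailed = false
  }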

   it should "restart all stages together for ShuffleMapStage" in {
    val numbersRdd = sparkContext.parallelize(1 to 10, 3)
    val mappedNumbers = numbersRdd.filter(number => {
      number % 2 == 0
    }).groupBy(number => {
      // Please notice: the retry applies only for ShuffleMapStage
      // For ResultStage it fails because:
      // Abort the failed result stage since we may have committed output for some partitions.
      if (number == 5 && !FailureFlags.wasFailed) {
        FailureFlags.wasFailed = true
        throw new SparkException("Expected failure")
      }
      number
    }).barrier()

    val collectedNumbers = mappedNumbers.mapPartitions(numbers => {
      val context = BarrierTaskContext.get()
      println(s"context=${context.taskAttemptId()} / ${context.stageAttemptNumber()}")
      context.barrier()
      numbers
    }).collect()

    collectedNumbers should have size 5
  }

But as you can notice in the comment, the automatic retry doesn't apply to all kinds of stages:

   it should "not restart the tasks for result stage" in {
    createSparkContext("Barrier Execution Mode ResultStage failure", "local[3, 100]")
    val numbersRdd = sparkContext.parallelize(1 to 10, 3)

    val error = intercept[SparkException] {
      val mappedNumbers = numbersRdd.filter(number => {
        if (number == 5 && !FailureFlags.wasFailed) {
          FailureFlags.wasFailed = true
          throw new SparkException("Expected failure")
        }
        number % 2 == 0
      }).barrier()

      mappedNumbers.mapPartitions(numbers => {
        val context = BarrierTaskContext.get()
        context.barrier()
        numbers
      }).collect()
    }

    error.getMessage should include("Job aborted due to stage failure: Could not recover from a failed barrier " +
      "ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask")
  }

The code we've written so far works perfectly fine but it misses an important property - the synchronization barrier. The .barrier() method ensures that all tasks start together but it doesn't ensure that they all stop at the same point before beginning the next processing step:
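
The two following tests record the moment when each task passes the measured point with another small helper, again absent from the snippets. A minimal assumed version could be:

  // assumed helper: collects the timestamps at which the tasks passed the measured
  // point; synchronized because the local-mode tasks run in parallel threads
  object TimesContainer {
    val IdentityMappedNumbers = new scala.collection.mutable.ListBuffer[String]()
    def addIdentityMappedNumbers(time: String): Unit = synchronized {
      IdentityMappedNumbers.append(time)
    }
  }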

  "RDDBarrier without BarrierTaskContext" should "synchronize tasks" in {
    val numbersRdd = sparkContext.parallelize(1 to 10).repartition(3)

    val mappedNumbers = numbersRdd.map(number => {
      number
    }).barrier()

    mappedNumbers.mapPartitions(numbers => {
      val sleepingTime = ThreadLocalRandom.current().nextLong(1000L, 10000L)
      println(s"Sleeping ${sleepingTime} ms")
      Thread.sleep(sleepingTime)
      TimesContainer.addIdentityMappedNumbers(new Date().toString)
      numbers
    }).collect()

    TimesContainer.IdentityMappedNumbers should have size 3
    TimesContainer.IdentityMappedNumbers.distinct should have size 3
  }

In order to introduce a synchronization point in the middle of the task execution we can use the global barrier operation brought by BarrierTaskContext. It blocks the execution of the current task as long as the other tasks of the given stage haven't reached the context's barrier:

  "RDDBarrier with BarrierTaskContext" should "synchronize tasks" in {
    val numbersRdd = sparkContext.parallelize(1 to 10).repartition(3)

    val mappedNumbers = numbersRdd.map(number => {
      number
    }).barrier()

    mappedNumbers.mapPartitions(numbers => {
      val sleepingTime = ThreadLocalRandom.current().nextLong(1000L, 10000L)
      println(s"Sleeping ${sleepingTime} ms")
      Thread.sleep(sleepingTime)
      val context = BarrierTaskContext.get()
      context.barrier()
      TimesContainer.addIdentityMappedNumbers(new Date().toString)
      numbers
    }).collect()

    TimesContainer.IdentityMappedNumbers should have size 3
    TimesContainer.IdentityMappedNumbers.distinct should have size 1
  }

Barrier Execution Mode implementation

The previous section already gave some insight into the changes made to support the Barrier Execution Mode. But the .barrier() method and BarrierTaskContext aren't the only ones. Another class added and not presented in the previous section is BarrierCoordinator. When BarrierTaskContext's barrier() method is invoked, under the hood it uses BarrierCoordinator to send a message to the synchronization endpoint. The message is represented by RequestToSync(numTasks: Int, stageId: Int, stageAttemptId: Int, taskAttemptId: Long, barrierEpoch: Int). The receiver checks whether all tasks reached the barrier and, if it's the case, sends a response to them:

        if (maybeFinishAllRequesters(requesters, numTasks)) {
          // Finished current barrier() call successfully, clean up ContextBarrierState and
          // increase the barrier epoch.
          logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received all updates from " +
            s"tasks, finished successfully.")
          barrierEpoch += 1
          requesters.clear()
          cancelTimerTask()
        }

    private def maybeFinishAllRequesters(
        requesters: ArrayBuffer[RpcCallContext],
        numTasks: Int): Boolean = {
      if (requesters.size == numTasks) {
        requesters.foreach(_.reply(()))
        true
      } else {
        false
      }
    }

It works because BarrierTaskContext sends the message in a blocking, i.e. synchronous, manner. It means that it waits for a specified period of time (31536000 seconds = 365 days) before failing.
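
The following lines paraphrase that call as it appears in BarrierTaskContext's barrier() method; treat them as a sketch of the source rather than code to copy as-is:

    // paraphrased from Spark 2.4.0's BarrierTaskContext: the task blocks on a
    // synchronous RPC call to the BarrierCoordinator; the huge timeout means the
    // sync is expected to be ended by the coordinator's reply, not by the clock
    barrierCoordinator.askSync[Unit](
      message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, barrierEpoch),
      timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))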

Among the other important changes we can distinguish the ones introduced in the scheduler. As shown in the previous section, the Barrier Execution Mode won't schedule any task if there are not enough resources to run all of them. It's controlled by TaskSchedulerImpl here:

// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
  // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
    // Skip the launch process.
    // TODO SPARK-24819 If the job requires more slots than available (both busy and free
    // slots), fail the job on submit.
    logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
      s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
      s"number of available slots is $availableSlots.")
  } else {

Another important class involved in the all-or-nothing semantics is the DAGScheduler which, in case of failure, clears all the accumulated results:

    if (mapStage.rdd.isBarrier()) {
      // Mark all the map as broken in the map stage, to ensure retry all the tasks on
      // resubmitted stage attempt.
      mapOutputTracker.unregisterAllMapOutput(shuffleId)
    } else if (mapId != -1) {
      // Mark the map whose fetch failed as broken in the map stage
      mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    }

On this occasion it's important to mention that not all pipelines can be executed with the new mode. We can find all the possible rejection cases by analyzing the children classes of the BarrierJobAllocationFailed exception. Among them we can find:

- BarrierJobUnsupportedRDDChainException - thrown when the barrier stage contains an RDD chain pattern breaking the "1 barrier task per partition" assumption, for instance a union() or a coalesce() within the stage (see the sketch below)
- BarrierJobRunWithDynamicAllocationException - thrown when a barrier stage is submitted with dynamic resource allocation enabled
- BarrierJobSlotsNumberCheckFailed - thrown when the barrier stage requires more slots than the ones currently available in the cluster, as in the first test of this post
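
To illustrate the first case, here is a quick sketch, reusing the test setup from above, of a barrier stage rejected because of a coalesce():

    // coalesce() without shuffle changes the number of partitions within the same
    // stage and breaks the "1 barrier task per partition" assumption
    val unsupportedChain = intercept[SparkException] {
      sparkContext.parallelize(1 to 10, 3).barrier()
        .mapPartitions(numbers => numbers)
        .coalesce(1)
        .collect()
    }
    unsupportedChain.getMessage should include("does not allow the following pattern of RDD chain")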

The Barrier Execution Mode is one of the first big steps of Apache Spark towards AI pipelines. As explained throughout this post, this scheduling model ensures that all tasks of a given stage are tightly coupled. It brings the all-or-nothing semantics where the failure of one task leads to the failure of all the others. This semantics goes even further and requires all tasks to be launched at the same time. Thanks to new classes like BarrierTaskContext we can also add an extra synchronization layer where no task continues until all of them reach the barrier.