Jobs, stages and tasks

Versions: Spark 2.1.0

Every distributed computation is divided into smaller parts called jobs, stages and tasks. Knowing them is especially useful during monitoring because it helps to detect bottlenecks.

This post focuses on these 3 elements. The first section explains each of them theoretically. The second part presents the classes involved in running jobs on a Spark cluster. The last section illustrates jobs, stages and tasks through unit tests.

Definition

In the big picture, the execution plan is built as a Directed Acyclic Graph (DAG). Once generated, it's submitted to the DAG scheduler, which is responsible for distributing the work to the workers. The distribution occurs when the DAG scheduler passes the computed operations to the task scheduler, which physically launches the received elements through the cluster manager.

That was all for generalities. In more detail, the generated execution plan is composed of jobs, stage(s) and task(s). The top-level unit is the job, which can be understood as all the processing defined in the driver program up to a given action. So for example in the code below:

myRDD.isEmpty()

myRDD.map(x => y).saveAsTextFile("/path/to/output")

we get 2 jobs - the first one for isEmpty() and the second one for saveAsTextFile().

Each job in its turn is composed of stage(s) submitted for execution by the DAG scheduler. A stage is a set of operations (= tasks, described later) applying the same functions but on different data subsets, depending on partitions. Stages that don't depend on each other can be submitted in parallel to improve the processing throughput. Each stage is either of shuffle map or result type. The first type represents stages whose results are the inputs for the next stages. On the other hand, a result stage is a stage whose results are sent back to the driver (= the results of Spark actions).
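
To make the stage split concrete, here is a minimal sketch (the letters RDD and the summing logic are purely illustrative, they don't come from the examples above) showing how a shuffle cuts a single job into a shuffle map stage and a result stage:

val letters = sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey introduces a shuffle, so the single job triggered by collect()
// is split into 2 stages: a shuffle map stage (parallelize + the map side of
// reduceByKey) and a result stage reading the shuffled data and sending
// the result back to the driver
val sums = letters.reduceByKey(_ + _)
sums.collect()

// the lineage printed by toDebugString shows the stage boundary introduced
// by the ShuffledRDD
println(sums.toDebugString)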

Each stage is made of task(s). The task is the smallest unit of execution used to compute a new RDD. It's represented by the command sent by the driver to the executor with the serialized form of the computation. More precisely, the command is a closure with all the methods and variables needed to make the computation. It's important to remember that these variables are only copies of the objects declared in the driver's program and that they're not shared among executors (i.e. each executor operates on a different object, which can lead to different values). Tasks are executed on executors and their number depends on the number of partitions - 1 task is needed for 1 partition.
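
Since the copy semantics of closure variables is a frequent trap, below is a minimal sketch of it (the sum variable is purely illustrative); Spark's accumulators are the supported way to aggregate values across executors:

var sum = 0 // variable declared in the driver's program

val numbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
// the closure shipped with each task contains a copy of sum;
// every executor increments its own copy, not the driver's variable
numbers.foreach(number => sum += number)

// on a cluster this still prints 0; in local mode the behavior is not
// guaranteed, which is exactly why such code shouldn't be relied on
println(sum)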

The abstractions described above concern not only batch processing but also streaming (since it's micro-batch oriented) and SQL processing (since it ultimately executes on RDDs).

Involved classes

Among the classes involved in job scheduling in Spark, we can distinguish 2 scheduler-like objects. The first one is org.apache.spark.scheduler.DAGScheduler. Its role consists of constructing the sequence of stages needed to execute the action defined through the Spark API. DAGScheduler produces jobs, represented internally by ActiveJob instances. One of ActiveJob's properties is called finalStage and is an instance of the Stage class. As the name indicates, Stage objects represent stages, and thus are composed of several parallel tasks. Similarly to the stage types quoted previously, these classes also carry the distinction between shuffle-map and result finality. The first type is represented by ShuffleMapStage. It's used for intermediate stages, i.e. the ones producing inputs for subsequent stages. The second type of stage is ResultStage and, as its name indicates, it's related to the execution of Spark's actions.
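
These classes are internal to the scheduler but their activity can be observed from user code through the listener API. The sketch below, assuming a sparkContext like the one used in the tests later, registers a listener printing the jobs and stages produced by DAGScheduler:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}

val schedulingListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job ${jobStart.jobId} starts with ${jobStart.stageInfos.size} stage(s)")
  }
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId} (${stageInfo.name}) completed with ${stageInfo.numTasks} task(s)")
  }
}
sparkContext.addSparkListener(schedulingListener)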

When a stage is executed, DAGScheduler checks if there are missing tasks. When it's the case, it generates them as a set of tasks, internally represented by an instance of TaskSet:

private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  // ...
}

TaskSet contains an array of tasks, represented as org.apache.spark.scheduler.Task instances. Depending on the stage's type, the tasks are either shuffle-map (ShuffleMapTask) or result (ResultTask class) tasks. Each task object has an associated partition on which it's executed, as well as other information: metrics or state. Regarding the state (org.apache.spark.TaskState), a task can be: launched, running, finished or failed (lost or killed).

TaskSet is submitted by DAGScheduler to the other scheduler, the one responsible for tasks, called TaskScheduler. After receiving the list of expected tasks, TaskScheduler sends them to the cluster and monitors their execution. It also reports events about the execution - and more particularly about executors (added, lost, heartbeats) - back to DAGScheduler.

The previously listed task states are monitored by TaskScheduler with the help of TaskResultGetter. An instance of this class is associated with TaskScheduler and is used by the latter to fetch the exact state of finished tasks. The "exact state" means here the result of the task: successful execution or failure. Once this detailed state is fetched, it's submitted, through TaskSetManager, to DAGScheduler. DAGScheduler handles this execution event in the taskEnded(Task[_], TaskEndReason, Any, Seq[AccumulatorV2[_]], TaskInfo) method. The handling consists of sending an event called CompletionEvent to the event processing loop. The task result isn't sent directly from TaskScheduler to DAGScheduler because the intermediate layer of TaskSetManager, created when DAGScheduler sends the TaskSet to TaskScheduler, is used to update the completed task's metadata (TaskInfo class).
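
The completion path described above can also be observed from the outside. The sketch below (again assuming the test's sparkContext) registers a listener on the task end events posted by DAGScheduler once it has handled a CompletionEvent:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val taskEndListener = new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskInfo carries the metadata updated by TaskSetManager; reason is the
    // TaskEndReason (Success, ExceptionFailure, TaskKilled, ...)
    println(s"Task ${taskEnd.taskInfo.taskId} of stage ${taskEnd.stageId} " +
      s"ended with status ${taskEnd.taskInfo.status} (reason: ${taskEnd.reason})")
  }
}
sparkContext.addSparkListener(taskEndListener)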

The schema below summarizes this section:

Spark job workflow

Jobs, stages and tasks tests

The tests below show some of the specifics pointed out in this post:

"2 jobs" should "be triggered where processing has 2 actions" in {
  val logAppender = createLogAppender(Seq("Starting job"))

  val numbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6))

  // The 1st processing - from here to count() action
  val pairNumbersRdd = numbers.filter(_%2 == 0)
  val pairNumbersCount = numbers.filter(_%2 == 0).count()

  // The 2nd processing - only collect() action
  // It's only for learning purposes - obviously, this code
  // is redundant since we can deduce the count from
  // the collected elements
  val allPairNumbers = pairNumbersRdd.collect()

  logAppender.messages.size shouldEqual(2)
  logAppender.getMessagesText should contain allOf("Starting job: count at JobTest.scala:23",
    "Starting job: collect at JobTest.scala:29")
}

"different number of stages" should "be created for 1 and 2 partitions" in {
  // This time we're expecing to detect log events showing
  // new task submission
  val logAppender = createLogAppender(Seq("Adding task set"))

  val numbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6))

  // The 1st test - single partition
  val numbersQuantity = numbers.count()

  // The 2nd test - 2 partitions, thus automatically
  // twice as many tasks
  val numbers2Partitions = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
  val numbersQuantityFor2Partitions = numbers2Partitions.count()

  // Expectations - the first concerns numbers.count(), the second
  // numbers2Partitions.count()
  logAppender.getMessagesText should contain allOf("Adding task set 0.0 with 1 tasks",
    "Adding task set 1.0 with 2 tasks")
}

"failed task" should "be sent from TaskSetManager to DAGScheduler" in {
  val logAppender = createLogAppender(Seq("failed 1 times; aborting job", "Removed TaskSet 0.0",
    "After removal of stage"))

  val numbersRdd = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 3)

  try {
    val numberOfElements = numbersRdd.filter(number => {
      if (number == 4) {
        throw new RuntimeException("4 is invalid number")
      }
      true
    }).count
  } catch {
    case _:Throwable =>
  }
  // Sleep is mandatory to catch DAGScheduler event log
  Thread.sleep(3000)

  logAppender.messages.size shouldEqual(3)
  logAppender.messages should contain allOf(
    LogMessage("Task 1 in stage 0.0 failed 1 times; aborting job", "org.apache.spark.scheduler.TaskSetManager"),
    LogMessage("Removed TaskSet 0.0, whose tasks have all completed, from pool ",
      "org.apache.spark.scheduler.TaskSchedulerImpl"),
    // For DAGScheduler another log message would be more meaningful but it
    // contains an execution time that can vary depending on the execution environment, e.g.:
    // "ResultStage 0 (count at JobTest.scala:77) failed in 0.891 s due to Job aborted ...
    //  Driver stacktrace: (org.apache.spark.scheduler.DAGScheduler:54)"
    // That's why the test checks against the message occurring after it, i.e. the
    // stage removal.
    LogMessage("After removal of stage 0, remaining stages = 0", "org.apache.spark.scheduler.DAGScheduler")
    )
}

// This test comes from another file but, for brevity reasons, it's added to the
// same code block - that's why the line number in (count at ...) is small
"SQL processing" should "also be composed of job, tasks and stages" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Starting job", "Final stage",
    "Running task 0.0"))
  val numbersDataFrame = sparkSession.range(1, 20)

  numbersDataFrame.count()

  logAppender.getMessagesText() should contain allOf("Starting job: count at JobTest.scala:29",
    "Final stage: ResultStage 1 (count at JobTest.scala:29)",
    "Running task 0.0 in stage 0.0 (TID 0)", "Running task 0.0 in stage 1.0 (TID 1)"
    )
}

private def createLogAppender(patterns:Seq[String]): InMemoryLogAppender = {
  val testAppender = new InMemoryLogAppender(patterns)
  val pattern = "%m"
  testAppender.setLayout(new PatternLayout(pattern))
  testAppender.setThreshold(Level.TRACE)
  testAppender.activateOptions()
  Logger.getRootLogger().addAppender(testAppender)
  testAppender
}

private class InMemoryLogAppender(patterns:Seq[String]) extends ConsoleAppender {

  var messages:ListBuffer[LogMessage] = new ListBuffer()

  override def append(event: LoggingEvent) = {
    val matchingMessage = patterns.find(event.getMessage.toString.contains(_))
    if (matchingMessage.isDefined) {
      messages += LogMessage(event.getMessage.toString, event.getLoggerName)
    }
  }

  def getMessagesText(): Seq[String] = {
    messages.map(_.message)
  }

}

case class LogMessage(message: String, loggingClass: String)

This post showed some details about distributed computation in Spark. The first section defined the 3 main components of the Spark workflow: jobs, stages and tasks. Thanks to it we could learn about their granularity, which depends either on the number of actions or on the number of partitions. The second part presented the classes involved in job execution. We could see that everything is orchestrated by 2 schedulers: DAGScheduler and TaskScheduler. The last part proved some of the described concepts through learning tests.