Code execution on driver and executors

Versions: Spark 2.1.0

Keeping in mind which parts of Spark code are executed on the driver and which ones on the workers is important and can help to avoid some annoying errors, such as the ones related to serialization.

This post describes some rules about job execution in Spark. The first part concerns closures, an important piece of code in the Spark workflow. They are shown through a simple processing composed of mapping and filtering transformations. The second part explains how the code is shipped to executors. The next section tries to answer the question of which methods are executed on the driver and which ones on executor nodes. The last section lists some learning tests showing that.

Closures

Before talking about code shipped to executors, we need to recall an important Spark concept - closures. In Spark they're specifically called task closures since they represent the code needed by executors to execute a given task. Thus before going further, let's stop here and analyze what happens under-the-hood for the following processing:

object Maps {
  def multiply(number: Int, factor: Int): Int = {
    number * factor
  }
}

object Filters {
  def isGreaterThan(number: Int, lowerBound: Int): Boolean = {
    number > lowerBound
  }
}

"mapping and filtering of serializable objects" should "make processing success" in {
  val data = Array(1, 2, 3, 4, 5)

  val numbersRdd = sparkContext.parallelize(data)

  val collectedNumbers = numbersRdd
    .filter(Filters.isGreaterThan(_, 0))
    .map(Maps.multiply(_, 2).toString())
    .collect()

  collectedNumbers.size shouldBe data.size
  collectedNumbers should contain only("2", "4", "6", "8", "10")
}

The code itself is not complicated. More interesting is what happens with the filter and map operations, i.e. how closures are created. To see that, we'll check the logs and look for entries starting with the "Cleaning closure" fragment, which indicates the preparation of code for execution. After launching the above test, we can find 5 closure-creation entries in the logs:

# Filters.isGreaterThan
DEBUG +++ Cleaning closure  (com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$9) +++ 
DEBUG  + declared fields: 1 
DEBUG      public static final long com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$9.serialVersionUID 
DEBUG  + declared methods: 3 
DEBUG      public boolean com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$9.apply$mcZI$sp(int) 
DEBUG      public final boolean com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$9.apply(int) 
DEBUG      public final java.lang.Object com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$9.apply(java.lang.Object) 
DEBUG  + inner classes: 0 
DEBUG  + outer classes: 0 
DEBUG  + outer objects: 0 
DEBUG  + populating accessed fields because this is the starting closure 
DEBUG  + fields accessed by starting closure: 0 
DEBUG  + there are no enclosing objects! 

# Maps.multiply 
DEBUG +++ Cleaning closure  (com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$10) +++  
DEBUG  + declared fields: 1  
DEBUG      public static final long com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$10.serialVersionUID  
DEBUG  + declared methods: 2   
DEBUG      public final java.lang.Object com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$10.apply(java.lang.Object)  
DEBUG      public final java.lang.String com.waitingforcode.SparkExecutionTest$$anonfun$8$$anonfun$10.apply(int)  
DEBUG  + inner classes: 0  
DEBUG  + outer classes: 0  
DEBUG  + outer objects: 0  
DEBUG  + populating accessed fields because this is the starting closure  
DEBUG  + fields accessed by starting closure: 0  
DEBUG  + there are no enclosing objects! 

# collect - data is first collected on executors
#           and only after shipped back to driver
DEBUG +++ Cleaning closure  (org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13) +++ (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + declared fields: 2
DEBUG      public static final long org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.serialVersionUID
DEBUG      private final org.apache.spark.rdd.RDD$$anonfun$collect$1 org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.$outer
DEBUG  + declared methods: 2 
DEBUG      public final java.lang.Object org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(scala.collection.Iterator)
DEBUG  + inner classes: 0
DEBUG  + outer classes: 2
DEBUG      org.apache.spark.rdd.RDD$$anonfun$collect$1
DEBUG      org.apache.spark.rdd.RDD
DEBUG  + outer objects: 2
DEBUG      
DEBUG      MapPartitionsRDD[2] at map at SparkExecutionTest.scala:77 
DEBUG  + populating accessed fields because this is the starting closure 
DEBUG  + fields accessed by starting closure: 2
DEBUG      (class org.apache.spark.rdd.RDD,Set(org$apache$spark$rdd$RDD$$evidence$1)) 
DEBUG      (class org.apache.spark.rdd.RDD$$anonfun$collect$1,Set($outer)) 
DEBUG  + outermost object is not a closure or REPL line object, so do not clone it: (class org.apache.spark.rdd.RDD,MapPartitionsRDD[2] at map at SparkExecutionTest.scala:77)
DEBUG  + cloning the object  of class org.apache.spark.rdd.RDD$$anonfun$collect$1 
DEBUG  + cleaning cloned closure  recursively (org.apache.spark.rdd.RDD$$anonfun$collect$1)
# 2 remaining entries concern collect and are omitted for brevity
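
As a side note, these entries are only written when the logger of org.apache.spark.util.ClosureCleaner is at the DEBUG level. One way to enable it programmatically, assuming the log4j 1.2 binding shipped with Spark 2.1 (a sketch; the same thing can be configured in log4j.properties), is:

import org.apache.log4j.{Level, Logger}

// raise the verbosity of the closure cleaner only, to avoid flooding the logs
// with DEBUG entries coming from other Spark components
Logger.getLogger("org.apache.spark.util.ClosureCleaner").setLevel(Level.DEBUG)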

One of the goals of the cleaning consists in analyzing the code and removing from parent closures all useless dependencies that, potentially, may not be serializable. The cleaning process is important because these useless and not serializable objects could make the processing fail with a Task not serializable exception. By the way, the ability to serialize the cleaned closure is verified at the end of the cleaning process.

All the cleaning work is done by org.apache.spark.util.ClosureCleaner and its clean(AnyRef, Boolean, Boolean) method.
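
To see what the cleaner protects us from, below is a minimal sketch (the ValueHolder class is hypothetical and not part of the tested code). Referencing a field of a non-serializable enclosing object forces the whole object to be captured by the closure, while copying the value into a local val keeps the closure self-contained:

class ValueHolder(val factor: Int) { // deliberately not Serializable

  // fails with "Task not serializable" - the closure references this.factor,
  // so the whole ValueHolder instance would have to be shipped to executors
  def multiplyAll(rdd: org.apache.spark.rdd.RDD[Int]): Array[Int] = {
    rdd.map(number => number * factor).collect()
  }

  // works - only the local Int, a serializable primitive, is captured by the closure
  def multiplyAllSafely(rdd: org.apache.spark.rdd.RDD[Int]): Array[Int] = {
    val localFactor = factor
    rdd.map(number => number * localFactor).collect()
  }

}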

How is the task shipped to the executor?

In the post about Jobs, stages and tasks we could learn that transformations are executed in the form of tasks. Under-the-hood, the TaskScheduler is responsible for moving the serialized code to the executors.

In more detail, the serialized code is passed to executors by org.apache.spark.scheduler.SchedulerBackend implementations. For code executed locally, the implementation used is LocalSchedulerBackend. For code working with resource managers such as YARN or Mesos, the implementations live under the org.apache.spark.scheduler package; one example is YarnSchedulerBackend. It extends CoarseGrainedSchedulerBackend, which contains the methods interacting with executors.

Each scheduler backend is accompanied by a corresponding task scheduler. For code executed locally, TaskSchedulerImpl is used. For the execution on a cluster, for example on YARN, a more specialized instance is sometimes employed (YarnScheduler). Mesos, in its turn, uses TaskSchedulerImpl.
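
The (task scheduler, scheduler backend) pair is chosen by SparkContext from the master URL. A quick illustration, with the expected pairs put in comments (only the local case was observed in this post's tests; the cluster cases follow from the classes quoted above, and the Mesos host is a placeholder):

import org.apache.spark.SparkConf

// local mode -> TaskSchedulerImpl + LocalSchedulerBackend
val localConf = new SparkConf().setAppName("Schedulers test").setMaster("local[2]")

// YARN mode -> YarnScheduler + YarnSchedulerBackend (extending CoarseGrainedSchedulerBackend)
val yarnConf = new SparkConf().setAppName("Schedulers test").setMaster("yarn")

// Mesos mode -> TaskSchedulerImpl + a Mesos-specific coarse-grained backend
val mesosConf = new SparkConf().setAppName("Schedulers test").setMaster("mesos://mesos-master:5050")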

Tasks are sent through TaskScheduler's submitTasks(TaskSet) method that, at the end, calls SchedulerBackend's reviveOffers() method. It asks the resource manager to retrieve executors able to handle the tasks. After that, the launchTasks(Seq[Seq[TaskDescription]]) method is called. And it's here that the task (a TaskDescription object), built from metadata (id, assigned executor, name) and the physical execution (serialized as a ByteBuffer), is finally serialized and sent through the network to the executor. The sent object is wrapped as a LaunchTask message.

When the serialized task is received on the executor side (CoarseGrainedExecutorBackend in resource manager mode), it's deserialized and a TaskDescription object is created. The physical task is still serialized at this moment. It's deserialized in its turn only when the TaskRunner is executed. It calls Task's deserializeWithDependencies(ByteBuffer) method that splits the serialized object into: task files, task jars and the real task bytes. The first 2 parts are used by Executor's updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) method that downloads missing files from different places (cache, HTTP, Hadoop file system). The last part represents the previously serialized closure with the task to execute. It's deserialized with the JavaSerializer, hard-coded and created during SparkEnv initialization.
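
To demystify what "serialized closure" means here, below is a minimal sketch using plain Java serialization, which is essentially what the JavaSerializer does with the task binary (the helper methods are hypothetical and the real payload contains more metadata than the bare function):

import java.io._

// turns a serializable object (e.g. a Scala function) into bytes,
// similarly to what happens before a task is shipped to an executor
def toBytes(closure: AnyRef): Array[Byte] = {
  val byteStream = new ByteArrayOutputStream()
  val objectStream = new ObjectOutputStream(byteStream)
  objectStream.writeObject(closure)
  objectStream.close()
  byteStream.toByteArray
}

// turns the bytes back into a callable function, as the executor does before running the task
def fromBytes[T](bytes: Array[Byte]): T = {
  val objectStream = new ObjectInputStream(new ByteArrayInputStream(bytes))
  objectStream.readObject().asInstanceOf[T]
}

val multiplyByTwo: Int => Int = number => number * 2
val shippedFunction = fromBytes[Int => Int](toBytes(multiplyByTwo))
assert(shippedFunction(3) == 6)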

Code execution location

After discovering what is sent (the closure) and how (the scheduler backend), it's time to focus on the real question: which code is executed on the driver and which one on the executors. To detect that, some learning tests are executed. Their goal is to use a not serializable object (NotSerializableMultiplicator) in the processing. Every test asserting an exception represents a case when the code is transferred to and executed on an executor:

val conf = new SparkConf().setAppName("Spark execution test").setMaster("local")
var sparkContext:SparkContext = null

before {
  sparkContext = SparkContext.getOrCreate(conf)
}

after {
  sparkContext.stop
}

"not serializable object executed on executor" should "make processing fail" in {
  val data = Array(1, 2, 3, 4, 5)

  val numbersRdd = sparkContext.parallelize(data)
  val multiplicator = new NotSerializableMultiplicator(5)


  val serializableException = intercept[SparkException] {
    numbersRdd.map(number => multiplicator.multiply(number))
  }
  serializableException.getMessage should include("Task not serializable")
  serializableException.getCause.getMessage should
    include("object not serializable (class: com.waitingforcode.NotSerializableMultiplicator")
}

"not serializable object executed in action on worker side" should "make processing fail" in {
  val data = Array(1, 2, 3, 4, 5)

  val numbersRdd = sparkContext.parallelize(data)

  val multiplicator = new NotSerializableMultiplicator(5)
  // We could think that the code below is executed on the driver's side because
  // foreach is an action. But it's not the case - the RDD is distributed across executors,
  // thus instead of bringing all the data to the driver and iterating over it there,
  // the processing is applied directly on the executors' side
  val serializableException = intercept[SparkException] {
    numbersRdd.foreach(number => {
      val multiplicationResult = multiplicator.multiply(number)
    })
  }
  serializableException.getMessage should include("Task not serializable")
  serializableException.getCause.getMessage should
    include("object not serializable (class: com.waitingforcode.NotSerializableMultiplicator")
}

"reduce operation" should "be executed partially on executor and on driver" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Starting job", "Final stage:", "Job 0 finished"))
  val data = Array(1, 2, 3, 4, 5)

  val numbersRdd = sparkContext.parallelize(data)

  val sum = numbersRdd.reduce((n1, n2) => n1 + n2)
  sum shouldEqual 15
  // Logs contain some entries about jobs that are executed on executors (distributed manner).
  // It's enough to prove that reduce is a mixed operation whose intermediate results
  // are computed on executors and brought back to the driver to prepare the final result
  val concatenatedLogs = logAppender.getMessagesText.mkString(",")
  concatenatedLogs.contains("Starting job: reduce at SparkExecutionTest.scala") shouldBe(true)
  concatenatedLogs.contains(",Final stage: ResultStage 0 (reduce at SparkExecutionTest.scala") shouldBe(true)
  concatenatedLogs.contains(",Job 0 finished: reduce at SparkExecutionTest.scala") shouldBe(true)
}

"collecting data" should "not make processing failing even if it uses not serializable object " in {
  val data = Array(1, 2, 3, 4, 5)

  val numbersRdd = sparkContext.parallelize(data)

  val collectedNumbers = numbersRdd
    .filter(number => number > 0)
    .collect()

  // Here data is first collected, i.e. it moves from
  // the executors to the driver, and only on the driver's side
  // is the multiplication done. So there is no need to
  // send the not serializable object
  val multiplicator = new NotSerializableMultiplicator(2)
  val result = multiplicator.multiply(collectedNumbers(0))

  result shouldBe 2
} 

class NotSerializableMultiplicator(value:Int) {

  def multiply(toMultiply:Int): Int = {
    value * toMultiply
  }

}
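
For the record, the two failing tests could be made to pass either by capturing only a primitive copy of the value inside the closure (as shown in the closures section) or simply by making the class serializable, for instance with a hypothetical variant like:

class SerializableMultiplicator(value:Int) extends Serializable {

  def multiply(toMultiply:Int): Int = {
    value * toMultiply
  }

}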

To simplify, we could think that all the code defined in closures ({}) is executed on the executors' side, even if it's an action. However, some actions are executed on both sides, like fold(T)(op: (T, T) => T) and reduce((T, T) => T), where a part of the processing is executed locally by the executors and only the results are aggregated on the driver's side. The same rule concerns a less evident action, count, that executes the org.apache.spark.util.Utils#getIteratorSize(Iterator[T]) function as a job on all executors and, at the end, assembles the results with the sum() method of the TraversableOnce trait:

 def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
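
The same split can be observed with aggregate(zeroValue)(seqOp, combOp): seqOp is applied on the executors, element by element inside each partition, while the per-partition results are merged with combOp as they come back, the final result being assembled on the driver. A small sketch reusing the test's data:

val numbersRdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)

val (sum, count) = numbersRdd.aggregate((0, 0))(
  // seqOp - executed on executors, once per element of a partition
  (accumulator, number) => (accumulator._1 + number, accumulator._2 + 1),
  // combOp - merges the per-partition accumulators, like the final part of reduce() or count()
  (accumulator1, accumulator2) => (accumulator1._1 + accumulator2._1, accumulator1._2 + accumulator2._2)
)

sum shouldBe 15
count shouldBe 5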
 

Through this post we can learn a little about the places of code execution. The first part explained the task closures representing the operations to do, defined in the application. These closures are later sent to executors, and the second part presented the flow of this execution. It showed some new objects related to task execution, such as SchedulerBackend or TaskDescription. The last part contained some learning tests used to show where transformations and actions were launched. As proven, the code defined in closures is executed on executors and sometimes also on the driver (aggregation of the results generated by executors).