Spark data locality

Versions: Spark 2.2.0

If you've ever looked at the Spark UI, you've certainly noticed the Locality Level column in the tasks table. Even if this concept gets less attention than topics such as shuffle, it remains quite important for efficient data processing.

In this post we'll try to explain what data locality is. The first part introduces the concept with some basic details. The next part shows how this feature is implemented in the code. The last section presents some tests showing how the locality level changes depending on the execution context.

Data locality definition

In order to optimize processing tasks, Spark tries to place the execution code as close as possible to the processed data. This is called data locality. In general, Spark first tries to move the serialized code to the data because the code is usually much smaller and moving it is much cheaper than transferring the data over the network. However, it's not always possible and sometimes the data must be moved to the executor - for instance when a given node is responsible only for storage.

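Spark describes the data locality concept with 5 levels, listed here from the most to the least local:

- PROCESS_LOCAL: the data is in the same JVM as the running code
- NODE_LOCAL: the data is on the same node, for instance in another executor or in a local HDFS block
- NO_PREF: the data has no locality preference and is accessed equally quickly from anywhere
- RACK_LOCAL: the data is on another node of the same rack
- ANY: the data is elsewhere on the network, outside the rack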

Data locality is controlled by the configuration entries beginning with spark.locality.wait. The value is expressed in time units (e.g. s for seconds) and indicates how long Spark waits to launch a task at a given locality level before giving up. Sometimes no executor close to the data is free immediately and the task must wait for one. However, when the time defined in spark.locality.wait expires, Spark tries the next, less local level, that is: process -> node -> rack -> any. Finer-grained control is possible with the dedicated entries, which fall back to spark.locality.wait when not set: spark.locality.wait.process (PROCESS_LOCAL), spark.locality.wait.node (NODE_LOCAL) and spark.locality.wait.rack (RACK_LOCAL).
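As a rough illustration, these entries can be set on a SparkConf before the context is created. The application name, the master and all the values below are arbitrary assumptions chosen for the example, not recommendations, and the snippet needs spark-core on the classpath:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalityWaitConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-example") // hypothetical application name
      .setMaster("local[*]")
      // Wait applied to every level that has no dedicated entry
      .set("spark.locality.wait", "3s")
      // Per-level overrides (illustrative values only)
      .set("spark.locality.wait.process", "1s")
      .set("spark.locality.wait.node", "2s")
      .set("spark.locality.wait.rack", "0s")

    val sparkContext = new SparkContext(conf)
    try {
      // Read one of the entries back to check what the context really uses
      println(sparkContext.getConf.get("spark.locality.wait.node"))
    } finally {
      sparkContext.stop()
    }
  }
}
```

A wait of 0 for a level means that Spark doesn't wait at this level and falls back to the next one right away.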

The configuration paragraph shows that data locality can have an important influence on processing time. For instance, when the data is only reachable outside of the rack and the waiting time is set to 3 seconds for each level, we may need to wait up to 9 seconds (3 levels x 3 seconds) before executing a given task. If the processing works on micro-batches, it'll inevitably delay the task execution.
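The arithmetic behind this worst case can be sketched in plain Scala, without any Spark dependency. The object, the method and the map below are hypothetical names invented for the illustration, and NO_PREF is left out to keep the example short:

```scala
object LocalityWaitArithmetic {
  // Fallback order assumed by this sketch, from the most to the least local level
  val fallbackOrder: Seq[String] = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Sums the waits of all the levels tried before the target level is allowed
  def worstCaseDelayMs(targetLevel: String, waitsMs: Map[String, Long]): Long = {
    require(fallbackOrder.contains(targetLevel), s"Unknown locality level: $targetLevel")
    fallbackOrder
      .takeWhile(_ != targetLevel)
      .map(level => waitsMs.getOrElse(level, 0L))
      .sum
  }

  def main(args: Array[String]): Unit = {
    val threeSecondsEverywhere = Map(
      "PROCESS_LOCAL" -> 3000L, "NODE_LOCAL" -> 3000L, "RACK_LOCAL" -> 3000L)
    // 3000 + 3000 + 3000 = 9000 ms before a task can run at the ANY level
    println(worstCaseDelayMs("ANY", threeSecondsEverywhere))
  }
}
```

This is only an upper bound: levels for which no task is pending are skipped without waiting.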

Data locality under the hood

After this introduction it's time to move to the analysis of the internal implementation. It should help to understand when data locality is resolved and whether it can be driven by additional information.

org.apache.spark.scheduler.TaskLocation is the trait representing the task location. Among the available implementations we can distinguish: ExecutorCacheTaskLocation, HostTaskLocation and HDFSCacheTaskLocation. All 3 are built from the values returned by the particular implementations of org.apache.spark.rdd.RDD#getPreferredLocations. For instance, in the case of HDFS files the resolution method uses the information about the nodes storing the given input split. In another case, an RDD representing a shuffle result, the preferred locations are the nodes holding the most outputs for the given partition.
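To give an idea of how a preferred location string can end up as one of these 3 types, here is a simplified model written in plain Scala. It is not Spark's code: the type names are hypothetical, and the executor_ and hdfs_cache_ prefixes are the ones we assume Spark 2.x uses, so they should be checked against the TaskLocation source:

```scala
object TaskLocationSketch {
  // Hypothetical counterparts of the 3 TaskLocation implementations
  sealed trait Location { def host: String }
  case class ExecutorCacheLocation(host: String, executorId: String) extends Location
  case class HostLocation(host: String) extends Location
  case class HdfsCacheLocation(host: String) extends Location

  // Assumed prefixes, to be verified against the Spark version in use
  private val ExecutorPrefix = "executor_"
  private val HdfsCachePrefix = "hdfs_cache_"

  def parse(raw: String): Location = {
    if (raw.startsWith(HdfsCachePrefix)) {
      HdfsCacheLocation(raw.stripPrefix(HdfsCachePrefix))
    } else if (raw.startsWith(ExecutorPrefix)) {
      // Expected shape: executor_<host>_<executorId>
      val parts = raw.stripPrefix(ExecutorPrefix).split("_", 2)
      require(parts.length == 2, s"Illegal executor location format: $raw")
      ExecutorCacheLocation(parts(0), parts(1))
    } else {
      // Anything else is considered as a plain host name
      HostLocation(raw)
    }
  }

  def main(args: Array[String]): Unit = {
    Seq("localhost", "executor_0.0.0.1.external_driver", "hdfs_cache_node1")
      .foreach(raw => println(s"$raw => ${parse(raw)}"))
  }
}
```

Under this model the second string describes the executor "driver" running on the host "0.0.0.1.external", which would explain why a task can be eligible for both PROCESS_LOCAL and NODE_LOCAL levels.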

Once the task locations are resolved, they're associated with the tasks. Later Spark uses this preference information to place each task in the appropriate collections of TaskSetManager: pendingTasksForExecutor, pendingTasksForHost, pendingTasksForRack and pendingTasksWithNoPrefs. Each of them corresponds to a different locality level: pendingTasksForExecutor=PROCESS_LOCAL, pendingTasksForHost=NODE_LOCAL, pendingTasksForRack=RACK_LOCAL and pendingTasksWithNoPrefs=NO_PREF. Since a task is registered in every collection matching its preferred locations, one task can have several different locality levels.
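The registration of a task in these collections can be modeled as below. It's a minimal sketch under assumptions, not the TaskSetManager code: Preference, PendingTasks and rackOf are hypothetical names, and the real implementation handles more cases (HDFS cache locations, the list of all pending tasks):

```scala
import scala.collection.mutable

object PendingTasksSketch {
  // Hypothetical, simplified view of one preferred location of a task
  case class Preference(host: String, executorId: Option[String] = None)

  // rackOf is a stand-in for the rack resolution done by the scheduler
  class PendingTasks(rackOf: String => Option[String]) {
    val forExecutor = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]
    val forHost = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]
    val forRack = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]
    val withNoPrefs = mutable.ArrayBuffer.empty[Int]

    def add(taskIndex: Int, preferences: Seq[Preference]): Unit = {
      preferences.foreach { preference =>
        // PROCESS_LOCAL candidate when the executor is known
        preference.executorId.foreach { id =>
          forExecutor.getOrElseUpdate(id, mutable.ArrayBuffer.empty[Int]) += taskIndex
        }
        // NODE_LOCAL candidate for every preferred host
        forHost.getOrElseUpdate(preference.host, mutable.ArrayBuffer.empty[Int]) += taskIndex
        // RACK_LOCAL candidate when the rack of the host can be resolved
        rackOf(preference.host).foreach { rack =>
          forRack.getOrElseUpdate(rack, mutable.ArrayBuffer.empty[Int]) += taskIndex
        }
      }
      // NO_PREF only when the task has no preferred location at all
      if (preferences.isEmpty) withNoPrefs += taskIndex
    }
  }
}
```

With this model, a task preferring an executor appears at the same time in the executor and host collections, which is how a single task gets several locality levels.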

Resolved locality levels are later used in org.apache.spark.scheduler.TaskSetManager#resourceOffer(execId: String, host: String, maxLocality: TaskLocality.TaskLocality). The information about the current locality level (if a task set has more than 1 level) is kept in TaskSetManager's currentLocalityIndex field. It's advanced every time the waiting time defined in the Spark configuration (3s by default) expires, or when no task is pending at the current level. This change is made in the org.apache.spark.scheduler.TaskSetManager#getAllowedLocalityLevel(curTime: Long) method.
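The way the index moves can be approximated with the following sketch. It's a simplified model inspired by getAllowedLocalityLevel rather than a copy of it: LocalityTracker, allowedLevel and hasPendingTasks are hypothetical names, and the real method also takes into account the timer reset done when a task is launched:

```scala
object LocalityFallbackSketch {
  class LocalityTracker(
      levels: IndexedSeq[String],
      waitsMs: IndexedSeq[Long],
      startTimeMs: Long) {
    require(levels.nonEmpty && levels.length == waitsMs.length)

    private var currentLocalityIndex = 0
    private var lastLaunchTimeMs = startTimeMs

    // hasPendingTasks tells whether some task still waits at the given level
    def allowedLevel(curTimeMs: Long, hasPendingTasks: String => Boolean): String = {
      while (currentLocalityIndex < levels.length - 1) {
        val level = levels(currentLocalityIndex)
        if (!hasPendingTasks(level)) {
          // Nothing to schedule at this level: move on without waiting
          lastLaunchTimeMs = curTimeMs
          currentLocalityIndex += 1
        } else if (curTimeMs - lastLaunchTimeMs >= waitsMs(currentLocalityIndex)) {
          // The wait expired: fall back and shift the timer so that
          // the wait of the next level doesn't expire immediately
          lastLaunchTimeMs += waitsMs(currentLocalityIndex)
          currentLocalityIndex += 1
        } else {
          return level
        }
      }
      levels(currentLocalityIndex)
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new LocalityTracker(
      Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY"), Vector(3000L, 3000L, 0L), 0L)
    Seq(1000L, 3000L, 6500L).foreach { time =>
      println(s"t=${time}ms => ${tracker.allowedLevel(time, _ => true)}")
    }
  }
}
```

When hasPendingTasks returns false for a level, the tracker skips it immediately, which corresponds to the "No tasks for locality level ..." messages caught in the tests of the next section.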

Spark data locality example

It's not easy to show data locality through learning tests. That's why we focus here on showing how the locality is resolved from the defined preferred locations, by catching the relevant messages in the logs:

"node local levels" should "be resolved from driver host" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Valid locality levels for TaskSet",
    "No tasks for locality level"))
  val preferredLocations = Seq("localhost", "executor_0.0.0.1.external_driver")
  new PreferenceAwareRDD(sparkContext, Seq.empty, preferredLocations)
    .map(url => (url, System.currentTimeMillis()))
    .foreach(pair => {
      println(s"Fetched ${pair._1} at ${pair._2}")
    })

  logAppender.getMessagesText() should contain allOf("Valid locality levels for TaskSet 0.0: PROCESS_LOCAL, NODE_LOCAL, ANY",
    "No tasks for locality level PROCESS_LOCAL, so moving to locality level NODE_LOCAL",
    "No tasks for locality level NODE_LOCAL, so moving to locality level ANY")
  println(s"caught messages ${logAppender.getMessagesText().mkString("\n")}")
}

"any levels" should "only be resolved when no preferred host is defined" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Valid locality levels for TaskSet",
    "No tasks for locality level"))
  new PreferenceAwareRDD(sparkContext, Seq.empty, Seq.empty)
    .map(url => (url, System.currentTimeMillis()))
    .foreach(pair => {
      println(s"Fetched ${pair._1} at ${pair._2}")
    })

  logAppender.getMessagesText() should contain allOf("Valid locality levels for TaskSet 0.0: NO_PREF, ANY",
    "No tasks for locality level NO_PREF, so moving to locality level ANY")
}

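// Imports needed by the custom RDD used in the tests above
import org.apache.spark.{Dependency, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD

// RDD with a single partition whose preferred locations are given by the caller
class PreferenceAwareRDD(sparkContext: SparkContext, deps: Seq[Dependency[String]], preferredLocations: Seq[String])
  extends RDD[String](sparkContext, deps) {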

  @DeveloperApi
  override def compute(partition: Partition, context: TaskContext): Iterator[String] = {
    Seq(s"test${System.currentTimeMillis()}").iterator
  }

  override protected def getPartitions: Array[Partition] = {
    val partition: Partition = DefaultPartition()
    Array(partition)
  }

  override protected def getPreferredLocations(split: Partition): Seq[String] = preferredLocations

  case class DefaultPartition() extends Partition {
    override def index: Int = 0
  }
}

Data locality can speed up or slow down the processing. If the data is located close to the processing code, there is no need (or only a small need) to move the data through the network. On the other hand, when the data is located far away from the processing code, several locality levels may be tried before the task reaches the values to process. And if Spark falls back multiple times, each time waiting 3 seconds, it can negatively impact performance.