Spark failure detection - heartbeats

Versions: Spark 2.2.0

One of problems in distributed computing is the failure detection. How a master node can know that some of its workers went down just a minute ? A popular and quite simple solution uses heartbeats sent at regular interval by the workers. Spark also implements this technique.

This post describes the heartbeats mechanism implemented in Spark. The first short section describes the general idea of this technique. The second and the third parts show how it's used by Spark executors. The last section demonstrates what happens with the jobs if one of executors slows down and doesn't send the heartbeat within configured interval.

Heartbeats

As already mentioned, the heartbeat is a signal sent periodically in order to indicate normal operation of the node or synchronize with other parts of the system. If the signal is not sent within configured delay, the node is considered as down. However sometimes it can be a dangerous supposition, especially with network latency.

Heartbeats is widely used in distributed systems. Except Spark, among heartbeat users we can distinguish:

Heartbeat message in Spark

In Spark the heartbeats are the messages sent by executors to the driver. The message is represented by case class org.apache.spark.Heartbeat and it contains: executor id, the metrics about tasks running in the executor (run, GC, CPU time, result size etc.) and the executor's block manager id.

The message is then received by the driver through org.apache.spark.HeartbeatReceiver#receiveAndReply(context: RpcCallContext) method. The driver:

After handling the heartbeat, the driver prepares a response that contains only 1 information - a boolean flag telling executor if it must register its block manager with all already generated blocks.

Heartbeat timeout

But as told earlier, the main purpose of heartbeats consists on checking if given node is still alive. The driver verifies it at fixed interval (defined in spark.network.timeoutInterval entry) by sending ExpireDeadHosts message to itself. When the message is handled, the driver checks for the executors with no recent heartbeats.

An executor is considered as dead if, at the time of checking, its last heartbeat message is older than the timeout value specified in spark.network.timeout entry. On removal, the driver informs task scheduler about executor lost. Later the scheduler handles the lost of tasks executing on the executor. The driver tells the same information to DAG scheduler that removes all traces (as shuffle blocks) representing the lost executor. Moreover, the driver also asks SparkContext to replace lost executor through used cluster manager. However, this operation doesn't guarantee the appearing of new executor because it can be stolen by other application.

The executors send the heartbeat message at fixed interval defined in spark.executor.heartbeatInterval configuration property. As you can logically deduce, this value should be smaller than the one specified in spark.network.timeout. As shown in the test "the job" should "never start if the heartbeat interval is greater than the network timeout", the job will never start with this incorrect configuration.

Sometimes the messages sent by the executor can not be delivered to the driver. In this case, the executor increments internal counter with failed deliveries. On each failure the executor tries to send the heartbeat once again. It stops the tries when the counter reaches the value defined in spark.executor.heartbeat.maxFailures. At this moment it considers something wrong happens with the driver and kills itself.

Spark heartbeats demo

First, let's see what happens if the heartbeat interval is much longer than the network timeout:

val conf = new SparkConf().setAppName("Spark failing task test").setMaster("spark://localhost:7077")
  .set("spark.executor.heartbeatInterval", "5s")
  .set("spark.network.timeoutInterval", "1s")
  .set("spark.network.timeout", "1s")
  .set("spark.executor.extraClassPath", sys.props("java.class.path"))
val sparkContext:SparkContext = SparkContext.getOrCreate(conf)

after {
  sparkContext.stop
}

"the job" should "never start if the heartbeat interval is greater than the network timeout" in {
  val data = 1 to 5
  val inputRdd = sparkContext.parallelize(data)

  val error = intercept[Exception] {
    inputRdd.map(number => {
      s"Number#${number}"
    }).count()
  }

  error.getMessage should include("Executor heartbeat timed out after")
}

As expected, the job doesn't start. In order to see what happens if executors go down during task execution, we'll need some manual help. First, the used code:

val conf = new SparkConf().setAppName("Spark failing task test").setMaster("spark://localhost:7077")
  .set("spark.executor.heartbeatInterval", "1s")
  .set("spark.network.timeoutInterval", "5s")
  .set("spark.network.timeout", "5s")
  .set("spark.executor.extraClassPath", sys.props("java.class.path"))
val sparkContext:SparkContext = SparkContext.getOrCreate(conf)

after {
  sparkContext.stop
}

"the job" should "fail when executors go down" in {
  val data = 1 to 500000
  val inputRdd = sparkContext.parallelize(data, 300)

  println("Before processing")
  inputRdd.map(number => {
    // just to avoid the task to finish before executors shutdown
    Thread.sleep(1000)
    s"Number#${number}"
  }).count()
}

Now let's start standalone cluster with more than 1 worker node (3 in my case):

export SPARK_WORKER_INSTANCES=3
export SPARK_WORKER_MEMORY=2500M
sbin/start-master.sh -h localhost
sbin/start-slave.sh  spark://localhost:7077

Next, let's see what happens with the job on the below movie (with the help of a little bit of bad magic):

As you can see through this short video, the executor are lost because of breakpoints added in HeartbeatReceiver. The breakpoint blocks the execution during some 30 seconds. It's much more than the network timeout. After unlocking the breakpoint, the executors are considered as dead. We can see it in Spark UI where 12 tasks are marked as failed. But we can also see that new executors was demanded in place of lost ones and that the failed tasks were replanned on them.

The heartbeats help to keep the state between the nodes in distributed systems. In Spark the role of these signals is globally the same. They help the driver to know which executors are still alive. However as we saw through this post, the heartbeats are also used to sent tasks metrics to the driver or to inform executor about the necessity of registering its block manager. Also, as we saw in the last section, when an executor is lost, Spark asks cluster manager for substitutes.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!