Failure detection is one of the problems of distributed computing. How can a master node know that some of its workers went down a minute ago? A popular and quite simple solution uses heartbeats sent at regular intervals by the workers. Spark also implements this technique.
This post describes the heartbeat mechanism implemented in Spark. The first short section describes the general idea of this technique. The second and third parts show how it's used by Spark executors. The last section demonstrates what happens to the jobs if one of the executors slows down and doesn't send the heartbeat within the configured interval.
Heartbeats
As already mentioned, a heartbeat is a signal sent periodically to indicate the normal operation of a node or to synchronize it with other parts of the system. If the signal is not sent within the configured delay, the node is considered down. However, this can sometimes be a dangerous assumption, especially with network latency.
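To make the idea concrete, here is a minimal sketch of such a detector. It is not Spark code: the class `HeartbeatMonitor`, its methods and the millisecond timestamps passed by the caller are all assumptions made for the illustration.

```scala
import scala.collection.mutable

// Hypothetical, minimal failure detector; not taken from Spark.
class HeartbeatMonitor(timeoutMs: Long) {
  // node id -> timestamp (ms) of the last heartbeat received
  private val lastSeen = mutable.Map.empty[String, Long]

  // Called each time a node sends its periodic signal.
  def recordHeartbeat(nodeId: String, nowMs: Long): Unit =
    lastSeen(nodeId) = nowMs

  // A node is presumed alive only if its last heartbeat is recent enough.
  // A node that never sent a heartbeat is reported as not alive.
  def isAlive(nodeId: String, nowMs: Long): Boolean =
    lastSeen.get(nodeId).exists(seenMs => nowMs - seenMs <= timeoutMs)
}
```

With a timeout of 5000 ms, a node whose last heartbeat was recorded at 0 is still alive at 4000 but not at 6000. A heartbeat that is only delayed by the network produces exactly the same result as a crashed node, which is why the assumption can be dangerous.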
Heartbeats are widely used in distributed systems. Besides Spark, heartbeat users include:
- Apache ZooKeeper - mostly used in sessions; you can read Sessions in Apache ZooKeeper for more details
- Apache Kafka - consumers send heartbeats to the coordinator. They're used to keep a list of the active consumers in a group and to facilitate the rebalancing if a new consumer joins the group. You can learn more from the post about Coordinator in Apache Kafka.
Heartbeat message in Spark
In Spark, heartbeats are messages sent by the executors to the driver. The message is represented by the case class org.apache.spark.Heartbeat and it contains: the executor id, the metrics about the tasks running in the executor (run, GC, CPU time, result size, etc.) and the executor's block manager id.
The message is then received by the driver through the org.apache.spark.HeartbeatReceiver#receiveAndReply(context: RpcCallContext) method. The driver:
- updates the last seen time of the given executor
- checks if it knows about the executor's block manager
- updates the task metrics
After handling the heartbeat, the driver prepares a response that contains only one piece of information: a boolean flag telling the executor whether it must register its block manager with all already generated blocks.
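The exchange described above can be modeled with a few lines of plain Scala. This is only a sketch under simplifying assumptions: `HeartbeatSketch`, `HeartbeatResponseSketch` and `DriverSketch` are hypothetical stand-ins, the metrics are reduced to a single run time per task, and the block manager id is a plain string. Spark's real classes carry richer types.

```scala
import scala.collection.mutable

// Simplified stand-ins for the messages; names and fields are illustrative only.
final case class HeartbeatSketch(
    executorId: String,
    taskRunTimesMs: Map[Long, Long], // task id -> run time, the only metric kept here
    blockManagerId: String)

final case class HeartbeatResponseSketch(reregisterBlockManager: Boolean)

// Hypothetical driver-side handler following the three steps listed above.
class DriverSketch {
  val lastSeen = mutable.Map.empty[String, Long]
  val knownBlockManagers = mutable.Set.empty[String]
  val taskRunTimesMs = mutable.Map.empty[Long, Long]

  def receiveAndReply(heartbeat: HeartbeatSketch, nowMs: Long): HeartbeatResponseSketch = {
    // 1. update the last seen time of the executor
    lastSeen(heartbeat.executorId) = nowMs
    // 2. check whether the executor's block manager is known
    val blockManagerKnown = knownBlockManagers.contains(heartbeat.blockManagerId)
    // 3. update the task metrics
    taskRunTimesMs ++= heartbeat.taskRunTimesMs
    // The reply carries a single flag, set when the block manager is unknown.
    HeartbeatResponseSketch(reregisterBlockManager = !blockManagerKnown)
  }
}
```

In this model an executor whose block manager id is already in `knownBlockManagers` gets `reregisterBlockManager = false`, while an unknown one is asked to register again.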
Heartbeat timeout
But as mentioned earlier, the main purpose of heartbeats is to check whether a given node is still alive. The driver verifies it at a fixed interval (defined in the spark.network.timeoutInterval entry) by sending an ExpireDeadHosts message to itself. When the message is handled, the driver checks for executors with no recent heartbeats.
An executor is considered dead if, at the time of checking, its last heartbeat message is older than the timeout value specified in the spark.network.timeout entry. On removal, the driver informs the task scheduler about the lost executor. The scheduler then handles the loss of the tasks that were executing on it. The driver passes the same information to the DAG scheduler, which removes all traces (such as shuffle blocks) of the lost executor. Moreover, the driver also asks SparkContext to replace the lost executor through the cluster manager in use. However, this operation doesn't guarantee that a new executor will appear because it can be taken by another application.
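The periodic check can be pictured as a sweep over the last seen times. The sketch below is a hypothetical model, not Spark's implementation: `ExpiryCheckSketch` and the `onExecutorLost` callback are invented names, and the callback stands in for everything the driver does on removal (informing the schedulers, asking for a replacement).

```scala
import scala.collection.mutable

// Hypothetical model of the periodic check; the callback stands in for the schedulers.
class ExpiryCheckSketch(timeoutMs: Long, onExecutorLost: String => Unit) {
  // executor id -> timestamp (ms) of its last heartbeat
  val lastSeen = mutable.Map.empty[String, Long]

  // Meant to be invoked at every check interval; returns the removed executors.
  def expireDeadHosts(nowMs: Long): Seq[String] = {
    // Collect first, then remove, so the map is not modified while it is traversed.
    val dead = lastSeen.collect {
      case (executorId, seenMs) if nowMs - seenMs > timeoutMs => executorId
    }.toSeq.sorted
    dead.foreach { executorId =>
      lastSeen -= executorId     // forget the executor
      onExecutorLost(executorId) // e.g. inform the schedulers, request a replacement
    }
    dead
  }
}
```

Note that the detection delay depends on both settings: an executor that stops sending heartbeats is only noticed at the first sweep that runs after its timeout has elapsed.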
The executors send the heartbeat message at a fixed interval defined in the spark.executor.heartbeatInterval configuration property. Logically, this value should be smaller than the one specified in spark.network.timeout. As shown in the test "the job" should "never start if the heartbeat interval is greater than the network timeout", the job will never start with this incorrect configuration.
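The relationship between the two properties can be checked before the job is submitted. The helper below is a hypothetical sketch: `HeartbeatConfigCheck` is not part of Spark, and it parses the values with Scala's own `Duration` parser, which I assume is close enough to Spark's time format for simple values such as "1s" or "5s".

```scala
import scala.concurrent.duration.Duration

// Hypothetical pre-flight check; not a Spark API.
object HeartbeatConfigCheck {
  // True when the heartbeat interval is strictly shorter than the network timeout.
  // Values are parsed by scala.concurrent.duration.Duration, not by Spark.
  def isConsistent(heartbeatInterval: String, networkTimeout: String): Boolean =
    Duration(heartbeatInterval) < Duration(networkTimeout)
}
```

For the two configurations used by the tests of this post, `isConsistent("5s", "1s")` rejects the first one and `isConsistent("1s", "5s")` accepts the second one.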
Sometimes the messages sent by the executor cannot be delivered to the driver. In this case, the executor increments an internal counter of failed deliveries. After each failure the executor tries to send the heartbeat again. It stops retrying when the counter reaches the value defined in spark.executor.heartbeat.maxFailures. At that point it considers that something is wrong with the driver and kills itself.
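The counting logic on the executor side can be sketched as follows. Everything here is hypothetical: `HeartbeatSenderSketch`, the `send` function returning true on a successful delivery and the `onGiveUp` callback (which replaces the process exit) are names made up for the example. Resetting the counter after a successful delivery is also an assumption of this sketch.

```scala
// Hypothetical model of the executor side; `send` returns true when the delivery succeeded.
class HeartbeatSenderSketch(maxFailures: Int, send: () => Boolean, onGiveUp: () => Unit) {
  private var failures = 0

  // Meant to be called once per heartbeat interval.
  def reportHeartbeat(): Unit = {
    if (send()) {
      failures = 0 // assumption: a successful delivery resets the counter
    } else {
      failures += 1
      // Once the limit is reached, the executor gives up (it kills itself in Spark).
      if (failures >= maxFailures) onGiveUp()
    }
  }

  def failureCount: Int = failures
}
```

With `maxFailures = 3` and a driver that never answers, the first two calls only increment the counter and the third one triggers `onGiveUp`.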
Spark heartbeats demo
First, let's see what happens if the heartbeat interval is much longer than the network timeout:
    val conf = new SparkConf().setAppName("Spark failing task test").setMaster("spark://localhost:7077")
      .set("spark.executor.heartbeatInterval", "5s")
      .set("spark.network.timeoutInterval", "1s")
      .set("spark.network.timeout", "1s")
      .set("spark.executor.extraClassPath", sys.props("java.class.path"))
    val sparkContext:SparkContext = SparkContext.getOrCreate(conf)

    after {
      sparkContext.stop
    }

    "the job" should "never start if the heartbeat interval is greater than the network timeout" in {
      val data = 1 to 5
      val inputRdd = sparkContext.parallelize(data)
      val error = intercept[Exception] {
        inputRdd.map(number => {
          s"Number#${number}"
        }).count()
      }
      error.getMessage should include("Executor heartbeat timed out after")
    }
As expected, the job doesn't start. To see what happens if executors go down during task execution, we'll need some manual help. First, the code used:
    val conf = new SparkConf().setAppName("Spark failing task test").setMaster("spark://localhost:7077")
      .set("spark.executor.heartbeatInterval", "1s")
      .set("spark.network.timeoutInterval", "5s")
      .set("spark.network.timeout", "5s")
      .set("spark.executor.extraClassPath", sys.props("java.class.path"))
    val sparkContext:SparkContext = SparkContext.getOrCreate(conf)

    after {
      sparkContext.stop
    }

    "the job" should "fail when executors go down" in {
      val data = 1 to 500000
      val inputRdd = sparkContext.parallelize(data, 300)
      println("Before processing")
      inputRdd.map(number => {
        // just to avoid the task to finish before executors shutdown
        Thread.sleep(1000)
        s"Number#${number}"
      }).count()
    }
Now let's start a standalone cluster with more than 1 worker node (3 in my case):
    export SPARK_WORKER_INSTANCES=3
    export SPARK_WORKER_MEMORY=2500M
    sbin/start-master.sh -h localhost
    sbin/start-slave.sh spark://localhost:7077
Next, let's see what happens to the job in the video below (with the help of a little bit of bad magic):
As you can see in this short video, the executors are lost because of the breakpoints added in HeartbeatReceiver. The breakpoint blocks the execution for about 30 seconds, which is much longer than the network timeout. After releasing the breakpoint, the executors are considered dead. We can see it in the Spark UI, where 12 tasks are marked as failed. But we can also see that new executors were requested in place of the lost ones and that the failed tasks were rescheduled on them.
Heartbeats help to keep the state between the nodes in distributed systems. In Spark the role of these signals is globally the same: they help the driver to know which executors are still alive. However, as we saw in this post, heartbeats are also used to send task metrics to the driver and to inform an executor that it needs to register its block manager. Also, as we saw in the last section, when an executor is lost, Spark asks the cluster manager for substitutes.