Failed tasks resubmit

Versions: Spark 2.1.0

A lot of things are automated in Spark: metadata and data checkpointing and task distribution, to name only a few. Another one, not mentioned very often, is the automatic retry of failed tasks.


This post explains how Spark deals with failing tasks. The first part shows the case where a task working on particular data isn't able to process it. The second part gives an insight into what Spark does to avoid failing the whole job when only one of its tasks encounters a temporary problem.

Task failure simulated

Simulating a task failure is straightforward (certainly you've done it successfully plenty of times :P). In my case, the failing processing is a simple map operation transforming integers to strings and failing on the number 250. The failure is thrown as a RuntimeException:

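import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

class FailingTaskTest extends FlatSpec with Matchers with BeforeAndAfter {

  // Standalone master: with a "local" master, spark.task.maxFailures would be ignored
  val conf = new SparkConf().setAppName("Spark failing task test").setMaster("spark://localhost:7077")
    .set("spark.task.maxFailures", "5")
    // Makes the test classes (including the map closure) visible to the executors
    .set("spark.executor.extraClassPath", sys.props("java.class.path"))
  var sparkContext: SparkContext = null

  before {
    sparkContext = SparkContext.getOrCreate(conf)
  }

  after {
    sparkContext.stop()
  }

  "whole job" should "fail after 5 failures of one particular task" in {
    val data = 1 to 500
    val inputRdd = sparkContext.parallelize(data, 3)

    // The job abort surfaces on the driver as a SparkException
    val error = intercept[SparkException] {
      inputRdd.map(number => {
        // Deterministic failure: every attempt of the task holding 250 fails
        if (number == 250) {
          throw new RuntimeException("This exception is thrown to simulate task failures and lead to job failure")
        }
        s"Number#${number}"
      }).count()
    }
    error.getMessage should include("failed 5 times")
  }

}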

Not surprisingly, the processing fails. The message explaining the failure refers to the thrown exception and to the spark.task.maxFailures configuration entry:

Job aborted due to stage failure: Task 1 in stage 0.0 failed 5 times, most recent failure: Lost task 1.4 in stage 0.0 (TID 6, 192.168.0.12, executor 0): java.lang.RuntimeException: This exception is thrown to simulate task failures and lead to job failure

Note that the code is executed against a standalone Spark installation. If it were executed against the "local" master usually used in tests, the maximum number of accepted failures would be overridden to 1. This is because of the hardcoded val MAX_LOCAL_TASK_FAILURES = 1 used by org.apache.spark.SparkContext#createTaskScheduler(sc: SparkContext, master: String, deployMode: String):

// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1

master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    // ...
  case LOCAL_N_REGEX(threads) =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    // ...
}

This behavior can be overridden with a master value of local[N,R], where R is the maximum number of accepted failures per task. However, since this approach doesn't go through the configuration property (as real applications do), a standalone cluster was preferred for the tests.
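To make the rule explicit, here is a minimal sketch, assuming a simplified view of the master-string matching in Spark 2.1.0. LocalMaxFailures and maxTaskFailures are hypothetical names, and the configuration is modeled as a plain Map instead of a SparkConf; the regular expressions are modeled on the local[N] and local[N,R] master formats.

```scala
import scala.util.matching.Regex

// Hypothetical, simplified re-implementation (not Spark code) of how the master
// string decides the max task failures handed to TaskSchedulerImpl.
object LocalMaxFailures {
  private val MAX_LOCAL_TASK_FAILURES = 1
  // Modeled on the local[N] / local[*] master format
  private val LocalN: Regex = """local\[([0-9]+|\*)\]""".r
  // Modeled on the local[N,R] / local[*,R] master format
  private val LocalNFailures: Regex = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def maxTaskFailures(master: String, conf: Map[String, String]): Int = master match {
    // "local" and local[N]: retries disabled, spark.task.maxFailures is ignored
    case "local" | LocalN(_) => MAX_LOCAL_TASK_FAILURES
    // local[N,R]: R comes from the master string itself
    case LocalNFailures(_, maxFailures) => maxFailures.toInt
    // Other masters (e.g. standalone): read the configuration, default 4
    case _ => conf.get("spark.task.maxFailures").map(_.toInt).getOrElse(4)
  }
}
```

In a real application, the local[N,R] variant is simply passed as the master, e.g. new SparkConf().setMaster("local[2,5]") for 2 threads and up to 5 failures per task.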

Failed task replanning explained

The previous section introduced a property called spark.task.maxFailures. It defines the maximum number of accepted failures for a task. "Accepted" means here that Spark keeps rescheduling a failed task until it has failed that many times in total, so a value of N allows at most N attempts, i.e. N - 1 retries. This value applies to each task individually, e.g. if it's set to 4 and two tasks have each failed 2 times, each of them will be retried a 3rd and maybe a 4th time.
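The arithmetic can be checked with a small sketch. MaxFailuresArithmetic, attemptIds and remainingAttempts are hypothetical helpers, not Spark API; the only assumption taken from Spark is the "taskIndex.attemptNumber" task id format with attempts numbered from 0, visible as "Lost task 1.4" in the error message of the first section.

```scala
// Hypothetical helpers (not Spark API) showing how spark.task.maxFailures
// bounds the attempts of ONE task, independently of the other tasks.
object MaxFailuresArithmetic {

  // Attempt ids an always-failing task goes through before the job is aborted,
  // formatted as "<task index>.<attempt number>" with attempts starting at 0.
  def attemptIds(taskIndex: Int, maxFailures: Int): Seq[String] =
    (0 until maxFailures).map(attempt => s"$taskIndex.$attempt")

  // Attempts still allowed for a task, computed from ITS OWN failure count only.
  def remainingAttempts(taskFailures: Int, maxFailures: Int): Int =
    math.max(0, maxFailures - taskFailures)

  def main(args: Array[String]): Unit = {
    // maxFailures = 5, as in the test: the last attempt is "1.4"
    println(attemptIds(taskIndex = 1, maxFailures = 5).mkString(", "))
    // maxFailures = 4 and two tasks failed twice each: failures aren't summed
    val failuresPerTask = Map(0 -> 2, 2 -> 2)
    failuresPerTask.foreach { case (task, failures) =>
      println(s"task $task: ${remainingAttempts(failures, maxFailures = 4)} attempts left")
    }
  }
}
```

With maxFailures = 5 the attempts are 1.0 to 1.4, which matches the 5 failures and the "Lost task 1.4" of the error message.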

When the last accepted attempt fails, the task is considered as failed and the whole job is aborted with the message shown in the first section. But what happens in the meantime, between the first and the last attempt?

The maximum number of accepted task failures is stored by TaskSchedulerImpl, which reads it directly from the configuration (the default value is 4). This value is then passed to the TaskSetManager instance which, every time a task ends with an error, increments the failure counter of that task.

When a particular task fails, the failure is handled in org.apache.spark.scheduler.TaskSetManager#handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason). If the failed task already succeeded elsewhere (maybe because of speculative execution), there is nothing to do except logging a message like:

Task ${info.id} in stage ${taskSet.id} (TID $tid) failed but 
another instance of the task has already succeeded, so not 
re-queuing the task to be re-executed.

But if the failed task didn't succeed anywhere else, it's re-added to the list of pending tasks and its failure counter is incremented. Only when this counter is equal to or greater than the configured threshold is the task set aborted. This information goes to the DAGScheduler, which is in charge of aborting the stage and the job related to the failed task.
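The steps above can be summarized with a simplified, hypothetical model. RetryBookkeeper, handleSuccessfulTask, logs and abortReason are illustrative names, not Spark API; the real TaskSetManager#handleFailedTask also deals with cases this sketch omits (e.g. fetch failures and killed tasks) and reports the abort to the DAGScheduler instead of storing a string.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the per-task retry bookkeeping (not Spark code).
final class RetryBookkeeper(numTasks: Int, maxTaskFailures: Int) {
  private val numFailures = Array.fill(numTasks)(0)    // one failure counter per task
  private val successful = Array.fill(numTasks)(false) // true once any attempt succeeded
  val pendingTasks: mutable.Queue[Int] = mutable.Queue[Int]()
  val logs: mutable.ArrayBuffer[String] = mutable.ArrayBuffer[String]()
  var abortReason: Option[String] = None

  def handleSuccessfulTask(index: Int): Unit = successful(index) = true

  def handleFailedTask(index: Int): Unit = {
    if (successful(index)) {
      // e.g. a speculative copy already succeeded: only log, don't re-queue
      logs += s"Task $index failed but another instance of the task has already succeeded"
    } else {
      pendingTasks.enqueue(index) // re-add the task to the pending tasks
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        // In Spark, this is the point where the abort reaches the DAGScheduler
        abortReason = Some(s"Task $index failed ${numFailures(index)} times; aborting job")
      }
    }
  }
}
```

With maxTaskFailures = 5, calling handleFailedTask(1) four times only re-queues the task; the fifth call sets abortReason, mirroring the "failed 5 times" message from the first section.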

This post explained some points about handling task failures in Spark. The first part showed the code making a task fail with a RuntimeException thrown on the 250th element. The second part explained a little of what happens between the different attempts. As we could learn, there is no magic. A failed task is marked as such and, if it didn't succeed elsewhere, it's re-added to the pending tasks queue; it is eventually aborted once Spark reaches the number of accepted failures.
