Graceful shutdown explained

on waitingforcode.com

Spark offers several mechanisms to reduce data loss, including in streaming processing. Besides the well-known checkpointing, it provides a less obvious operation invoked when the processing stops: graceful shutdown.

This post focuses on the graceful shutdown feature available in Spark Streaming. The first part explains what happens when this option is used during the context stop. The second part compares, through tests, what happens when the context is stopped with and without it.

Graceful shutdown in the code

Using graceful shutdown in Spark applications is quite straightforward. It's a simple boolean flag passed to the org.apache.spark.streaming.StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean) method. It can also be activated through the spark.streaming.stopGracefullyOnShutdown configuration property, which makes Spark stop the streaming context gracefully when the JVM shuts down.
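
As a minimal sketch of both options (the application name, the master URL and the object and method names are illustrative assumptions, not part of the tested code of this post), enabling graceful shutdown could look like this:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext

// Hypothetical helper object, used only for illustration
object GracefulStopSketch {

  // Option 1: ask Spark to stop the streaming context gracefully
  // when the JVM shuts down
  def gracefulConf(): SparkConf = new SparkConf()
    .setAppName("graceful-stop-sketch") // assumed name
    .setMaster("local[2]")              // assumed master, for local runs
    .set("spark.streaming.stopGracefullyOnShutdown", "true")

  // Option 2: stop the context explicitly with the graceful flag
  def stopGracefully(streamingContext: StreamingContext): Unit =
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
```

The first option fits applications terminated from the outside, while the second one gives the application control over the moment of the stop.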

The graceful shutdown guarantees (under some conditions, listed below in the post) that all received data is processed before the Spark context is destroyed. The whole logic is handled by JobScheduler, which stops the processing by:

  • stopping receiving data
  • stopping the executor allocator (if dynamic allocation is enabled)
  • stopping job generation
  • stopping current jobs execution
  • stopping listeners for job events

When graceful shutdown is enabled, JobScheduler is more lenient and the above operations are less abrupt:

  • stopping receiving data - graceful shutdown waits until all expected data is physically received by the receivers.
  • stopping the executor allocator (if dynamic allocation is enabled) - nothing changes here, it's stopped immediately. It means that the executor allocation won't change for the remaining jobs.
  • stopping job generation - graceful shutdown allows the generation of the jobs for the current ongoing time interval. It means that all received blocks of data must be allocated and processed for that period. But the graceful shutdown doesn't guarantee that all planned jobs are executed, because everything is constrained by a timeout: the graceful shutdown of job generation must finish within the time specified in spark.streaming.gracefulStopTimeout (by default 10 * batch interval). Once this time has elapsed, the job generation is stopped, even if some steps are incomplete.
  • stopping current jobs execution - graceful shutdown gives already planned jobs some time to be physically executed. This period is equal to 1 hour. So even here, if the remaining jobs take longer than that to complete, the graceful shutdown won't guarantee the execution of all planned jobs.
  • stopping listeners for job events - no changes here.
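
To make the timeout of the job generation step more concrete, the sketch below resolves it the way the list above describes: the value of spark.streaming.gracefulStopTimeout if it's set, otherwise 10 * batch interval. The object and method names and the 30s value are illustrative assumptions; this is not Spark's internal code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Durations}

// Hypothetical helper object, used only for illustration
object GracefulStopTimeoutSketch {

  // Returns the configured graceful stop timeout in milliseconds,
  // falling back to 10 * batch interval when the property is absent
  def resolveTimeoutMs(conf: SparkConf, batchInterval: Duration): Long = {
    val defaultTimeout = s"${10 * batchInterval.milliseconds}ms"
    conf.getTimeAsMs("spark.streaming.gracefulStopTimeout", defaultTimeout)
  }

  def main(args: Array[String]): Unit = {
    val batchInterval = Durations.seconds(1)
    // loadDefaults = false: ignore spark.* system properties in this sketch
    val defaultConf = new SparkConf(false)
    val customConf = new SparkConf(false)
      .set("spark.streaming.gracefulStopTimeout", "30s") // assumed value
    println(resolveTimeoutMs(defaultConf, batchInterval)) // 10000
    println(resolveTimeoutMs(customConf, batchInterval))  // 30000
  }
}
```

With a 1 second batch interval, as in the tests of this post, the default leaves 10 seconds for the job generation to finish. Raising the property can help when batches are slow to process.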

Graceful shutdown example

The tests below show that when the context is stopped gracefully, the remaining data is consumed:

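import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable

class GracefulShutdownTest extends FlatSpec with Matchers with BeforeAndAfter {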

  val NotGracefulStopKey = "notGracefulStop"
  val GracefulStopKey = "gracefulStop"

  val conf = new SparkConf().setAppName("Spark graceful shutdown test").setMaster("local[*]")
  var streamingContext: StreamingContext = null
  var wasStopped = false
  val dataQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

  before {
    wasStopped = false
    streamingContext = new StreamingContext(conf, Durations.seconds(1))
  }

  after {
    if (!wasStopped) {
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  "remaining data" should "be treated when context is stopped gracefully" in {
    consumeData(true, GracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // all remaining tasks will be processed
    NumbersTracker.collectedData(GracefulStopKey) should contain theSameElementsAs(0 to 49)
  }

  "remaining data" should "not be processed when the context is not stopped gracefully" in {
    consumeData(false, NotGracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // when the context is not stopped gracefully, remaining data and pending tasks
    // won't be executed, so some of the last RDDs to consume
    // aren't processed
    NumbersTracker.collectedData(NotGracefulStopKey) should contain noneOf(30, 31, 
      32, 33, 34, 35, 36, 37, 38, 39, 40, 
      41, 42, 43, 44, 45, 46, 47, 48, 49)
  }

  private def consumeData(stopGracefully: Boolean, trackedNumbersKey: String): Unit = {
    val itemsInBatch = 10
    for (index <- 0 until 5) {
      val start = itemsInBatch*index
      val data = start to start+itemsInBatch-1
      dataQueue += streamingContext.sparkContext.parallelize(data)
    }
    streamingContext.queueStream(dataQueue, oneAtATime = true)
      .foreachRDD(rdd => {
        rdd.foreach(number => {
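          // a JVM-local tracker is used instead of a Spark accumulator: in
          // earlier experiments the accumulator didn't contain the data
          // processed after the graceful stop. Tasks run concurrently in
          // local[*] mode, so appends to the shared buffer are synchronized
          val collected = NumbersTracker.collectedData(trackedNumbersKey)
          collected.synchronized { collected += number }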
        })
      })

    new Thread(new Runnable() {
      override def run(): Unit = {
        Thread.sleep(3000)
        val stopSparkContext = true
        streamingContext.stop(stopSparkContext, stopGracefully)
        wasStopped = true
      }
    }).start()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

object NumbersTracker {

  val collectedData: mutable.Map[String, mutable.ListBuffer[Int]] = 
    mutable.HashMap.empty[String, mutable.ListBuffer[Int]]

  collectedData += ("notGracefulStop" -> mutable.ListBuffer.empty[Int])
  collectedData += ("gracefulStop"  -> mutable.ListBuffer.empty[Int])

}

Graceful shutdown can improve application reliability since it lets pending tasks execute (within the timeouts described in the first section) and reduces the data loss that an immediate context stop could produce. As shown by the learning tests in the second section of this post, graceful shutdown processes the remaining data while a normal shutdown doesn't. However, graceful shutdown is not guaranteed in all cases. For example, it's a little bit tricky to use in Spark jobs managed by YARN.

Read also about graceful shutdown here: How to Shutdown a Spark Streaming Job Gracefully, Spark Streaming configuration.
