Graceful shutdown explained

Versions: Spark 2.1.0

Spark has different methods to reduce data loss, also during streaming processing. It proposes well known checkpointing but also less obvious operation invoked on stopping processing - graceful shutdown.

New ebook 🔥

Learn 84 ways to solve common data engineering problems with cloud services.

👉 I want my Early Access edition

This post focuses on the graceful shutdown feature available in Spark Streaming. The first part explains what happens when this option is used during context stop. The second part compares the situations when the processing is executed with and without it.

Graceful shutdown in the code

Using graceful shutdown in Spark applications is quite straightforward. It's a simple boolean flag passed to org.apache.spark.streaming.StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean) method. It can also be activated through spark.streaming.stopGracefullyOnShutdown configuration property.

The graceful shutdown guarantees (under some conditions, listed below in the post) that all received data is processed before destroying Spark context. The whole logic is handled by JobScheduler that stops processing by:

When graceful shutdown is enabled, JobScheduler behaves mercifully and above operations are less violent:

Graceful shutdown example

Below tests prove that when the context is stopped gracefully, remaining data is consumed:

class GracefulShutdownTest extends FlatSpec with Matchers with BeforeAndAfter {

  val NotGracefulStopKey = "notGracefulStop"
  val GracefulStopKey = "gracefulStop"

  val conf = new SparkConf().setAppName("Spark graceful shutdown test").setMaster("local[*]")
  var streamingContext: StreamingContext = null
  var wasStopped = false
  val dataQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

  before {
    wasStopped = false
    streamingContext = new StreamingContext(conf, Durations.seconds(1))
  }

  after {
    if (!wasStopped) {
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  "remaining data" should "be treated when context is stopped gracefully" in {
    consumeData(true, GracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // all remaining tasks will be processed
    NumbersTracker.collectedData(GracefulStopKey) should contain theSameElementsAs(0 to 49)
  }

  "remaining data" should "not be processed when the context is not stopped gracefully" in {
    consumeData(false, NotGracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // when the context is not stopped gracefully, remaining data and pending tasks
    // won't be executed, so automatically some of the last RDDs to consume
    // aren't  processed
    NumbersTracker.collectedData(NotGracefulStopKey) should contain noneOf(30, 31, 
      32, 33, 34, 35, 36, 37, 38, 39, 40, 
      41, 42, 43, 44, 45, 46, 47, 48, 49)
  }

  private def consumeData(stopGracefully: Boolean, trackedNumbersKey: String): Unit = {
    val itemsInBatch = 10
    for (index <- 0 until 5) {
      val start = itemsInBatch*index
      val data = start to start+itemsInBatch-1
      dataQueue += streamingContext.sparkContext.parallelize(data)
    }
    streamingContext.queueStream(dataQueue, oneAtATime = true)
      .foreachRDD(rdd => {
        rdd.foreach(number => {
          // after some tests, Spark's accumulator doesn't contain
          // data processed after graceful stop
          NumbersTracker.collectedData(trackedNumbersKey) += number
        })
      })

    new Thread(new Runnable() {
      override def run(): Unit = {
        Thread.sleep(3000)
        val stopSparkContext = true
        streamingContext.stop(stopSparkContext, stopGracefully)
        wasStopped = true
      }
    }).start()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

object NumbersTracker {

  val collectedData: mutable.Map[String, mutable.ListBuffer[Int]] = 
    mutable.HashMap.empty[String, mutable.ListBuffer[Int]]

  collectedData += ("notGracefulStop" -> mutable.ListBuffer.empty[Int])
  collectedData += ("gracefulStop"  -> mutable.ListBuffer.empty[Int])

}

Graceful shutdown can improve application reliability since it guarantees execution of pending tasks and reduces data loss could be produced by the immediate context stop. As proved in the learning tests from the second section of this post, graceful shutdown processes remaining data while normal shutdown doesn't. However, the graceful shutdown is not granted in all cases. For example, it's a little bit tricky to use in Spark jobs managed by YARN.