Spark provides different methods to reduce data loss, also during streaming processing. Besides the well-known checkpointing, it also offers a less obvious operation invoked when the processing stops - graceful shutdown.

This post focuses on the graceful shutdown feature available in Spark Streaming. The first part explains what happens when this option is used during the context stop. The second part compares the processing executed with and without it.
Graceful shutdown in the code
Using graceful shutdown in Spark applications is quite straightforward. It's a simple boolean flag passed to the org.apache.spark.streaming.StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean) method. It can also be activated through the spark.streaming.stopGracefullyOnShutdown configuration property.
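A minimal sketch of both options (the context setup is illustrative; only the stop call and the configuration property come from the Spark API mentioned above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

val conf = new SparkConf()
  .setAppName("Graceful shutdown example")
  .setMaster("local[*]")
  // option 1: ask Spark to stop the streaming context gracefully when the JVM shuts down
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val streamingContext = new StreamingContext(conf, Durations.seconds(1))
// ... input streams and output operations defined here ...
streamingContext.start()

// option 2: stop the context explicitly and ask for a graceful stop
streamingContext.stop(stopSparkContext = true, stopGracefully = true)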
The graceful shutdown guarantees (under some conditions, listed below in the post) that all received data is processed before destroying the Spark context. The whole logic is handled by JobScheduler, which stops the processing by:
- stopping receiving data
- stopping executor allocators (if dynamic allocation is enabled)
- stopping job generation
- stopping current jobs execution
- stopping listeners for job events
When graceful shutdown is enabled, JobScheduler behaves more mercifully and the above operations become less abrupt:
- stopping receiving data - the graceful shutdown waits until all expected data is physically received by the receivers.
- stopping executor allocators (if dynamic allocation is enabled) - nothing changes for this point, they're stopped immediately. It means that the executor allocation won't change for the remaining jobs.
- stopping job generation - the graceful shutdown allows the generation of the jobs for the currently ongoing time interval. It means that all received blocks of data must be allocated and processed for that period. But the graceful shutdown doesn't guarantee the execution of all planned jobs. In fact, everything is constrained by a timeout: the graceful shutdown of the job generation must finish before the time specified in spark.streaming.gracefulStopTimeout, by default 10 * batch interval (see the configuration sketch after this list). Once this time has elapsed, the job generation is stopped - even if some steps are incomplete.
- stopping current jobs execution - the graceful shutdown gives some time to the already planned jobs to be physically executed. This period of time is equal to 1 hour. So even here, if the jobs take more than that to complete, the graceful shutdown won't guarantee the execution of all planned jobs.
- stopping listeners for job events - no changes here.
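Both the activation flag and the job generation timeout mentioned above can be tuned through the configuration. Below a minimal sketch, assuming a 2-second batch interval and that the timeout property accepts a time string like other Spark time configurations; the values are only illustrative, the property names are the ones quoted in this post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

val conf = new SparkConf()
  .setAppName("Graceful shutdown timeout example")
  .setMaster("local[*]")
  // stop the streaming context gracefully when the JVM shuts down
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  // by default the graceful stop of the job generation waits up to 10 * batch interval;
  // here the limit is raised to 60 seconds to leave more time to the pending micro-batches
  .set("spark.streaming.gracefulStopTimeout", "60s")

val streamingContext = new StreamingContext(conf, Durations.seconds(2))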
Graceful shutdown example
The tests below prove that when the context is stopped gracefully, the remaining data is consumed:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable

class GracefulShutdownTest extends FlatSpec with Matchers with BeforeAndAfter {

  val NotGracefulStopKey = "notGracefulStop"
  val GracefulStopKey = "gracefulStop"
  val conf = new SparkConf().setAppName("Spark graceful shutdown test").setMaster("local[*]")
  var streamingContext: StreamingContext = null
  var wasStopped = false
  val dataQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

  before {
    wasStopped = false
    streamingContext = new StreamingContext(conf, Durations.seconds(1))
  }

  after {
    if (!wasStopped) {
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  "remaining data" should "be treated when context is stopped gracefully" in {
    consumeData(true, GracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // all remaining tasks will be processed
    NumbersTracker.collectedData(GracefulStopKey) should contain theSameElementsAs(0 to 49)
  }

  "remaining data" should "not be processed when the context is not stopped gracefully" in {
    consumeData(false, NotGracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // when the context is not stopped gracefully, remaining data and pending tasks
    // won't be executed, so automatically some of the last RDDs to consume
    // aren't processed
    NumbersTracker.collectedData(NotGracefulStopKey) should contain noneOf(30, 31, 32, 33, 34, 35,
      36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
  }

  private def consumeData(stopGracefully: Boolean, trackedNumbersKey: String): Unit = {
    val itemsInBatch = 10
    for (index <- 0 until 5) {
      val start = itemsInBatch * index
      val data = start to start + itemsInBatch - 1
      dataQueue += streamingContext.sparkContext.parallelize(data)
    }
    streamingContext.queueStream(dataQueue, oneAtATime = true)
      .foreachRDD(rdd => {
        rdd.foreach(number => {
          // after some tests, Spark's accumulator doesn't contain
          // data processed after graceful stop
          NumbersTracker.collectedData(trackedNumbersKey) += number
        })
      })
    new Thread(new Runnable() {
      override def run(): Unit = {
        Thread.sleep(3000)
        val stopSparkContext = true
        streamingContext.stop(stopSparkContext, stopGracefully)
        wasStopped = true
      }
    }).start()
    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

object NumbersTracker {
  val collectedData: mutable.Map[String, mutable.ListBuffer[Int]] =
    mutable.HashMap.empty[String, mutable.ListBuffer[Int]]
  collectedData += ("notGracefulStop" -> mutable.ListBuffer.empty[Int])
  collectedData += ("gracefulStop" -> mutable.ListBuffer.empty[Int])
}
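A short note on the test design: the stream is built with queueStream and oneAtATime = true on a 1-second batch interval, so each micro-batch consumes a single queued RDD of 10 numbers. The stop is triggered from a separate thread after 3 seconds, so at that moment roughly 3 of the 5 queued RDDs have been consumed. The graceful stop leaves the time to process the remaining batches (the first test expects all numbers from 0 to 49), while the immediate stop discards them (the second test expects none of the numbers from 30 to 49).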
Graceful shutdown can improve application reliability since it guarantees the execution of pending tasks and reduces the data loss that could be produced by an immediate context stop. As proved in the learning tests from the second section of this post, the graceful shutdown processes the remaining data while a normal shutdown doesn't. However, the graceful shutdown is not guaranteed in all cases. For example, it's a little bit tricky to use in Spark jobs managed by YARN.