Metadata checkpoint

One of the previous posts talked about checkpoint types in Spark Streaming. This one focuses on one of them - the metadata checkpoint.

The first part of this post explains the steps that lead to checkpointing a given context. The second part focuses on the implementation code and shows what information is stored and how. The last part tries to answer the question about the data persisted during a metadata checkpoint.

Checkpoint writing workflow

The metadata checkpoint is triggered by 2 events: job generation and batch completion. Every time one of them occurs, a new DoCheckpoint event is sent to an abstract structure called EventLoop and processed in order of reception.
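
To give an idea of how such an event loop behaves, below is a minimal sketch of a single-threaded loop consuming events in reception order. It's only an illustration of the concept - the names (SimpleEventLoop, StreamingEvent) are invented for this example and it's not the actual org.apache.spark.util.EventLoop implementation:

import java.util.concurrent.LinkedBlockingQueue

// Simplified model of an event loop: posted events are queued and handled
// one by one, in the order of their reception, by a single daemon thread.
sealed trait StreamingEvent
case class DoCheckpoint(time: Long) extends StreamingEvent

class SimpleEventLoop(onEvent: StreamingEvent => Unit) {
  private val queue = new LinkedBlockingQueue[StreamingEvent]()

  private val worker = new Thread("simple-event-loop") {
    override def run(): Unit = {
      try {
        while (true) {
          onEvent(queue.take()) // blocks until an event arrives
        }
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }
  worker.setDaemon(true)

  def start(): Unit = worker.start()

  def post(event: StreamingEvent): Unit = queue.put(event)

  def stop(): Unit = worker.interrupt()
}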

The checkpoint is written under 2 conditions:

- checkpointing must be enabled, i.e. both the checkpoint directory and the checkpoint duration are defined
- the current batch time must be a multiple of the checkpoint duration
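
Expressed as code, this gate could be sketched as below. It's a simplification inspired by the check made in JobGenerator and not the literal Spark code; all times are in milliseconds and zeroTimeMs stands for the graph's start time:

// Simplified sketch of the checkpoint gate (illustration only)
def shouldWriteCheckpoint(checkpointDir: Option[String],
                          checkpointDurationMs: Option[Long],
                          batchTimeMs: Long,
                          zeroTimeMs: Long): Boolean = {
  // condition 1: checkpointing is enabled (directory and duration defined)
  val checkpointingEnabled = checkpointDir.isDefined && checkpointDurationMs.isDefined
  // condition 2: the batch time falls on a checkpoint interval boundary
  checkpointingEnabled &&
    checkpointDurationMs.exists(duration => (batchTimeMs - zeroTimeMs) % duration == 0)
}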

If both conditions are met, the checkpoint is written by a CheckpointWriter instance. An interesting point about it concerns the control of concurrent writes. To physically create the checkpoint, CheckpointWriter uses a thread pool composed of only 1 thread. So if checkpoint requests accumulate, only 1 request is processed at a given time and the others are queued for further processing. However, too many accumulated requests indicate a problem, such as a checkpointing process that is too slow compared to batch processing. It can lead to situations where the last checkpointed file doesn't really represent the last state.
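
The serialization of writes can be sketched as below. The class and method names are invented for the illustration and only mimic the idea behind the real CheckpointWriter - a 1-thread pool whose queue absorbs concurrent requests:

import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: a fixed pool of 1 thread serializes checkpoint writes,
// so concurrent requests simply wait in the executor's internal queue.
class SingleThreadedWriter(write: Array[Byte] => Unit) {
  private val executor = Executors.newFixedThreadPool(1)

  def submit(serializedCheckpoint: Array[Byte]): Unit = {
    executor.execute(new Runnable {
      override def run(): Unit = write(serializedCheckpoint)
    })
  }

  def stop(): Unit = {
    executor.shutdown()
    // let the queued writes finish before giving up
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}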

Checkpoint writing

As already mentioned in the post about Spark Streaming checkpointing and Write Ahead Logs, the metadata checkpoint helps to recover streaming processing in the case of a driver's failure. It's possible thanks to the parameters that are checkpointed. To see more precisely what happens, let's take a quick look at the org.apache.spark.streaming.Checkpoint class:

class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll
  // ... (the rest of the class is omitted)

As you can deduce, checkpointed data is defined as immutable fields, such as: configuration, jars, the DStream graph or pending (not completed) jobs (their lifecycle is described in the "Starting and terminating jobs with JobScheduler" box below).
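
As a reminder of how this checkpoint is enabled and used at recovery, the sketch below defines the checkpoint directory on the context and rebuilds the context with StreamingContext.getOrCreate. The directory, master, batch interval and socket source are arbitrary choices made only for this example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MetadataCheckpointExample {
  // arbitrary directory used only for this example
  val checkpointDirectory = "/tmp/metadata-checkpoint-test"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("Metadata checkpoint test").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDirectory) // enables metadata checkpoint writing
    // any pipeline would do; the graph defined here is what gets checkpointed
    ssc.socketTextStream("localhost", 9000).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // first run: the context is created from scratch; after a driver failure
    // it's rebuilt from the files found in the checkpoint directory
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}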

Starting and terminating jobs with JobScheduler

org.apache.spark.streaming.scheduler.JobScheduler keeps all submitted jobs in a ConcurrentHashMap field called jobSets. Every time a job starts, the handleJobStart(job: Job, startTime: Long) method is invoked and it sets the job's start time property.
Job termination is handled in the handleJobCompletion(job: Job, completedTime: Long) method, where the completion time is set, some stats (delay, processing delay) are computed and, finally, the executed job is removed from the jobSets map.
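
A very reduced model of this bookkeeping could look like the snippet below. It's not the real JobScheduler (which groups jobs in JobSet objects per batch time); the names are invented for the illustration:

import java.util.concurrent.ConcurrentHashMap

// Illustration only: jobs live in a concurrent map from submission
// until completion, when they are removed and their stats computed.
case class TrackedJob(id: String, var startTime: Long = -1L, var completionTime: Long = -1L)

class SimpleJobTracker {
  private val jobs = new ConcurrentHashMap[String, TrackedJob]()

  def handleJobSubmission(job: TrackedJob): Unit = jobs.put(job.id, job)

  def handleJobStart(jobId: String, startTime: Long): Unit = {
    val job = jobs.get(jobId)
    if (job != null) job.startTime = startTime
  }

  def handleJobCompletion(jobId: String, completedTime: Long): Unit = {
    val job = jobs.remove(jobId) // completed jobs leave the map
    if (job != null) {
      job.completionTime = completedTime
      println(s"Job ${job.id} processed in ${job.completionTime - job.startTime} ms")
    }
  }
}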

Checkpointing is driven by org.apache.spark.streaming.scheduler.JobGenerator, which first determines if a metadata checkpoint must be generated. If it must, JobGenerator initializes the instance of the already mentioned CheckpointWriter. If we investigate the checkpoint directory, its files can look like the output of the tree command below:

.
β”œβ”€β”€ 93eb4271-c967-484d-bfd5-453b874e017c
β”œβ”€β”€ b45b335a-c709-4b1d-8fc6-8a418c3d5feb
β”œβ”€β”€ checkpoint-1489552062250
β”œβ”€β”€ checkpoint-1489552062250.bk
β”œβ”€β”€ checkpoint-1489552062500
β”œβ”€β”€ checkpoint-1489552062500.bk
β”œβ”€β”€ checkpoint-1489552062750
β”œβ”€β”€ checkpoint-1489552062750.bk
└── receivedBlockMetadata
    └── log-1489552062665-1489552122665

All checkpoints are named according to the checkpoint-creationTimeMs pattern. Files with the .bk extension are checkpoint "backup" files. But in fact, it's not a safety backup in case of file loss. This file is generated to prevent checkpoint race conditions, i.e. checkpoints made at the same moment, as shown in the code below:

if (fs.exists(checkpointFile)) {
  fs.delete(backupFile, true) // just in case it exists
  if (!fs.rename(checkpointFile, backupFile)) {
    logWarning(s"Could not rename $checkpointFile to $backupFile")
  }
}
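
Because the creation time is encoded in the file name, finding the most recent checkpoint at recovery boils down to sorting the names. The snippet below illustrates the idea on local files; it's a simplification and not the actual Checkpoint.getCheckpointFiles implementation:

import java.io.File

// Illustration only: pick the most recent checkpoint file by parsing the
// timestamp from the checkpoint-<creationTimeMs> name; .bk and other files
// don't match the pattern and are skipped.
def latestCheckpointFile(checkpointDir: String): Option[File] = {
  val pattern = "checkpoint-(\\d+)".r
  val files = Option(new File(checkpointDir).listFiles()).getOrElse(Array.empty[File])
  files
    .flatMap { file =>
      file.getName match {
        case pattern(timeMs) => Some((timeMs.toLong, file))
        case _ => None
      }
    }
    .sortBy(_._1)
    .lastOption
    .map(_._2)
}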

Does metadata checkpoint store RDDs?

The metadata checkpoint doesn't store RDDs. But it's not obvious after reading the Spark Streaming source code. At first glance, it seems that the checkpointed DStreamGraph restores some checkpointed data through the restoreCheckpointData() method. But in fact it does so only when data checkpointing is enabled. It can be easily verified by enabling and disabling the call to DStream's checkpoint(Duration) method, as in the snippet below:

JavaReceiverInputDStream<String> receiverInputDStream =
  streamingContext.receiverStream(new DStreamTest.AutoDataMakingReceiver(StorageLevel.MEMORY_ONLY(), 500L, 2));
// comment/uncomment to make tests
receiverInputDStream.checkpoint(new Duration(500L));
receiverInputDStream.foreachRDD(rdd -> {
    System.out.println("Reading "+rdd.collect());
});

After analyzing the logs, we can easily see that the methods restoring the streaming context don't log the same things in both cases - the "Restoring checkpointed RDD" entry is missing when the checkpoint(Duration) call is commented out.

So, what happens to the data waiting to be processed if the job stops unexpectedly when only the metadata checkpoint is turned on? It's lost, at least if you don't activate the WAL that was briefly presented in the post about Spark Streaming checkpointing and Write Ahead Logs.
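
For completeness, the receiver-based WAL mentioned here is turned on with a single configuration entry (a checkpoint directory must also be set, since the log files are written under it):

import org.apache.spark.SparkConf

// Enabling the receiver Write Ahead Log; received data will also be
// written to log files stored under the checkpoint directory.
val conf = new SparkConf()
  .setAppName("WAL example")
  .setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")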

So, what data is stored in the metadata checkpoint? After all, the Checkpoint class has a field declared as val graph = ssc.graph. In fact, it represents input and output DStreams. But rather than data, they store the computational graph - a little bit like the Directed Acyclic Graph (DAG) in Spark.
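
A quick way to convince yourself that a DStream describes computation rather than data: in the sketch below the transformations only declare the graph, and the RDDs are materialized batch after batch once the context is started. The socket source, host and port are arbitrary choices for the example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The chain of transformations below only builds the DStream graph;
// nothing is read or computed before ssc.start() is called.
val conf = new SparkConf().setAppName("DStream graph").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

val wordCounts = ssc.socketTextStream("localhost", 9000)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print() // output operation registered in the graph

// only from this point the graph produces one RDD per batch interval
ssc.start()
ssc.awaitTermination()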

This post presents the metadata checkpoint in Spark Streaming. The first part shows the conditions under which it's created. The second section explains how it's written. The last part proves that a metadata checkpoint, without data checkpointing activated, doesn't store data (RDDs) but only the computational graph.

