Checkpointing in Spark

on waitingforcode.com

Checkpointing is, alongside caching, a way to persist an RDD. There are, however, some subtle differences between cache and checkpoint.

The first part covers the basics of checkpointing. The second explains how checkpointed RDDs are written and read. Finally, some test cases show how to use checkpoints through Spark's Java API.

What is a checkpoint in Spark?

Checkpointing is the process of storing an RDD, either permanently (on a filesystem) or not (in memory), without its dependencies. Only the checkpointed RDD itself is saved. Checkpoints are therefore useful for RDDs that take a long time to compute, for example because of a large number of parent RDDs.

Two types of checkpoints exist: reliable and local. A reliable checkpoint ensures that data is stored permanently, ideally on HDFS or another distributed filesystem. It also means that data is often replicated across the network, which can slow down the checkpoint operation. A local checkpoint, on the other hand, favors performance over fault tolerance. The RDD is persisted to ephemeral local storage in the executors. As a consequence, data is written faster, but if an executor fails the checkpointed data is lost and the job fails irrecoverably. Under the hood, local checkpointing uses the same mechanism as cache. If the checkpointed RDD is already cached with a specific storage level, the local checkpoint reuses that level. The difference is that the checkpoint adds disk storage to it: MEMORY_ONLY becomes MEMORY_AND_DISK. If the checkpointed RDD is not cached, the default level (MEMORY_AND_DISK) is used. The reason for this behavior is explained in the LocalRDDCheckpointData.scala file, in the comment on the method that resolves the storage level:

 /**
  * Transform the specified storage level to one that uses disk.
  *
  * This guarantees that the RDD can be recomputed multiple times correctly as long as
  * executors do not fail. Otherwise, if the RDD is cached in memory only, for instance,
  * the checkpoint data will be lost if the relevant block is evicted from memory.
  *
  * This method is idempotent.
  */
def transformStorageLevel(level: StorageLevel): StorageLevel = {
  StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
}
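
The effect of this method can be reproduced without Spark. The following is a minimal sketch in plain Java: StorageLevelSketch is a hypothetical stand-in for Spark's StorageLevel that keeps only the four fields used in the quoted snippet. It shows that a memory-only level gains disk storage and that applying the transformation twice changes nothing.

```java
// Simplified stand-in for a storage level; NOT Spark's StorageLevel class.
final class StorageLevelSketch {
    final boolean useDisk;
    final boolean useMemory;
    final boolean deserialized;
    final int replication;

    StorageLevelSketch(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
        this.useDisk = useDisk;
        this.useMemory = useMemory;
        this.deserialized = deserialized;
        this.replication = replication;
    }

    // Mirrors the quoted Scala method: force disk usage, keep every other field.
    static StorageLevelSketch transformStorageLevel(StorageLevelSketch level) {
        return new StorageLevelSketch(true, level.useMemory, level.deserialized, level.replication);
    }

    @Override
    public String toString() {
        return "StorageLevelSketch(disk=" + useDisk + ", memory=" + useMemory
            + ", deserialized=" + deserialized + ", replication=" + replication + ")";
    }

    public static void main(String[] args) {
        // Assumed shape of a memory-only level: no disk, memory, deserialized, 1 replica.
        StorageLevelSketch memoryOnly = new StorageLevelSketch(false, true, true, 1);
        StorageLevelSketch once = transformStorageLevel(memoryOnly);
        StorageLevelSketch twice = transformStorageLevel(once);
        System.out.println(memoryOnly + " -> " + once);
        System.out.println("idempotent: " + once.toString().equals(twice.toString()));
    }
}
```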

Unlike cache, a checkpoint doesn't keep the DAG structure with all the parent RDDs of the saved data. Only the particular RDD is saved, and it takes the form of a checkpointed RDD. Choosing a reliable checkpoint over cache therefore means preferring reliability over performance. In addition, checkpointed data can outlive the application that wrote it, while cached data, tied to the context of the executed computation, disappears when the Spark application ends.
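
To picture what "without its dependencies" means, here is a toy model in plain Java. LineageSketch and its methods are hypothetical names used only for illustration and have nothing to do with Spark's classes: each node remembers its parents, and a materialized checkpoint is modeled as simply forgetting them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of an RDD lineage; names are hypothetical, this is NOT Spark's API.
final class LineageSketch {
    final String name;
    private List<LineageSketch> parents;

    LineageSketch(String name, List<LineageSketch> parents) {
        this.name = name;
        this.parents = new ArrayList<>(parents);
    }

    // Number of datasets reachable through the dependency chain, this one included.
    int lineageSize() {
        int size = 1;
        for (LineageSketch parent : parents) {
            size += parent.lineageSize();
        }
        return size;
    }

    // Models a materialized checkpoint: the data lives elsewhere, parents are forgotten.
    void truncateLineage() {
        parents = new ArrayList<>();
    }

    public static void main(String[] args) {
        LineageSketch numbers = new LineageSketch("numbers", Collections.emptyList());
        LineageSketch filtered = new LineageSketch("filtered", Collections.singletonList(numbers));
        LineageSketch mapped = new LineageSketch("mapped", Collections.singletonList(filtered));

        System.out.println("before checkpoint: " + mapped.lineageSize());
        mapped.truncateLineage();
        System.out.println("after checkpoint: " + mapped.lineageSize());
    }
}
```

In this model a cached dataset would keep its three-element chain, while the checkpointed one is left with a single element.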

Saving and reading a checkpoint

Let's now look at how checkpointing is done. To write a checkpoint, the RDD's checkpoint() method is called. It only marks the RDD as "to be checkpointed". The physical checkpoint is executed once the first action on the marked RDD runs. With a reliable checkpoint, the checkpointed data is then represented by an instance of ReliableCheckpointRDD.
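
The two-step nature of this flow (mark first, materialize on the next action) can be sketched with a small state machine. This is a simplified model in plain Java with hypothetical names, not Spark code: runAction() stands in for any action such as count().

```java
// Toy model of lazy checkpointing; names are hypothetical, this is NOT Spark's API.
final class LazyCheckpointSketch {
    private boolean markedForCheckpoint = false;
    private boolean checkpointed = false;

    // Only records the intent; nothing is written at this point.
    void checkpoint() {
        markedForCheckpoint = true;
    }

    // Stands in for any action: the checkpoint is materialized
    // only if the dataset was marked before the action runs.
    void runAction() {
        if (markedForCheckpoint) {
            checkpointed = true;
        }
    }

    boolean isCheckpointed() {
        return checkpointed;
    }

    public static void main(String[] args) {
        LazyCheckpointSketch markedFirst = new LazyCheckpointSketch();
        markedFirst.checkpoint();
        System.out.println("marked, no action yet: " + markedFirst.isCheckpointed());
        markedFirst.runAction();
        System.out.println("marked, after action: " + markedFirst.isCheckpointed());

        LazyCheckpointSketch actionFirst = new LazyCheckpointSketch();
        actionFirst.runAction();
        actionFirst.checkpoint();
        System.out.println("action first, then marked: " + actionFirst.isCheckpointed());
    }
}
```

The last case corresponds to calling checkpoint() after the only action: the dataset stays marked but nothing is written until another action runs.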

Two files are saved for each partition: one containing the data and one hidden file holding its checksum. Both names are built from the part- prefix (.part- for the checksum file, to be more precise). A sample tree looks like this:

.
├── 5c5a83b1-f0fd-488e-9aea-47da595725d5
│   └── rdd-3
│       ├── part-00000
│       ├── .part-00000.crc
│       ├── part-00001
│       └── .part-00001.crc
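
The naming scheme visible in this tree can be reproduced with a few lines of plain Java. This is only a sketch inferred from the listing above (a part- prefix followed by a five-digit, zero-padded partition index); the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the file names seen in the sample tree; NOT Spark code.
final class CheckpointFileNamesSketch {
    // Returns, for each partition, the data file name followed by its checksum file name.
    static List<String> expectedFileNames(int partitions) {
        List<String> names = new ArrayList<>();
        for (int partition = 0; partition < partitions; partition++) {
            String dataFile = String.format("part-%05d", partition);
            names.add(dataFile);
            names.add("." + dataFile + ".crc");
        }
        return names;
    }

    public static void main(String[] args) {
        // 2 partitions give 4 files, as in the tree above.
        for (String name : expectedFileNames(2)) {
            System.out.println(name);
        }
    }
}
```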

Writing can also be configured to do some cleaning. If the spark.cleaner.referenceTracking.cleanCheckpoints property is set to true, Spark will mark old checkpoints as "to be removed". However, this applies only to reliable checkpoints.

Checkpoints are read through the Spark context's checkpointFile(String) method. If the file exists, the RDD is returned as an instance of ReliableCheckpointRDD. As explained in the previous section, this RDD doesn't have any dependencies.

Spark checkpoint example

Now we can see some test cases showing the use of checkpoints through Spark's Java API:

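import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.ReliableCheckpointRDD;
import org.junit.BeforeClass;
import org.junit.Test;

private static final SparkConf CONFIGURATION =
    new SparkConf().setAppName("Checkpoint Test").setMaster("local[1]");
private static JavaSparkContext context;

private static final String CHECKPOINT_DIR = "/home/konieczny/tmp/checkpoint";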

@BeforeClass
public static void init() {
  context = new JavaSparkContext(CONFIGURATION);
  context.setCheckpointDir(CHECKPOINT_DIR);
}

@Test
public void should_correctly_checkpoint_a_RDD() {
  JavaRDD<Integer> basicNumbersRDD =
          context.parallelize(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()), 2);
  // Checkpoint before an action
  basicNumbersRDD.checkpoint();

  basicNumbersRDD.count();

  assertThat(basicNumbersRDD.isCheckpointed()).isTrue();
  assertThat(basicNumbersRDD.getCheckpointFile().isPresent()).isTrue();
  assertThat(basicNumbersRDD.getCheckpointFile().get()).contains(CHECKPOINT_DIR).contains("rdd-0");
  File checkpointedDir = new File(basicNumbersRDD.getCheckpointFile().get().replace("file:", ""));
  assertThat(checkpointedDir).isDirectory();
  File[] checkpointedFiles = checkpointedDir.listFiles();
  assertThat(checkpointedFiles).hasSize(4);
  // A checksum (Cyclic Redundancy Check) file is stored for each saved data file
  // Because we've defined 2 partitions, 2 data files are saved, one per partition
  assertThat(checkpointedFiles).extracting("name").containsOnly("part-00000", ".part-00000.crc",
    "part-00001", ".part-00001.crc");
}

@Test
public void should_fail_on_checkpointing_after_action() {
  JavaRDD<Integer> basicNumbersRDD =
    context.parallelize(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()), 2);
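  basicNumbersRDD.count();

  // checkpoint() only marks the RDD; no action runs afterwards,
  // so the checkpoint is never materialized
  basicNumbersRDD.checkpoint();

  assertThat(basicNumbersRDD.isCheckpointed()).isFalse();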
}

@Test
public void should_correctly_checkpoint_file_in_composed_rdd_flow() {
  JavaRDD<Integer> basicNumbersRDD =
    context.parallelize(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()), 2);
  JavaRDD<String> numberNamesRDD = basicNumbersRDD.filter(number -> number > 50)
    .map(number -> "Big number#" + number);
  JavaRDD<String> longNamesRDD = numberNamesRDD.filter(numberName -> numberName.length() > 12);

  longNamesRDD.checkpoint();

  String initialRDDDependencies = longNamesRDD.toDebugString();
  List<String> initialRDDLabels = longNamesRDD.collect();

  // Reinitialize context to see if checkpointed file
  // is reachable
  context.close();
  context = new JavaSparkContext(CONFIGURATION);
  JavaRDD<String> checkpointedLabelsRDD = context.checkpointFile(longNamesRDD.getCheckpointFile().get());
  String checkpointedRDDependencies = checkpointedLabelsRDD.toDebugString();
  List<String> checkpointedRDDLabels = checkpointedLabelsRDD.collect();
  // Check if nothing changed in checkpointed objects
  assertThat(checkpointedLabelsRDD.rdd()).isInstanceOf(ReliableCheckpointRDD.class);
  assertThat(initialRDDDependencies).isNotEqualTo(checkpointedRDDependencies);
  assertThat(checkpointedRDDependencies).startsWith("(2) ReliableCheckpointRDD[0]");
  assertThat(initialRDDLabels).isEqualTo(checkpointedRDDLabels);
}

@Test(expected = IllegalArgumentException.class)
public void should_fail_on_reading_checkpoint_from_not_existent_location() {
  context.checkpointFile(CHECKPOINT_DIR+"/not_existent_file");
}

@Test
public void should_correctly_make_a_local_checkpoint() throws InterruptedException {
  JavaRDD<Integer> basicNumbersRDD =
    context.parallelize(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()), 2);

  basicNumbersRDD.rdd().localCheckpoint();

  basicNumbersRDD.count();

  String checkpointRDDDebug = basicNumbersRDD.toDebugString();

  // Debug info should look like:
  // (2) ParallelCollectionRDD[0] at parallelize at CheckpointTest.java:104 [Disk Memory Deserialized 1x Replicated]
  // |       CachedPartitions: 2; MemorySize: 2032.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  // |  LocalCheckpointRDD[1] at count at CheckpointTest.java:108 [Disk Memory Deserialized 1x Replicated]
  assertThat(checkpointRDDDebug).contains("CachedPartitions: 2", "LocalCheckpointRDD[1]");
  assertThat(basicNumbersRDD.getCheckpointFile().isPresent()).isFalse();
}

This post covers the main concepts of checkpointing in Spark. Its first part shows that checkpoints can be stored on a reliable system (such as HDFS) or locally, in the executors' cache. It also describes the differences between checkpoints and cache. The second part shows the flow for writing and reading a checkpointed RDD and mentions the cleaning of old checkpoints. The last part contains some test cases showing the use of checkpoints through Spark's Java API.
