Spark Streaming checkpointing and Write Ahead Logs

on waitingforcode.com

Checkpointing allows Spark to truncate dependencies on previously computed RDDs. In stream processing its role is extended. In addition, it isn't the only way to protect against failures.

This post describes two techniques for fault tolerance in Spark Streaming: checkpointing and Write Ahead Logs. Each is presented in its own part. The last part shows how to implement both mechanisms.

Importance of checkpoints

Streaming operations work on live data, often arriving every second or so, 24/7. That's why the ability to recover from failures is important. One of the ways to guarantee fault tolerance is checkpointing. Basically, checkpoints in Spark Streaming are quite similar to the ones in batch-oriented Spark. Both save truncated RDDs (without dependencies). But Spark Streaming has an additional, second type of checkpoint, called the metadata checkpoint.

A metadata checkpoint saves the information used to launch the streaming context to reliable storage such as HDFS or S3. Thanks to that, Spark Streaming can recover the streaming context after a driver node failure. The metadata comprises: the streaming application's configuration, the DStream operations defining the application, and batches that are queued but not yet completed.

The second type of checkpoint, the data checkpoint, applies to generated RDDs. As with metadata, they're stored in reliable storage. Data checkpoints are useful in stateful operations, where data processed at time t depends on data generated at time t-1, t-2, and so on back to t-n, where n is defined by the stateful operation's duration (for instance the window duration). In this situation, the purpose of the checkpoint is to store less data (no dependencies) than caching would.
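To make the idea of "truncated" RDDs more concrete, here is a minimal sketch in plain Java. It is not Spark code: ToyRdd and its methods are hypothetical names invented for this illustration. Each derived dataset remembers its parent, and the simulated checkpoint keeps the values while dropping the chain of parents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for an RDD (hypothetical, not a Spark class). */
final class ToyRdd {
    private final ToyRdd parent;       // null when there is no dependency to recompute
    private final List<Integer> data;  // values held by this dataset

    private ToyRdd(ToyRdd parent, List<Integer> data) {
        this.parent = parent;
        this.data = data;
    }

    static ToyRdd source(List<Integer> values) {
        return new ToyRdd(null, new ArrayList<>(values));
    }

    /** Derives a new dataset and remembers the parent, so the lineage grows by one. */
    ToyRdd mapPlusOne() {
        List<Integer> mapped = new ArrayList<>();
        for (int value : data) {
            mapped.add(value + 1);
        }
        return new ToyRdd(this, mapped);
    }

    /** Simulates a data checkpoint: same values, but no reference to the parents. */
    ToyRdd checkpoint() {
        return new ToyRdd(null, new ArrayList<>(data));
    }

    /** Number of ancestors reachable from this dataset. */
    int lineageLength() {
        int length = 0;
        for (ToyRdd current = parent; current != null; current = current.parent) {
            length++;
        }
        return length;
    }

    List<Integer> values() {
        return data;
    }
}

final class LineageTruncationDemo {
    public static void main(String[] args) {
        ToyRdd state = ToyRdd.source(Arrays.asList(1, 2, 3));
        // Five "batches", each one depending on the previous one.
        for (int batch = 0; batch < 5; batch++) {
            state = state.mapPlusOne();
        }
        System.out.println("Lineage before checkpoint: " + state.lineageLength());
        ToyRdd checkpointed = state.checkpoint();
        System.out.println("Lineage after checkpoint: " + checkpointed.lineageLength());
        System.out.println("Values: " + checkpointed.values());
    }
}
```

In a stateful stream the chain would otherwise grow with every batch, which is why a periodic checkpoint keeps recovery bounded in this toy model.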

Obsolete checkpoints are cleared automatically when new checkpoints are saved. This can be observed in the following log entries:

DEBUG Clearing checkpoint data (org.apache.spark.streaming.dstream.ForEachDStream:58)
INFO Saving checkpoint for time 1475022621700 ms to file 'file:/tmp/streaming_data_checkpoint/checkpoint-1475022621700' (org.apache.spark.streaming.CheckpointWriter:54)
Deleting file:/tmp/streaming_data_checkpoint/checkpoint-1475022621100 (org.apache.spark.streaming.CheckpointWriter:54) 

As you can also see, new checkpoints are created by CheckpointWriter. More precisely, it delegates checkpoint creation to its inner class CheckpointWriteHandler:

INFO Submitted checkpoint of time 1475022621700 ms to writer queue (org.apache.spark.streaming.CheckpointWriter:54)
INFO Saving checkpoint for time 1475022621700 ms to file 'file:/tmp/streaming_data_checkpoint/checkpoint-1475022621700' (org.apache.spark.streaming.CheckpointWriter:54)
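The save-then-delete behaviour visible in these logs can be sketched with plain Java file operations. This is only a toy model under stated assumptions: ToyCheckpointWriter, its methods and the retention count are hypothetical and do not reproduce Spark's CheckpointWriter; only the checkpoint-<time> file naming is taken from the log entries above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy checkpoint writer (hypothetical, not Spark's CheckpointWriter). */
final class ToyCheckpointWriter {
    private static final String PREFIX = "checkpoint-";
    private final Path directory;
    private final int retained; // assumed retention count, chosen for the example

    ToyCheckpointWriter(Path directory, int retained) {
        this.directory = directory;
        this.retained = retained;
    }

    static ToyCheckpointWriter inTempDirectory(int retained) {
        try {
            return new ToyCheckpointWriter(Files.createTempDirectory("toy_checkpoints"), retained);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the checkpoint for a batch time, then deletes the oldest files above the limit. */
    void save(long batchTimeMs, String payload) {
        try {
            Files.write(directory.resolve(PREFIX + batchTimeMs),
                payload.getBytes(StandardCharsets.UTF_8));
            List<String> names = checkpointNames(); // sorted, oldest first
            for (int i = 0; i < names.size() - retained; i++) {
                Files.delete(directory.resolve(names.get(i)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the checkpoint file names ordered by the batch time in their suffix. */
    List<String> checkpointNames() {
        try (Stream<Path> files = Files.list(directory)) {
            return files
                .map(file -> file.getFileName().toString())
                .filter(name -> name.startsWith(PREFIX))
                .sorted(Comparator.comparingLong(
                    (String name) -> Long.parseLong(name.substring(PREFIX.length()))))
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class CheckpointRetentionDemo {
    public static void main(String[] args) {
        ToyCheckpointWriter writer = ToyCheckpointWriter.inTempDirectory(2);
        writer.save(1000L, "state of batch 1000");
        writer.save(1250L, "state of batch 1250");
        writer.save(1500L, "state of batch 1500"); // the file for 1000 is removed here
        System.out.println(writer.checkpointNames());
    }
}
```

The important ordering in this sketch is that the new file is written before any older one is deleted, so a usable checkpoint always exists on disk.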

Write Ahead Logs

Spark Streaming also has another protection against failures: a journal called Write Ahead Logs (WAL). Introduced in Spark 1.2, this structure reinforces fault tolerance by saving all data received by the receivers to log files located in the checkpoint directory. It can be enabled through the spark.streaming.receiver.writeAheadLog.enable property.

WAL helps to prevent data loss, for instance when data was received but not processed before the driver failed. That's because data is always written to the ahead logs first and only afterwards made available for processing. Thus the data is automatically available for reprocessing after the streaming context is recovered.
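The "write first, expose afterwards" ordering can be illustrated with a small plain-Java sketch. ToyWriteAheadLog and ToyReceiver are hypothetical names made up for this example, and the one-record-per-line file format is an assumption; none of this is Spark's actual WAL implementation, and cleanup of old records is deliberately not modelled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Toy append-only log, one record per line (hypothetical format). */
final class ToyWriteAheadLog {
    private final Path logFile;

    ToyWriteAheadLog(Path logFile) {
        this.logFile = logFile;
    }

    static ToyWriteAheadLog inTempFile() {
        try {
            return new ToyWriteAheadLog(Files.createTempFile("toy_wal", ".log"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends a record; records must not contain line breaks in this toy format. */
    void append(String record) {
        try {
            Files.write(logFile, (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    List<String> readAll() {
        try {
            return Files.readAllLines(logFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Toy receiver whose in-memory buffer is lost when the process dies. */
final class ToyReceiver {
    private final ToyWriteAheadLog log;
    private final List<String> pending = new ArrayList<>();

    ToyReceiver(ToyWriteAheadLog log) {
        this.log = log;
    }

    void receive(String record) {
        log.append(record);   // 1. durable write comes first
        pending.add(record);  // 2. only then is the record visible to processing
    }

    /** Rebuilds the buffer of a new receiver from the log after a failure. */
    static ToyReceiver recover(ToyWriteAheadLog log) {
        ToyReceiver recovered = new ToyReceiver(log);
        recovered.pending.addAll(log.readAll());
        return recovered;
    }

    List<String> pending() {
        return new ArrayList<>(pending);
    }
}

final class WriteAheadLogDemo {
    public static void main(String[] args) {
        ToyWriteAheadLog log = ToyWriteAheadLog.inTempFile();
        ToyReceiver receiver = new ToyReceiver(log);
        receiver.receive("event-1");
        receiver.receive("event-2");
        // Simulated failure: the receiver and its in-memory buffer are gone.
        receiver = null;
        ToyReceiver recovered = ToyReceiver.recover(log);
        System.out.println("Available for reprocessing: " + recovered.pending());
    }
}
```

Because the append happens before the record enters the buffer, anything the toy receiver ever exposed for processing can be found again in the log.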

Please note that when ahead logs are enabled, the storage level of the received data shouldn't use replication. WAL is already written to a fault-tolerant and reliable filesystem, so the additional overhead of replication is not necessary. Another condition is the reliability of the receiver: it should acknowledge data reception only after being sure the data was saved into the ahead logs.

Logs are saved in receivedBlockMetadata/, located inside the checkpoint directory. File names are prefixed with log-. Similarly to checkpoints, old logs are cleaned up automatically by Spark. This activity can also be observed in the logs:

INFO Attempting to clear 0 old log files in file:/tmp/streaming_data_checkpoint/receivedBlockMetadata older than 1475022621700:  (org.apache.spark.streaming.util.FileBasedWriteAheadLog_ReceivedBlockTracker:54)

Spark Streaming checkpoint example

The simple test cases below show the use of checkpoints and verify that WAL files are written successfully:

private static final long BATCH_INTERVAL = 250L;
private static final SparkConf CONFIGURATION =
  new SparkConf().setAppName("Checkpoint Test").setMaster("local[4]")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");
private JavaSparkContext batchContext;
private JavaStreamingContext streamingContext;
private static final String DATA_CHECKPOINT_DIR = "/tmp/streaming_data_checkpoint/";


@Before
public void initContext() {
  batchContext = new JavaSparkContext(CONFIGURATION);
  streamingContext = new JavaStreamingContext(batchContext, Durations.milliseconds(BATCH_INTERVAL));
}

@After
public void stopContext() {
  streamingContext.stop(true);
}

@Test
public void should_create_stream_with_data_checkpoint_enabled() throws IOException, InterruptedException {
  streamingContext.checkpoint(DATA_CHECKPOINT_DIR);

  JavaDStream<String> temporarySparkFilesDStream = 
    streamingContext.textFileStream("./resources/files/numbers.txt");
  temporarySparkFilesDStream.foreachRDD(rdd -> {});

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(BATCH_INTERVAL*5);
  streamingContext.stop();

  // To restore context from factory method getOrCreate(...) should be used.
  // The first parameter corresponds to directory containing checkpoint context
  // while the second one is a factory method creating a context from scratch.
  streamingContext = JavaStreamingContext.getOrCreate(DATA_CHECKPOINT_DIR, () -> {
    throw new IllegalStateException("Context should be created from checkpoint");
  });
  streamingContext.start();
}

@Test
public void should_fail_on_trying_to_restore_context_from_not_existent_checkpoint() throws IOException, InterruptedException {
  streamingContext.checkpoint(DATA_CHECKPOINT_DIR);

  JavaDStream<String> temporarySparkFilesDStream = 
    streamingContext.textFileStream("./resources/files/numbers.txt");
  temporarySparkFilesDStream.foreachRDD(rdd -> {});

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(BATCH_INTERVAL*5);
  streamingContext.stop();

  try {
    streamingContext = JavaStreamingContext.getOrCreate("/tmp/not_a_spark_checkpoint_directory", () -> {
      throw new IllegalStateException("Context should be created from checkpoint");
    });
    fail("Should not be able to create context from not checkpointed files");
  } catch (IllegalStateException ise) {
    assertThat(ise.getMessage()).isEqualTo("Context should be created from checkpoint");
  }
}

@Test
public void should_save_data_into_ahead_logs() throws IOException, InterruptedException {
  // DATA_CHECKPOINT_DIR already ends with "/", so no extra separator is needed
  String aheadLogsDirectory = DATA_CHECKPOINT_DIR + "receivedBlockMetadata";
  FileUtils.cleanDirectory(new File(aheadLogsDirectory));
  streamingContext.checkpoint(DATA_CHECKPOINT_DIR);

  JavaDStream<String> temporarySparkFilesDStream = 
    streamingContext.textFileStream("./resources/files/numbers.txt");
  temporarySparkFilesDStream.foreachRDD(rdd -> {});

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(BATCH_INTERVAL*5);

  File aheadLogsDir = new File(aheadLogsDirectory);
  List<File> aheadLogs = Lists.newArrayList(aheadLogsDir.listFiles());

  assertThat(aheadLogs).hasSize(1);
  // Wrap the check in an assertion; the bare startsWith(...) call discarded its result
  assertThat(aheadLogs.get(0).getName()).startsWith("log-");
}

This article presented checkpointing and a structure called Write Ahead Logs as methods helping to ensure the fault tolerance of Spark Streaming processing. Despite many advantages, they also have some disadvantages, such as an overhead which can slow down data processing (a workaround is to add more receivers). After the first two descriptive sections, the last part showed some learning tests using checkpoints and WAL.
