Checkpoint in HDFS

Versions: Hadoop 2.7.2

HDSF is not an exception in the Big Data world and as other actors, it also uses checkpoints.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

Through this post we'll discover checkpoint feature in HDFS. The first part defines checkpoint from the global point of view. Two next parts describe it in non-HA and HA clusters. The 3rd section explains some implementaton details. The last part shows what happens when checkpoint in non-HA is made.

Checkpoint definition

To explain it simply in the context of HDFS, checkpoint is the merge of the last changes made on file system with the most recent FSImage. This operation helps to keep edit log files small enough to start NameNode pretty quickly. Otherwise, started NameNode would read changes from a lot of big edit logs one by one and apply them separetely to FSImage. And it could really slow down the performance and starting time.

Checkpoint is controled through 2 configuration properties. The first, dfs.namenode.checkpoint.period, tells how often it's made. The second entry, dfs.namenode.checkpoint.txns, defines how many new transactions can be accumulated in edit logs before triggering checkpoint generation. This number is checked regularly, with the interval defined in dfs.namenode.checkpoint.check.period property.

Checkpoint can be made either by Secondary NameNode (non-HA cluster) or by Standby NameNode (HA cluster). The purpose is globally the same but the working method is a little bit different.

Non-HA checkpointing

In this case, checkpoint is done by Secondary NameNode (SNN). First, it checks if one of 2 trigger conditions (elapsed time or number of transactions) is met. If it's the case, the checkpointing is started and executed in several steps:

  1. SSN asks NameNode to close the current active edit log segment. NameNode does that and opens new active segment to which are logged new operations.
  2. SNN gets the transaction ids of: current FSImage and previously active segment (closed in the 1st point).
  3. SSN gets FSImage and not merged edit log files from SNN.
  4. SNN replays all new changes from edit logs and writes new FSImage file containing them.
  5. SNN uploads new FSImage to NameNode and informs it that new FSImage file is available.

HA checkpointing

Checkpoint in HA cluster is done by Standby NameNode (SbNN). The operation is triggered with the same conditions as in the case of non-HA cluster. In fact, SbNN shares edit logs directory with NameNode. Thanks to that, it makes small checkpoints by periodically replaying new logged operations from this place. It's because checkpoint operation in that case is very similar to this edit log synchronization:

  1. If one of conditions is met, checkpoint is triggered
  2. Classical checkpoint is made, i.e. new edit logs are merged to SbNN's FSImage
  3. NameNode is informed by SbNN about new uploaded FSImage available

During checkpoint operation, SbNN is blocked for any other actions, such as NameNode failover.

Checkpoint under-the-hood

Checkpoint HA procedure is defined in org.apache.hadoop.hdfs.server.namenode.Checkpointer's method called doCheckpoint(). Let's explore it:

BackupImage bnImage = getFSImage();
NNStorage bnStorage = bnImage.getStorage();
// ... some lines later
CheckpointSignature sig = cpCmd.getSignature();
// Make sure we're talking to the same NN!
sig.validateStorageInfo(bnImage);

Checkpointer is a separate thread running on NameNode and checking if new checkpoint must be made. It first gets current FSImage and validates system state before getting edit logs from remote (may be shared) directory:

RemoteEditLogManifest manifest =
      getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);

If there are edit logs to merge, Checkpointer detects if current in-memory namesystem is up-to-date. If not, it reloads this state from FSImage downloaded from NameNode.

RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
  // ...
  needReloadImage = true;
}
// ... make other stuff, as downloading edit logs from
// remote shared directory to the local storage
if(needReloadImage) {
  LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
  File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
    sig.mostRecentCheckpointTxId);
  bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
}

Once FSImage is loaded, new edits logs are applied to in-memory namesystem and flushed on disk before informing NameNode about new FSImage available:

rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
// ... some lines later, after acquiring locks on NameNode making checkpoint
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
bnStorage.writeAll();

And eventually, transfer new FSImage to NameNode:

if(cpCmd.needToReturnImage()) {
  TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
    bnStorage, NameNodeFile.IMAGE, txid);
}

Checkpoint for non-HA cluster works similarly. It's orchestrated by org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode's doCheckpoint() method which starts by telling NameNode to close current active segment and start the new one:

// Tell the namenode to start logging transactions in a new edit file
// Returns a token that would be used to upload the merged image.
CheckpointSignature sig = namenode.rollEditLog();

If needed, FSImage is downloaded from NameNode:

boolean loadImage = false;
if (isFreshCheckpointer ||
        (isSameCluster &&
         !sig.storageVersionMatches(checkpointImage.getStorage()))) {
  // ...
  loadImage = true;
}

Just after, the job is done (new edit logs retrieval, logs applying on FSImage):

RemoteEditLogManifest manifest =
      namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);

// Fetch fsimage and edits. Reload the image if previous merge failed.
loadImage |= downloadCheckpointFiles(
  fsName, checkpointImage, sig, manifest) |
    checkpointImage.hasMergeError();
try {
  // Merges 
  doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
} catch (IOException ioe) {
  // A merge error occurred. The in-memory file system state may be
  // inconsistent, so the image and edits need to be reloaded.
  checkpointImage.setMergeError();
  throw ioe;
}

At the end fresh FSImage is uploaded to NameNode:

long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
  NameNodeFile.IMAGE, txid);

Checkpoint tracking

Steps described in the previous part can be easily read from NameNode logs. Below you can find an example of checkpoint activity:

INFO org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Image has changed. Downloading updated image from NN.
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Opening connection to http://localhost:50070/imagetransfer?getimage=1&txid=2&storageInfo=-63: 1257678415:0:CID-ae6d4bea-3226-4255-961e-7617a8362530
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Image Transfer timeout configured to 60000 milliseconds
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Transfer took 0,03s at 0,00 KB/s
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file fsimage.ckpt_0000000000000000002 size 356 bytes.
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Opening connection to http://localhost:50070/imagetransfer?getedit=1&startTxId=3&endTxId=18&storageInfo=-63:1257678415:0:CID-ae6d4bea-3226-4255-961e-7617a8362530
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Transfer took 0,07s at 14422,54 KB/s
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file edits_tmp_0000000000000000003-0000000000000000018_0000000000004602161 size 0 bytes.
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Opening connection to http://localhost:50070/imagetransfer?getedit=1&startTxId=19&endTxId=20&storageInfo=-63:1257678415:0:CID-ae6d4bea-3226-4255-961e-7617a8362530
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Transfer took 0,02s at 0,00 KB/s
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file edits_tmp_0000000000000000019-0000000000000000020_0000000000004602236 size 0 bytes.
INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode: Loading 1 INodes.
INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf: Loaded FSImage in 0 seconds.
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Loaded image for txid 2 from /tmp/hadoop-bartosz/dfs/namesecondary/current/fsimage_0000000000000000002
INFO org.apache.hadoop.hdfs.server.namenode.NameCache: initialized with 0 entries 0 lookups

INFO org.apache.hadoop.hdfs.server.namenode.Checkpointer: Checkpointer about to load edits from 2 stream(s).
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Reading /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000003-0000000000000000018 expecting start txid #3
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Start loading edits file /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000003-0000000000000000018
// ... loads all logged operations
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Edits file /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000003-0000000000000000018 of size 1048576 edits # 16 loaded in 0 seconds

INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Reading /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000019-0000000000000000020 expecting start txid #19
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Start loading edits file /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000019-0000000000000000020
INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Edits file /tmp/hadoop-bartosz/dfs/namesecondary/current/edits_0000000000000000019-0000000000000000020 of size 42 edits # 2 loaded in 0 seconds

INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf: Saving image file /tmp/hadoop-bartosz/dfs/namesecondary/current/fsimage.ckpt_0000000000000000020 using no compression
INFO org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf: Image file /tmp/hadoop-bartosz/dfs/namesecondary/current/fsimage.ckpt_0000000000000000020 of size 897 bytes saved in 0 seconds.
INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Going to retain 2 images with txid >= 2
INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Purging old image FSImageFile(file=/tmp/hadoop-bartosz/dfs/namesecondary/current/fsimage_0000000000000000000, cpktTxId=0000000000000000000)
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Uploaded image with txid 20 to namenode at http://localhost:50070 in 0.111 seconds
WARN org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode: Checkpoint done. New Image Size: 897

And in NameNode's logs we can find the track of new FSImage download:

INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Transfer took 0,05s at 0,00 KB/s
INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file fsimage.ckpt_0000000000000000020 size 897 bytes.
INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Going to retain 2 images with txid >= 2
INFO org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager: Purging old image FSImageFile(file=/home/bartosz/hdfs_dir/fsimage/current/fsimage_0000000000000000000, cpktTxId=0000000000000000000)

This post covers the checkpointing in HDFS. The first part explains the basics about this operation. We can learn from there that checkpointing is a simple merge of new logged events to previous system image. Two next parts show how checkpointing is done in 2 cluster modes: non-HA and HA. The before-last part explains what happens under-the-hood when a checkpoint is triggered. The last part exposes checkpoint logs showing which objects are involved.