Replication in HDFS

on waitingforcode.com


HDFS is a fault-tolerant and resilient distributed file system. That would not be possible without, among other things, block replication.

This post covers replication in HDFS. The first part explains how replication works when a new file is written to HDFS: the flow, the configuration and the replication state checks. The second part shows replication under the hood by launching simple commands and investigating the logs and the objects involved.

Replication explained

The unit of replication in HDFS is the block: files are split into blocks and each block is replicated. When a new non-empty file is written, its blocks are replicated according to the configured replication factor. The replication factor of a file can be set at several points:

  • configuration entries - several properties control replication:
    • dfs.replication - default replication factor
    • dfs.replication.max - maximal replication factor
    • dfs.namenode.replication.min - minimal block replication, i.e. the minimum number of replicas required for a block
  • at creation time - when a new file is created, we can define its replication factor. If this value is defined, it's used instead of the default dfs.replication
  • after file creation - we can also modify a previously defined replication factor with the appropriate shell command (setrep).
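
The precedence between these settings can be sketched in Java. This is a hypothetical model, not HDFS code: the ReplicationSettings class and its method are made up for illustration, and rejecting out-of-range values (rather than clamping them) is an assumption about how the NameNode behaves. Only the property names and their documented defaults (dfs.replication = 3, dfs.replication.max = 512, dfs.namenode.replication.min = 1) come from HDFS.

```java
/**
 * Hypothetical model of how a file's effective replication factor is chosen.
 * Property names are real HDFS keys; the class itself is illustrative only.
 */
final class ReplicationSettings {
    private final int defaultReplication; // dfs.replication (documented default: 3)
    private final int minReplication;     // dfs.namenode.replication.min (default: 1)
    private final int maxReplication;     // dfs.replication.max (default: 512)

    ReplicationSettings(int defaultReplication, int minReplication, int maxReplication) {
        this.defaultReplication = defaultReplication;
        this.minReplication = minReplication;
        this.maxReplication = maxReplication;
    }

    /** A per-file value (at creation time or via setrep) wins over dfs.replication. */
    int effectiveReplication(Integer requested) {
        int replication = (requested != null) ? requested : defaultReplication;
        // Assumption: out-of-range values are rejected rather than clamped.
        if (replication > maxReplication) {
            throw new IllegalArgumentException("Requested replication " + replication
                    + " exceeds maximum of " + maxReplication);
        }
        if (replication < minReplication) {
            throw new IllegalArgumentException("Requested replication " + replication
                    + " is below minimum of " + minReplication);
        }
        return replication;
    }

    public static void main(String[] args) {
        ReplicationSettings settings = new ReplicationSettings(3, 1, 512);
        System.out.println(settings.effectiveReplication(null)); // no per-file value: 3
        System.out.println(settings.effectiveReplication(2));    // per-file value: 2
    }
}
```

Passing null stands for "no replication factor given at creation time", so the cluster default applies.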

A block is replicated under the following conditions:

  • file creation - as already explained, file blocks are replicated with the default or a custom replication factor.
  • replication factor change - if the replication factor changes, block replication is triggered. This applies to both increases and decreases: when the factor increases, new replicas are added; when it decreases, extra replicas are removed. When removing replicas, the NameNode tries to keep the same number of racks holding replicas.
  • block is corrupted - when a block has at least 1 corrupt replica and at least 1 live replica, HDFS re-replicates it from a live replica. The goal of this operation is to guarantee block availability.
  • block is mis-replicated - the block is not fault-tolerant, for example because all of its replicas are stored on nodes of a single rack.
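
The conditions above can be summarized as a small classifier. It's a simplified, hypothetical sketch: the BlockHealth class, its State values and the single-rack test for mis-replication are illustrative assumptions, while HDFS's BlockManager tracks these cases with its own structures.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical classifier mirroring the replication triggers listed above. */
final class BlockHealth {
    enum State { HEALTHY, CORRUPT, UNDER_REPLICATED, OVER_REPLICATED, MIS_REPLICATED }

    /** One replica: the rack of the DataNode holding it and whether it is corrupt. */
    static final class Replica {
        final String rack;
        final boolean corrupt;

        Replica(String rack, boolean corrupt) {
            this.rack = rack;
            this.corrupt = corrupt;
        }
    }

    static State classify(List<Replica> replicas, int expectedReplicas, int racksInCluster) {
        int live = 0;
        boolean anyCorrupt = false;
        Set<String> liveRacks = new HashSet<>();
        for (Replica replica : replicas) {
            if (replica.corrupt) {
                anyCorrupt = true;
            } else {
                live++;
                liveRacks.add(replica.rack);
            }
        }
        // A corrupt replica plus at least one live copy: re-replicate from the live copy.
        if (anyCorrupt && live > 0) return State.CORRUPT;
        if (live < expectedReplicas) return State.UNDER_REPLICATED;
        if (live > expectedReplicas) return State.OVER_REPLICATED;
        // Enough copies, but all on one rack although the cluster has several racks.
        if (live > 1 && racksInCluster > 1 && liveRacks.size() < 2) return State.MIS_REPLICATED;
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        List<Replica> oneRack = List.of(new Replica("/rack1", false), new Replica("/rack1", false));
        System.out.println(classify(oneRack, 2, 2)); // MIS_REPLICATED
    }
}
```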

Replica placement is rack-aware: when the cluster spans more than one rack, the NameNode ensures that at least 1 replica is stored on a different rack than the other replicas. Target nodes are chosen according to DataNode state (available capacity, total capacity), and the NameNode tries to balance the work between DataNodes. For example, if a block has 2 replicas, they are stored on DataNodes of different racks. If we increase the number of replicas to 3, the 3rd replica will be written on a node located in one of these 2 racks. Conversely, if we have 3 replicas and the replication factor decreases to 2, the extra replica will be removed from the rack holding 2 replicas.
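
A toy Java model of the rules from this paragraph could look like the following. It's a sketch under stated assumptions, not BlockPlacementPolicyDefault: choosing the node with the most remaining space for a new replica, and the one with the least remaining space for removal, are simple stand-ins for the load balancing mentioned above, and the real policy is more involved. Node, chooseTarget and chooseReplicaToRemove are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the rack-aware rules described above; not the real HDFS policy. */
final class RackAwarePlacement {
    static final class Node {
        final String name;
        final String rack;
        final long remainingBytes;

        Node(String name, String rack, long remainingBytes) {
            this.name = name;
            this.rack = rack;
            this.remainingBytes = remainingBytes;
        }
    }

    /** Picks a DataNode for one extra replica of a block currently held by {@code holders}. */
    static Node chooseTarget(List<Node> holders, List<Node> cluster) {
        Set<String> usedRacks = new HashSet<>();
        Set<String> usedNodes = new HashSet<>();
        for (Node holder : holders) {
            usedRacks.add(holder.rack);
            usedNodes.add(holder.name);
        }
        Set<String> allRacks = new HashSet<>();
        for (Node node : cluster) allRacks.add(node.rack);

        // Until replicas span 2 racks (if possible), prefer a new rack; afterwards reuse a used one.
        boolean wantNewRack = usedRacks.size() < 2 && allRacks.size() > usedRacks.size();
        Node best = null;
        for (Node node : cluster) {
            if (usedNodes.contains(node.name)) continue;                // one replica per node
            if (wantNewRack == usedRacks.contains(node.rack)) continue; // wrong kind of rack
            // Assumption: most remaining space stands in for load balancing.
            if (best == null || node.remainingBytes > best.remainingBytes) best = node;
        }
        return best; // null if no node satisfies the rule; a real policy needs a fallback
    }

    /** Picks the replica to drop when the replication factor decreases. */
    static Node chooseReplicaToRemove(List<Node> holders) {
        Map<String, Integer> replicasPerRack = new HashMap<>();
        for (Node holder : holders) replicasPerRack.merge(holder.rack, 1, Integer::sum);
        int busiest = Collections.max(replicasPerRack.values());
        Node victim = null;
        for (Node holder : holders) {
            if (replicasPerRack.get(holder.rack) != busiest) continue; // keep rack diversity
            // Assumption: among candidates, drop the replica on the fullest node.
            if (victim == null || holder.remainingBytes < victim.remainingBytes) victim = holder;
        }
        return victim;
    }

    public static void main(String[] args) {
        Node dn1 = new Node("dn1", "/rack1", 100);
        Node dn2 = new Node("dn2", "/rack1", 200);
        Node dn3 = new Node("dn3", "/rack2", 150);
        Node dn4 = new Node("dn4", "/rack2", 50);
        List<Node> cluster = List.of(dn1, dn2, dn3, dn4);
        System.out.println(chooseTarget(List.of(dn1), cluster).name);           // dn3
        System.out.println(chooseTarget(List.of(dn1, dn3), cluster).name);      // dn2
        System.out.println(chooseReplicaToRemove(List.of(dn1, dn2, dn3)).name); // dn1
    }
}
```

With dn1 holding the only replica, the 2nd replica goes to /rack2. Once both racks are covered, the 3rd replica reuses /rack1, and a decrease back to 2 removes a replica from /rack1, the rack holding 2 of them.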

Replicas are written through a replication pipeline. The client sends the block's data, in packets, to the first DataNode. That DataNode stores each packet and forwards it to the 2nd DataNode, which stores it and forwards it to the 3rd DataNode, and so on until the last DataNode supposed to hold the block. Acknowledgements travel back along the pipeline in the opposite direction.
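
The pipeline can be simulated in a single thread to show the order of operations. The sketch is hypothetical (PipelineSimulation and its DataNode class are not HDFS classes) and deliberately simplified: real DataNodes stream packets concurrently and acknowledge them asynchronously, whereas here each packet is forwarded and acknowledged recursively.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Single-threaded simulation of a write pipeline; names are hypothetical. */
final class PipelineSimulation {
    static final class DataNode {
        final String name;
        final ByteArrayOutputStream stored = new ByteArrayOutputStream();
        DataNode downstream; // next DataNode, null for the last one in the pipeline

        DataNode(String name) {
            this.name = name;
        }

        /** Stores a packet, forwards it downstream, then acks once downstream has acked. */
        void receive(byte[] packet, List<String> acks) {
            stored.write(packet, 0, packet.length);
            if (downstream != null) {
                downstream.receive(packet, acks);
            }
            acks.add(name); // acks flow back: last node first, first node last
        }
    }

    /** Splits data into packets and pushes each one into the head of the pipeline. */
    static List<String> write(byte[] data, int packetSize, List<DataNode> pipeline) {
        for (int i = 0; i + 1 < pipeline.size(); i++) {
            pipeline.get(i).downstream = pipeline.get(i + 1);
        }
        List<String> acks = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += packetSize) {
            byte[] packet = Arrays.copyOfRange(data, offset, Math.min(data.length, offset + packetSize));
            pipeline.get(0).receive(packet, acks);
        }
        return acks;
    }

    public static void main(String[] args) {
        List<DataNode> pipeline = List.of(new DataNode("dn1"), new DataNode("dn2"), new DataNode("dn3"));
        byte[] data = "Test 1\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(write(data, 4, pipeline));      // [dn3, dn2, dn1, dn3, dn2, dn1]
        System.out.println(pipeline.get(2).stored.size()); // 7
    }
}
```

Every DataNode ends up with the full 7 bytes, and the last node of the pipeline is always the first to acknowledge a packet.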

A good rule of thumb for choosing the number of replicas is to specify more replicas for frequently read or important files. This improves not only fault tolerance but also read performance, because more DataNodes can serve the same block.

Replication errors

Replication errors are detected thanks to the block reports sent regularly by DataNodes. When a block is under-replicated or mis-replicated, the NameNode puts it into a priority queue. The priority is defined by the replication state (number of live replicas, ratio between actual and expected replication). The highest priority goes to blocks with 0 or 1 live replicas, i.e. blocks at risk of data loss.
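
The priority rule can be sketched as a pure function. It is modeled on our reading of the priority logic in Hadoop 2.x UnderReplicatedBlocks; the constant names and thresholds (for example "fewer than a third of the expected replicas" for the very-under-replicated level) should be treated as assumptions that may differ between versions. The function only makes sense for blocks already known to need attention, which is why a block with enough live replicas lands in the "badly distributed" level.

```java
/** Sketch of the under-replication priority rule; values are assumptions, see above. */
final class ReplicationPriority {
    static final int QUEUE_HIGHEST_PRIORITY = 0;
    static final int QUEUE_VERY_UNDER_REPLICATED = 1;
    static final int QUEUE_UNDER_REPLICATED = 2;
    static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
    static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

    static int priority(int liveReplicas, int decommissionedReplicas, int expectedReplicas) {
        if (liveReplicas >= expectedReplicas) {
            return QUEUE_REPLICAS_BADLY_DISTRIBUTED; // enough copies, poor rack placement
        }
        if (liveReplicas == 0) {
            // Only decommissioned copies left: copy them now; nothing at all: corrupt.
            return decommissionedReplicas > 0 ? QUEUE_HIGHEST_PRIORITY : QUEUE_WITH_CORRUPT_BLOCKS;
        }
        if (liveReplicas == 1) {
            return QUEUE_HIGHEST_PRIORITY; // a single copy: risk of data loss
        }
        if (liveReplicas * 3 < expectedReplicas) {
            return QUEUE_VERY_UNDER_REPLICATED; // less than a third of the expected copies
        }
        return QUEUE_UNDER_REPLICATED;
    }

    public static void main(String[] args) {
        // 1 live replica, 2 expected: the case visible in the setrep logs of this post.
        System.out.println(priority(1, 0, 2)); // 0
    }
}
```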

Replication under-the-hood

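To see what really happens when a block is replicated, we'll first start a pseudo-distributed cluster with the default replication factor set to 2. The second command starts an additional DataNode on alternative ports, so that 2 DataNodes can hold replicas: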

sbin/start-dfs.sh
bin/hdfs datanode -Ddfs.datanode.address=0.0.0.0:50011 -Ddfs.datanode.http.address=0.0.0.0:50012 -Ddfs.datanode.ipc.address=0.0.0.0:50013 -Ddfs.datanode.data.dir=/home/bartosz/hdfs_dir/data_blocks2 -Dfs.defaultFS=hdfs://localhost:9000

Now, let's copy a non-empty local file to HDFS:

echo "Test 1" > ~/tested_file.txt
bin/hadoop fs -copyFromLocal ~/tested_file.txt /tested_file.txt

After executing this operation, the logs should contain entries like these:

# NameNode
INFO org.apache.hadoop.hdfs.StateChange: BLOCK* allocate blk_1073741825_1001{UCState=UNDER_CONSTRUCTION, truncateBlock=null, primaryNodeIndex=-1, replicas=[ReplicaUC[[DISK]DS-8079cb7c-07ce-4387-add6-e9f807cf580d:NORMAL:127.0.0.1:50010|RBW], ReplicaUC[[DISK]DS-aff6dec1-9e6b-4b80-9912-c1a87f9ad076:NORMAL:127.0.0.1:50011|RBW]]} for /tested_file.txt._COPYING_


# DataNode#1
INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001 src: /127.0.0.1:44080 dest: /127.0.0.1:50010
INFO org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: /127.0.0.1:44080, dest: /127.0.0.1:50010, bytes: 7, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_1536553193_1, offset: 0, srvID: a9a38ede-2e99-4a04-a634-077eed87c1b2, blockid: BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001, duration: 43281065
INFO org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder: BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001, type=HAS_DOWNSTREAM_IN_PIPELINE terminating

# DataNode#2
INFO datanode.DataNode: Receiving BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001 src: /127.0.0.1:53322 dest: /127.0.0.1:50011
INFO DataNode.clienttrace: src: /127.0.0.1:53322, dest: /127.0.0.1:50011, bytes: 7, op: HDFS_WRITE, cliID: DFSClient_NONMAPREDUCE_1536553193_1, offset: 0, srvID: 4d7b818a-d782-420b-9aab-e5a5fb16ea6f, blockid: BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001, duration: 44604060
INFO datanode.DataNode: PacketResponder: BP-53029200-127.0.1.1-1480594999349:blk_1073741825_1001, type=LAST_IN_PIPELINE, downstreams=0:[] terminating

# NameNode once again - after both replicas were written
INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: BLOCK* blk_1073741825_1001{UCState=COMMITTED, truncateBlock=null, primaryNodeIndex=-1, replicas=[ReplicaUC[[DISK]DS-8079cb7c-07ce-4387-add6-e9f807cf580d:NORMAL:127.0.0.1:50010|RBW], ReplicaUC[[DISK]DS-aff6dec1-9e6b-4b80-9912-c1a87f9ad076:NORMAL:127.0.0.1:50011|RBW]]} is not COMPLETE (ucState = COMMITTED, replication# = 0 <  minimum = 2) in file /tested_file.txt._COPYING_

DEBUG BlockStateChange: *BLOCK* NameNode.blockReceivedAndDeleted: from DatanodeRegistration(127.0.0.1:50011, datanodeUuid=4d7b818a-d782-420b-9aab-e5a5fb16ea6f, infoPort=50012, infoSecurePort=0, ipcPort=50013, storageInfo=lv=-56;cid=CID-035234fe-b297-4df8-8e66-7f4309cded08;nsid=1781576136;c=0) 1 blocks.
DEBUG BlockStateChange: *BLOCK* NameNode.blockReceivedAndDeleted: from DatanodeRegistration(127.0.0.1:50010, datanodeUuid=a9a38ede-2e99-4a04-a634-077eed87c1b2, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-56;cid=CID-035234fe-b297-4df8-8e66-7f4309cded08;nsid=1781576136;c=0) 1 blocks.
INFO BlockStateChange: BLOCK* addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_1073741825_1001{UCState=COMMITTED, truncateBlock=null, primaryNodeIndex=-1, replicas=[ReplicaUC[[DISK]DS-8079cb7c-07ce-4387-add6-e9f807cf580d:NORMAL:127.0.0.1:50010|RBW], ReplicaUC[[DISK]DS-aff6dec1-9e6b-4b80-9912-c1a87f9ad076:NORMAL:127.0.0.1:50011|RBW]]} size 7
DEBUG BlockStateChange: BLOCK* block RECEIVED_BLOCK: blk_1073741825_1001 is received from 127.0.0.1:50010
DEBUG BlockStateChange: *BLOCK* NameNode.processIncrementalBlockReport: from 127.0.0.1:50010 receiving: 0, received: 1, deleted: 0
BlockStateChange: BLOCK* addStoredBlock: blockMap updated: 127.0.0.1:50011 is added to blk_1073741825_1001{UCState=COMMITTED, truncateBlock=null, primaryNodeIndex=-1, replicas=[ReplicaUC[[DISK]DS-8079cb7c-07ce-4387-add6-e9f807cf580d:NORMAL:127.0.0.1:50010|RBW], ReplicaUC[[DISK]DS-aff6dec1-9e6b-4b80-9912-c1a87f9ad076:NORMAL:127.0.0.1:50011|RBW]]} size 7
BlockStateChange: BLOCK* block RECEIVED_BLOCK: blk_1073741825_1001 is received from 127.0.0.1:50011
DEBUG BlockStateChange: *BLOCK* NameNode.processIncrementalBlockReport: from 127.0.0.1:50011 receiving: 0, received: 1, deleted: 0
INFO org.apache.hadoop.hdfs.StateChange: DIR* completeFile: /tested_file.txt._COPYING_ is closed by DFSClient_NONMAPREDUCE_1536553193_1
INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=bartosz (auth:SIMPLE)	ip=/127.0.0.1	cmd=rename	src=/tested_file.txt._COPYING_	dst=/tested_file.txt	perm=bartosz:supergroup:rw-r--r--	proto=rpc
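
The NameNode entries above show the block moving from UNDER_CONSTRUCTION to COMMITTED and waiting for replica reports before the file is closed. That sequence can be modeled as a small state machine. This is a hypothetical sketch: BlockCompletion is not an HDFS class, and the rule that a COMMITTED block becomes COMPLETE once a minimum number of distinct DataNodes have reported it is a simplification whose threshold is a constructor parameter (the log above prints minimum = 2).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the block states visible in the logs above. */
final class BlockCompletion {
    enum State { UNDER_CONSTRUCTION, COMMITTED, COMPLETE }

    private final int minReplicas;
    private final Set<String> reportedBy = new HashSet<>();
    private State state = State.UNDER_CONSTRUCTION;

    BlockCompletion(int minReplicas) {
        this.minReplicas = minReplicas;
    }

    /** The client has finished sending the block (UCState=COMMITTED). */
    void commit() {
        if (state == State.UNDER_CONSTRUCTION) {
            state = State.COMMITTED;
        }
        tryComplete();
    }

    /** A DataNode reported a finished replica (RECEIVED_BLOCK); duplicates are ignored. */
    void replicaReceived(String dataNode) {
        reportedBy.add(dataNode);
        tryComplete();
    }

    private void tryComplete() {
        // Assumption: COMMITTED turns COMPLETE once enough distinct replicas were reported.
        if (state == State.COMMITTED && reportedBy.size() >= minReplicas) {
            state = State.COMPLETE;
        }
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        BlockCompletion block = new BlockCompletion(2);
        block.commit();
        System.out.println(block.state()); // COMMITTED
        block.replicaReceived("127.0.0.1:50010");
        block.replicaReceived("127.0.0.1:50011");
        System.out.println(block.state()); // COMPLETE
    }
}
```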

Among the Java classes involved in the replication process, we can distinguish org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault, which decides where replicas are placed. Another important class is UnderReplicatedBlocks from the same package. It keeps all under-replicated blocks in 5 queues of different priority. Blocks are managed by BlockManager, which controls their replication state: it can detect mis-replicated blocks, manage excess or corrupt replicas, and validate the replication state.
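
The role of the 5 priority queues, together with the move from neededReplications to pendingReplications visible in the logs below, can be sketched as follows. ReplicationQueues is a hypothetical class, not the UnderReplicatedBlocks API, and the scheduling is simplified: there are no per-DataNode limits and no timeouts for pending replications.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of needed (5 levels, 0 = most urgent) and pending replications. */
final class ReplicationQueues {
    static final int LEVELS = 5;

    private final List<LinkedHashSet<String>> needed = new ArrayList<>();
    private final Set<String> pending = new HashSet<>();

    ReplicationQueues() {
        for (int level = 0; level < LEVELS; level++) {
            needed.add(new LinkedHashSet<>());
        }
    }

    /** Queues a block at the given priority level (0..4). */
    void add(String blockId, int level) {
        needed.get(level).add(blockId);
    }

    /** Takes up to maxBlocks blocks, most urgent level first, and marks them pending. */
    List<String> scheduleWork(int maxBlocks) {
        List<String> scheduled = new ArrayList<>();
        for (LinkedHashSet<String> queue : needed) {
            Iterator<String> it = queue.iterator();
            while (it.hasNext() && scheduled.size() < maxBlocks) {
                String blockId = it.next();
                it.remove();          // like "Removing block ... from priority queue"
                pending.add(blockId); // like "moved from neededReplications to pendingReplications"
                scheduled.add(blockId);
            }
        }
        return scheduled;
    }

    /** A DataNode reported the new replica, so the block is no longer pending. */
    void replicaReceived(String blockId) {
        pending.remove(blockId);
    }

    int neededCount() {
        int count = 0;
        for (Set<String> queue : needed) count += queue.size();
        return count;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplicationQueues queues = new ReplicationQueues();
        queues.add("blk_1073741825_1001", 0);
        System.out.println(queues.scheduleWork(10)); // [blk_1073741825_1001]
        System.out.println("needed=" + queues.neededCount() + " pending=" + queues.pendingCount());
    }
}
```

After scheduling, the counters match the "neededReplications = 0 pendingReplications = 1" line from the setrep logs, and the block leaves the pending set once the target DataNode reports the new replica.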

Now let's see what happens when the replication factor of an existing file is increased from 1 to 2 (this time the cluster's default replication factor is set to 1):

bin/hadoop fs -copyFromLocal ~/tested_file.txt /tested_file.txt
bin/hadoop fs -setrep 2 /tested_file.txt

Below are the logs produced by the replication factor change:

# NameNode
# First, the under-replicated block is placed in a priority queue
DEBUG BlockStateChange: BLOCK* NameSystem.UnderReplicationBlock.update: blk_1073741825_1001 has only 1 replicas and needs 2 replicas so is added to neededReplications at priority level 0
INFO org.apache.hadoop.hdfs.server.blockmanagement.BlockManager: Increasing replication from 1 to 2 for /tested_file.txt
DEBUG BlockStateChange: BLOCK* block blk_1073741825_1001 is moved from neededReplications to pendingReplications
BlockStateChange: BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block blk_1073741825_1001 from priority queue 0

# As with client writes, the replication factor change also relies on the
# replication pipeline:
INFO BlockStateChange: BLOCK* ask 127.0.0.1:50011 to replicate blk_1073741825_1001 to datanode(s) 127.0.0.1:50010
DEBUG BlockStateChange: BLOCK* neededReplications = 0 pendingReplications = 1

# DataNode 127.0.0.1:50011 logs
# Signal reception
INFO datanode.DataNode: DatanodeRegistration(127.0.0.1:50011, datanodeUuid=0a5710af-7d7c-47d0-b97a-72e7ffe26a3e, infoPort=50012, infoSecurePort=0, ipcPort=50013, storageInfo=lv=-56;cid=CID-f38b2d36-3140-4f35-bb9a-304927bad5ba;nsid=1635021555;c=0) Starting thread to transfer BP-468949545-127.0.1.1-1480596702514:blk_1073741825_1001 to 127.0.0.1:50010 
INFO datanode.DataNode: DataTransfer: Transmitted BP-468949545-127.0.1.1-1480596702514:blk_1073741825_1001 (numBytes=7) to /127.0.0.1:50010

# DataNode 127.0.0.1:50010 logs
# It receives the block to replicate
INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving BP-468949545-127.0.1.1-1480596702514:blk_1073741825_1001 src: /127.0.0.1:44354 dest: /127.0.0.1:50010
INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Received BP-468949545-127.0.1.1-1480596702514:blk_1073741825_1001 src: /127.0.0.1:44354 dest: /127.0.0.1:50010 of size 7

# NameNode logs
# The NameNode is informed about the new replica stored on
# DataNode 127.0.0.1:50010
DEBUG BlockStateChange: *BLOCK* NameNode.blockReceivedAndDeleted: from DatanodeRegistration(127.0.0.1:50010, datanodeUuid=440e55c7-0659-4956-b0eb-f62953fc4ef3, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-56;cid=CID-f38b2d36-3140-4f35-bb9a-304927bad5ba;nsid=1635021555;c=0) 1 blocks.
INFO BlockStateChange: BLOCK* addStoredBlock: blockMap updated: 127.0.0.1:50010 is added to blk_1073741825_1001 size 7
DEBUG BlockStateChange: BLOCK* block RECEIVED_BLOCK: blk_1073741825_1001 is received from 127.0.0.1:50010
DEBUG BlockStateChange: *BLOCK* NameNode.processIncrementalBlockReport: from 127.0.0.1:50010 receiving: 0, received: 1, deleted: 0

Replication guarantees high availability of data stored in HDFS. As reported by some studies, the probability of losing a block in large clusters (such as Yahoo's, with 25,000 nodes) with a replication factor of 3 is about 0.005. The first part of this post explained how replication works and, especially, in which cases it is triggered. The second part showed what happens when a file is written and when the replication factor is changed, and presented some of the Java classes used to handle it.
