The previous article presented theoretical information about HDFS files. This post deepens the topic.
This article goes through different operations on HDFS files. The first part presents file writing. The second shows how to read files. The last part provides some information about file deletion. Each section contains examples of the operations written with the Java API.
Writing
File creation doesn't have an immediate effect on HDFS. Before the file is physically created, the client accumulates data in a local temporary file. When the size of this temporary file reaches the block size, the client contacts the NameNode to discover the DataNodes that will store the block. The NameNode sends this information back to the client, which can then start writing the file directly to the indicated DataNodes.
File writes are exclusive: only one client can write a given file at any time. This exclusivity is achieved through a lease granted on the file. The lease is granted by the NameNode at the moment the client discovers which DataNodes will hold the file's blocks.
Writing a file is tied to its replication. The client sends block parts in packets. When the first DataNode correctly saves a packet, it forwards this packet to the next DataNode (holding a replica). If this DataNode correctly saves the packet, it forwards it to another replica DataNode, and so on. The file is considered correctly written when all replicas have received and written it successfully.
The underlying file system stores HDFS files in different subdirectories. This is because HDFS assumes that the local file system may work inefficiently with a lot of files kept in a single directory.
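The block size and replication factor that drive this exchange with the NameNode can be declared explicitly when the file is created. Here is a minimal, illustrative sketch (a standalone snippet, not part of the test class shown later) that assumes a single-node cluster reachable at hdfs://localhost:9000, like in the command line examples further down:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ExplicitBlockSizeWrite {
  public static void main(String[] args) throws Exception {
    // Assumption: a NameNode is reachable at this address; adapt to your cluster
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://localhost:9000");
    FileSystem fileSystem = FileSystem.get(configuration);

    Path file = new Path("/explicit_block_size.txt");
    boolean overwrite = true;
    int bufferSize = 4096;
    short replication = 1;             // number of replicas requested from the NameNode
    long blockSize = 16 * 1024 * 1024; // 16 MB blocks instead of the default

    // create(Path, overwrite, bufferSize, replication, blockSize) lets the client
    // declare the block size and replication the NameNode should use for this file
    FSDataOutputStream out = fileSystem.create(file, overwrite, bufferSize, replication, blockSize);
    out.writeUTF("some content");
    out.close();

    System.out.println("Block size: " + fileSystem.getFileStatus(file).getBlockSize());
    fileSystem.close();
  }
}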
Below you can find some examples of writing:
@Test
public void should_create_file_in_hdfs() throws IOException {
  Path fileToCreate = new Path("/test_1.txt");

  FSDataOutputStream helloWorldFile = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  helloWorldFile.close();

  assertThat(fileSystem.exists(fileToCreate)).isTrue();
}

@Test
public void should_copy_file_from_local_file_system_to_hdfs_and_keep_local_file_unchanged() throws IOException {
  String filePath = FileOprationsTest.class.getClassLoader().getResource("hello_world.txt").getPath();
  File localFile = new File(filePath);
  boolean deleteSourceFile = false;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");

  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, sourceFiles, targetPath);

  assertThat(localFile).exists();
  // As you can see, the file is not copied with its full absolute path
  // It's put in the root directory, as specified in the targetPath local variable
  assertThat(localFile.getAbsolutePath()).isNotEqualTo("/hello_world.txt");
  assertThat(fileSystem.exists(new Path("/hello_world.txt"))).isTrue();
}

@Test
public void should_copy_file_from_local_file_system_to_hdfs_and_delete_local_file() throws IOException {
  File localFile = new File("/tmp/text.txt");
  FileUtils.writeStringToFile(localFile, "Test content");
  boolean deleteSourceFile = true;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");

  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, sourceFiles, targetPath);

  assertThat(localFile).doesNotExist();
  // As in should_copy_file_from_local_file_system_to_hdfs_and_keep_local_file_unchanged,
  // the file is copied into the root directory
  assertThat(fileSystem.exists(new Path("/text.txt"))).isTrue();
}

@Test
public void should_move_file_from_local_file_system_to_hdfs() throws IOException {
  File localFile = new File("/tmp/hdfs_file_to_copy.txt");
  localFile.createNewFile();
  Path sourcePath = new Path(localFile.getAbsolutePath());
  Path targetPath = new Path("/");

  fileSystem.moveFromLocalFile(sourcePath, targetPath);

  assertThat(localFile).doesNotExist();
  assertThat(fileSystem.exists(new Path("/hdfs_file_to_copy.txt"))).isTrue();
}

@Test
public void should_change_file_content_in_hdfs() throws IOException {
  Path fileToCreate = new Path("/test_2.txt");
  short replicationFactor = 1;
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, replicationFactor);
  writeStream.writeUTF("Test_1;");
  writeStream.close();

  FSDataOutputStream appendStream = fileSystem.append(fileToCreate);
  appendStream.writeUTF("Test_2");
  appendStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);
  String fileContent = readStream.readUTF() + readStream.readUTF();

  assertThat(fileContent).isEqualTo("Test_1;Test_2");
}

@Test
public void should_rename_hdfs_file() throws IOException {
  Path fileToCreate = new Path("/test_3.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.close();
  assertThat(fileSystem.exists(fileToCreate)).isTrue();

  Path renamedPath = new Path("/renamed_test_3.txt");
  boolean wasRenamed = fileSystem.rename(fileToCreate, renamedPath);

  assertThat(wasRenamed).isTrue();
  assertThat(fileSystem.exists(fileToCreate)).isFalse();
  assertThat(fileSystem.exists(renamedPath)).isTrue();
}

@Test(expected = ClosedChannelException.class)
public void should_fail_on_changing_file_content_after_closing_it() throws IOException {
  Path fileToCreate = new Path("/test_3.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("Test_1;");
  writeStream.close();

  writeStream.writeUTF("Should fail now because stream was already closed");
}

@Test
public void should_prove_that_utf_takes_more_place_than_string() throws IOException {
  Path fileToCreateBytes = new Path("/test_8_bytes.txt");
  Path fileToCreateUtf = new Path("/test_8_utf.txt");
  FSDataOutputStream bytesWriteStream = fileSystem.create(fileToCreateBytes, OVERRIDE_FILE);
  bytesWriteStream.writeBytes("123");
  bytesWriteStream.writeBytes("456");
  bytesWriteStream.close();
  FSDataOutputStream utfWriteStream = fileSystem.create(fileToCreateUtf, OVERRIDE_FILE);
  utfWriteStream.writeUTF("123");
  utfWriteStream.writeUTF("456");
  utfWriteStream.close();

  long fileBlockSizeByte = fileSystem.getDefaultBlockSize(fileToCreateBytes);
  BlockLocation[] blockLocationsByte = fileSystem.getFileBlockLocations(fileToCreateBytes, 0L, fileBlockSizeByte);
  long fileBlockSizeUtf = fileSystem.getDefaultBlockSize(fileToCreateUtf);
  BlockLocation[] blockLocationsUtf = fileSystem.getFileBlockLocations(fileToCreateUtf, 0L, fileBlockSizeUtf);

  // The difference in size comes from the fact that writeUTF always adds 2 bytes
  assertThat(blockLocationsUtf[0].getLength()).isEqualTo(10L);
  assertThat(blockLocationsByte[0].getLength()).isEqualTo(6L);
}
Reading
The NameNode is also involved in the read operation, and the process is similar to writing. First, the client contacts the NameNode to discover where the blocks of the read file are located. The NameNode returns the blocks sorted by location, from the closest to the reader to the farthest.
After discovering the block locations, the client contacts the indicated DataNodes (several of them if the file is split among blocks on different nodes). Each of them sends the file part it holds in packets.
If one of the DataNodes fails to send the demanded block, the client contacts the second closest replica and tries to get the content from there. It continues with the remaining replicas until it gets the content successfully or has tried them all.
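Outside of unit tests, a common and simple way to read a whole HDFS file is to open it and stream its bytes with Hadoop's IOUtils. A minimal sketch, assuming the same local cluster address as in the previous snippet:

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class SimpleHdfsRead {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://localhost:9000"); // assumed address
    FileSystem fileSystem = FileSystem.get(configuration);

    InputStream in = null;
    try {
      // open() triggers the block location lookup on the NameNode;
      // the actual bytes are then streamed from the DataNodes
      in = fileSystem.open(new Path("/hello_world.txt"));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      fileSystem.close();
    }
  }
}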
Below are some examples of file reads with the HDFS Java API:
@Test
public void should_read_whole_file_content() throws IOException {
  Path fileToCreate = new Path("/test_2.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("sample");
  writeStream.writeUTF(" ");
  writeStream.writeUTF("content");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);
  String fileContent = readStream.readUTF() + readStream.readUTF() + readStream.readUTF();

  assertThat(fileContent).isEqualTo("sample content");
}

@Test
public void should_seek_file_content() throws IOException {
  Path fileToCreate = new Path("/test_4.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("abcdefghijklmnoprst");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);
  readStream.seek(7L);
  int nextByte;
  String fileContent = "";
  while ((nextByte = readStream.read()) != -1) {
    fileContent += (char) nextByte;
  }

  // seek() makes the file be read from the given position (the 7th byte here,
  // i.e. after writeUTF's 2-byte length prefix and the first 5 characters)
  assertThat(fileContent).isEqualTo("fghijklmnoprst");
}

@Test(expected = EOFException.class)
public void should_fail_with_seek_and_fully_read() throws IOException {
  Path fileToCreate = new Path("/test_5.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("abcdefghijklmnoprst");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);
  readStream.seek(7L);
  readStream.readUTF();
}

@Test
public void should_get_file_stats() throws IOException {
  long beforeAccessTime = System.currentTimeMillis();
  Path fileToCreate = new Path("/test_6.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("1");
  writeStream.close();
  long afterWriteTime = System.currentTimeMillis();

  FileStatus fileStatus = fileSystem.getFileStatus(fileToCreate);
  short defaultReplication = Short.valueOf(HdfsConfiguration.get().get(DFS_REPLICATION_KEY));
  long blockSize = Long.valueOf(HdfsConfiguration.get().get(DFS_BLOCK_SIZE_KEY));

  assertThat(fileStatus.getReplication()).isEqualTo(defaultReplication);
  assertThat(fileStatus.isFile()).isTrue();
  assertThat(fileStatus.isDirectory()).isFalse();
  assertThat(fileStatus.isSymlink()).isFalse();
  assertThat(fileStatus.getBlockSize()).isEqualTo(blockSize);
  assertThat(fileStatus.getAccessTime())
    .isGreaterThan(beforeAccessTime).isLessThan(afterWriteTime);
  assertThat(fileStatus.getGroup()).isEqualTo("supergroup");
}

@Test
public void should_get_block_locations() throws IOException {
  Path fileToCreate = new Path("/test_7.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeBytes("1");
  writeStream.close();

  long fileBlockSize = fileSystem.getDefaultBlockSize(fileToCreate);
  BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(fileToCreate, 0L, fileBlockSize);

  assertThat(blockLocations).hasSize(1);
  assertThat(blockLocations[0].isCorrupt()).isFalse();
  assertThat(blockLocations[0].getLength()).isEqualTo(1L);
  assertThat(blockLocations[0].getHosts().length).isEqualTo(1);
  assertThat(blockLocations[0].getTopologyPaths()[0]).startsWith("/default-rack");
}

@Test
public void should_get_empty_block_locations_for_empty_file() throws IOException {
  Path fileToCreate = new Path("/test_8.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.close();

  long fileBlockSize = fileSystem.getDefaultBlockSize(fileToCreate);
  BlockLocation[] blockLocations = fileSystem.getFileBlockLocations(fileToCreate, 0L, fileBlockSize);

  assertThat(blockLocations.length).isEqualTo(0);
}
Deleting
Like a local file system, HDFS also supports deletes. As with a lot of other things, they're configurable. By default, deleted files are immediately removed from the file system. But when fs.trash.interval is enabled (greater than 0), removed files are first moved to a trash directory and physically deleted only some time later (after the time specified in fs.trash.interval).
With the trash feature enabled, deleted files go to the trash directory, located by default under /user/${username}/.Trash. The fs.trash.interval property tells how long deleted files remain in this directory. The NameNode then launches a cleaning thread every fs.trash.checkpoint.interval minutes. If the thread detects files older than fs.trash.interval, it deletes them physically from the file system.
The cleaning thread also performs another task: it creates checkpoint subdirectories in the trash. The most recently deleted files are placed in a trash subdirectory called Current. When the cleaning thread passes, it creates a checkpoint directory named with the current timestamp (/user/${username}/.Trash/${timestamp}) and moves all files from the Current subdirectory to this timestamped location. This helps the cleaning thread detect which files to delete physically.
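A file can also be moved to the trash programmatically with org.apache.hadoop.fs.Trash. The sketch below is only an illustration of that call (not the method used in this post's tests) and assumes the same local cluster address as before; observing the checkpointing and expiry is easier from the command line, as shown later:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class MoveToTrashExample {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://localhost:9000"); // assumed address
    // Trash configuration mirroring the command line example below:
    // files stay 2 minutes in the trash, the checkpoint thread runs every minute
    configuration.set("fs.trash.interval", "2");
    configuration.set("fs.trash.checkpoint.interval", "1");
    FileSystem fileSystem = FileSystem.get(configuration);

    Path fileToRemove = new Path("/file_to_trash.txt");
    fileSystem.create(fileToRemove).close();

    // Moves the file to /user/${username}/.Trash/Current instead of deleting it;
    // should return false when the trash is disabled (fs.trash.interval = 0)
    boolean movedToTrash = Trash.moveToAppropriateTrash(fileSystem, fileToRemove, configuration);
    System.out.println("Moved to trash ? " + movedToTrash);

    fileSystem.close();
  }
}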
Another point related to file deletion concerns decreasing the replication factor. When this property is reduced, the NameNode chooses the replicas to delete. With the next heartbeat, the NameNode sends this information to the DataNode, which is in charge of removing the excess replicas.
Neither delete action necessarily has an immediate effect, i.e. the space may be freed some time after triggering the action.
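The replication decrease mentioned above can be triggered with FileSystem#setReplication. A minimal sketch, again assuming the local single-node cluster used in the other snippets:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DecreaseReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://localhost:9000"); // assumed address
    FileSystem fileSystem = FileSystem.get(configuration);

    // Hypothetical file created with 3 replicas for the sake of the example
    Path file = new Path("/replicated_file.txt");
    fileSystem.create(file, (short) 3).close();

    // Asks the NameNode to keep only 1 replica; the excess replicas are removed
    // by the DataNodes after the next heartbeats, so the space is not freed immediately
    boolean accepted = fileSystem.setReplication(file, (short) 1);
    System.out.println("Replication change accepted ? " + accepted);
    System.out.println("New replication factor: " + fileSystem.getFileStatus(file).getReplication());

    fileSystem.close();
  }
}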
Because showing the trash feature through the Java API is a little bit tricky, let's see it from the command line (fs.trash.interval = 2, fs.trash.checkpoint.interval = 1):
# First, we create 3 empty files
bin/hadoop fs -touchz /test1
bin/hadoop fs -touchz /test2
bin/hadoop fs -touchz /test3

# Next, we check if files were created
bin/hadoop fs -ls /
Found 3 items
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /test1
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /test2
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /test3

# Now we can remove each of them
bin/hadoop fs -rm /test1
16/11/26 09:08:11 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test1' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current
bin/hadoop fs -rm /test2
16/11/26 09:08:13 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test2' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current
bin/hadoop fs -rm /test3
16/11/26 09:08:16 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test3' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current

# We check immediately if files were moved to Current
bin/hadoop fs -ls /user/bartosz/.Trash/Current
Found 3 items
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test1
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test2
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test3

# 2 minutes later, we check if files
# were moved to timestamped directory
bin/hadoop fs -ls /user/bartosz/.Trash/Current
ls: `/user/bartosz/.Trash/Current': No such file or directory
bin/hadoop fs -ls /user/bartosz/.Trash/
Found 1 items
drwx------ - bartosz supergroup 0 2016-11-26 09:08 /user/bartosz/.Trash/14801480590
bin/hadoop fs -ls /user/bartosz/.Trash/14801480590
Found 3 items
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test1
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test2
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test3

# 2 minutes later, when cleaning thread passes again,
# we check if timestamp directory was removed
bin/hadoop fs -ls /user/bartosz/.Trash/14801480590
ls: `/user/bartosz/.Trash/14801480590': No such file or directory
Below you can find what happens if trash is disabled:
# As previously, create file and delete it
bin/hadoop fs -touchz /test1
bin/hadoop fs -ls /
Found 1 items
-rw-r--r-- 1 bartosz supergroup 0 2016-11-26 09:24 /test1
bin/hadoop fs -rm /test1
16/11/26 09:24:41 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /test1
A simple delete can be shown through the Java API:
@Test
public void should_delete_file_in_hdfs() throws IOException {
  Path fileToDeletePath = new Path("/test_1.txt");
  FSDataOutputStream helloWorldFile = fileSystem.create(fileToDeletePath, OVERRIDE_FILE);
  helloWorldFile.close();
  boolean deleteRecursive = false;
  assertThat(fileSystem.exists(fileToDeletePath)).isTrue();

  boolean fileDeleted = fileSystem.delete(fileToDeletePath, deleteRecursive);

  assertThat(fileDeleted).isTrue();
  assertThat(fileSystem.exists(fileToDeletePath)).isFalse();
}

@Test
public void should_delete_file_moved_from_local_file_system_to_hdfs() throws IOException {
  // Here we expect only the HDFS file to be deleted
  File localFile = new File("/tmp/hdfs_file_to_copy.txt");
  localFile.createNewFile();
  boolean deleteSourceFile = false;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");
  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, sourceFiles, targetPath);
  assertThat(localFile).exists();
  Path hdfsPath = new Path("/hdfs_file_to_copy.txt");
  assertThat(fileSystem.exists(hdfsPath)).isTrue();
  boolean recursiveDelete = false;

  assertThat(fileSystem.delete(hdfsPath, recursiveDelete)).isTrue();

  assertThat(fileSystem.exists(hdfsPath)).isFalse();
  assertThat(localFile).exists();
}
This post shows 3 file operations present in every file system: write, read and delete. The first two are orchestrated by the NameNode, which communicates with the DataNodes to write and read a given file. The delete can be extended with the trash feature. If enabled, deleted files go first to a trash directory, from where they're deleted after a configured time interval.