File operations in HDFS

on waitingforcode.com


The previous article presented theoretical information about HDFS files. This post goes deeper into the topic.

This article walks through different operations on HDFS files. The first part presents file writing. The second shows how to read files. The last part provides some information about file deletion. Each section contains examples of file operations written with the Java API.

Writing

File creation does not have an immediate effect on HDFS. Before the file is physically created, the client accumulates data in a local temporary file. Once the size of this temporary file reaches the block size, the client contacts the NameNode to discover the DataNodes that will store the block. The NameNode sends this information back to the client, which can then start writing the file directly to the indicated DataNodes.

File writes are exclusive: only one client can write a given file at any time. This exclusivity is achieved through a lease granted on the file. The lease is granted by the NameNode at the moment the client discovers which DataNodes will hold the file blocks.
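
The single-writer rule can be pictured with a toy lease table. This is only a sketch of the idea, not the NameNode's implementation: the class LeaseTableSketch, its methods and the client names are all hypothetical, and lease expiration and renewal are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a write lease table; NOT the NameNode implementation.
final class LeaseTableSketch {
    // file path -> name of the client currently holding the write lease
    private final Map<String, String> holders = new HashMap<>();

    // Grants the lease if the file is free or already held by the same client.
    synchronized boolean acquire(String path, String client) {
        String holder = holders.putIfAbsent(path, client);
        return holder == null || holder.equals(client);
    }

    // Releases the lease, e.g. when the writer closes its stream.
    // Only the current holder can release it.
    synchronized void release(String path, String client) {
        holders.remove(path, client);
    }

    public static void main(String[] args) {
        LeaseTableSketch leases = new LeaseTableSketch();
        System.out.println(leases.acquire("/test_1.txt", "client-A")); // true
        System.out.println(leases.acquire("/test_1.txt", "client-B")); // false
        leases.release("/test_1.txt", "client-A");
        System.out.println(leases.acquire("/test_1.txt", "client-B")); // true
    }
}
```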

Writing a file is tied to its replication. The client sends each block in packets. When the first DataNode correctly saves the first packet, it transfers this packet to the next DataNode, which holds a replica. If this next DataNode correctly saves the packet, it forwards it to the DataNode holding the following replica, and so on. The file is considered correctly written when all replicas have received it and written it successfully.
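
The forwarding chain described above can be sketched as a small in-memory simulation. It is a toy model under stated assumptions: the names ReplicationPipelineSketch, Node, receive and write are hypothetical, and a failed node simply makes the write report failure, whereas a real cluster has its own recovery logic that is not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the write pipeline; class and method names are hypothetical.
final class ReplicationPipelineSketch {

    // Stand-in for a DataNode: it only remembers the packets it stored.
    static final class Node {
        final String name;
        final boolean healthy;
        final List<String> stored = new ArrayList<>();

        Node(String name, boolean healthy) {
            this.name = name;
            this.healthy = healthy;
        }

        // Stores the packet, forwards it to the next node and returns the
        // combined acknowledgement of this node and all nodes after it.
        boolean receive(String packet, List<Node> downstream) {
            if (!healthy) {
                return false; // assumption: a failed node simply breaks the chain
            }
            stored.add(packet);
            if (downstream.isEmpty()) {
                return true; // last replica in the pipeline
            }
            Node next = downstream.get(0);
            return next.receive(packet, downstream.subList(1, downstream.size()));
        }
    }

    // The client only talks to the first node of the pipeline.
    static boolean write(String packet, List<Node> pipeline) {
        return pipeline.get(0).receive(packet, pipeline.subList(1, pipeline.size()));
    }

    public static void main(String[] args) {
        List<Node> pipeline = Arrays.asList(
            new Node("dn1", true), new Node("dn2", true), new Node("dn3", true));
        System.out.println(write("packet-1", pipeline)); // true
        System.out.println(pipeline.get(2).stored);      // [packet-1]
    }
}
```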

On each DataNode, the underlying local file system stores HDFS data in several subdirectories. This is because HDFS assumes that the local file system may work inefficiently with a lot of files in a single directory.

Below you can find some examples of writing with the Java API:

@Test
public void should_create_file_in_hdfs() throws IOException {
  Path fileToCreate = new Path("/test_1.txt");
  FSDataOutputStream helloWorldFile = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  helloWorldFile.close();

  assertThat(fileSystem.exists(fileToCreate)).isTrue();
}

@Test
public void should_copy_file_from_local_file_system_to_hdfs_and_keep_local_file_unchanged() throws IOException {
  String filePath = FileOprationsTest.class.getClassLoader().getResource("hello_world.txt").getPath();
  File localFile = new File(filePath);
  boolean deleteSourceFile = false;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");

  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, sourceFiles, targetPath);

  assertThat(localFile).exists();
  // As you can see, file is not copied with all absolute path
  // It's put in the root directory, as specified in targetPath local variable
  assertThat(localFile.getAbsolutePath()).isNotEqualTo("/hello_world.txt");
  assertThat(fileSystem.exists(new Path("/hello_world.txt"))).isTrue();
}

@Test
public void should_copy_file_from_local_file_system_to_hdfs_and_delete_local_file() throws IOException {
  File localFile = new File("/tmp/text.txt");
  FileUtils.writeStringToFile(localFile, "Test content");
  boolean deleteSourceFile = true;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");

  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, sourceFiles, targetPath);

  assertThat(localFile).doesNotExist();
  // As in should_copy_file_from_local_file_system_to_hdfs_and_keep_local_file_unchanged,
  // file is copied in the root directory
  assertThat(fileSystem.exists(new Path("/text.txt"))).isTrue();
}

@Test
public void should_move_file_from_local_file_system_to_hdfs() throws IOException {
  File localFile = new File("/tmp/hdfs_file_to_copy.txt");
  localFile.createNewFile();
  Path sourcePath = new Path(localFile.getAbsolutePath());
  Path targetPath = new Path("/");

  fileSystem.moveFromLocalFile(sourcePath, targetPath);

  assertThat(localFile).doesNotExist();
  assertThat(fileSystem.exists(new Path("/hdfs_file_to_copy.txt"))).isTrue();
}


@Test
public void should_change_file_content_in_hdfs() throws IOException {
  Path fileToCreate = new Path("/test_2.txt");
  short replicationFactor = 1;
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, replicationFactor);
  writeStream.writeUTF("Test_1;");
  writeStream.close();
  FSDataOutputStream appendStream = fileSystem.append(fileToCreate);
  appendStream.writeUTF("Test_2");
  appendStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);

  String fileContent = readStream.readUTF()+readStream.readUTF();
  assertThat(fileContent).isEqualTo("Test_1;Test_2");
}

@Test
public void should_rename_hdfs_file() throws IOException {
  Path fileToCreate = new Path("/test_3.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.close();
  assertThat(fileSystem.exists(fileToCreate)).isTrue();
  Path renamedPath = new Path("/renamed_test_3.txt");

  boolean wasRenamed = fileSystem.rename(fileToCreate, renamedPath);

  assertThat(wasRenamed).isTrue();
  assertThat(fileSystem.exists(fileToCreate)).isFalse();
  assertThat(fileSystem.exists(renamedPath)).isTrue();
}

@Test(expected = ClosedChannelException.class)
public void should_fail_on_changing_file_content_after_closing_it() throws IOException {
  Path fileToCreate = new Path("/test_3.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("Test_1;");
  writeStream.close();
  writeStream.writeUTF("Should fail now because stream was already closed");
}


@Test
public void should_prove_that_utf_takes_more_place_than_string() throws IOException {
  Path fileToCreateBytes = new Path("/test_8_bytes.txt");
  Path fileToCreateUtf = new Path("/test_8_utf.txt");
  FSDataOutputStream bytesWriteStream = fileSystem.create(fileToCreateBytes, OVERRIDE_FILE);
  bytesWriteStream.writeBytes("123");
  bytesWriteStream.writeBytes("456");
  bytesWriteStream.close();
  FSDataOutputStream utfWriteStream = fileSystem.create(fileToCreateUtf, OVERRIDE_FILE);
  utfWriteStream.writeUTF("123");
  utfWriteStream.writeUTF("456");
  utfWriteStream.close();

  long fileBlockSizeByte = 
    fileSystem.getDefaultBlockSize(fileToCreateBytes);
  BlockLocation[] blockLocationsByte = 
    fileSystem.getFileBlockLocations(fileToCreateBytes, 0L, fileBlockSizeByte);
  long fileBlockSizeUtf = 
    fileSystem.getDefaultBlockSize(fileToCreateUtf);
  BlockLocation[] blockLocationsUtf = 
    fileSystem.getFileBlockLocations(fileToCreateUtf, 0L, fileBlockSizeUtf);

  // The difference in size comes from the fact that every writeUTF call prefixes
  // the string with a 2-byte length: 2 calls add 4 bytes to the 6 bytes of content
  assertThat(blockLocationsUtf[0].getLength()).isEqualTo(10L);
  assertThat(blockLocationsByte[0].getLength()).isEqualTo(6L);
}
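
The size difference can also be checked without a cluster, because FSDataOutputStream extends java.io.DataOutputStream and inherits writeBytes and writeUTF from it. The sketch below writes the same strings into an in-memory buffer. The class WriteUtfSizeSketch and its helper methods are hypothetical names introduced only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Compares the number of bytes produced by writeBytes and writeUTF.
final class WriteUtfSizeSketch {

    // Writes every part with writeBytes (1 byte per character, no prefix).
    static int sizeWithWriteBytes(String... parts) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String part : parts) {
                out.writeBytes(part);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Writes every part with writeUTF (2-byte length prefix + encoded characters).
    static int sizeWithWriteUtf(String... parts) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String part : parts) {
                out.writeUTF(part);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWithWriteBytes("123", "456")); // 6
        System.out.println(sizeWithWriteUtf("123", "456"));   // 10
    }
}
```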

Reading

The NameNode is also involved in read operations, and the process is similar to writing. First, the client contacts the NameNode to discover where the blocks of the file to read are located. For each block, the NameNode returns the replica locations sorted by distance - from the closest to the reader to the farthest.

After discovering the block locations, the client contacts the indicated DataNodes (several of them if the file is split into blocks stored on different nodes). Each of them sends the part of the file it holds in packets.

If one of the DataNodes fails to send the requested block, the client contacts the second closest replica and tries to get the content from there. It continues with the remaining replicas until it either gets the content successfully or has tried all of them.
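
The fallback between replicas can be sketched as a simple loop. This is an illustration under assumptions, not the HDFS client code: ReplicaFallbackSketch, readBlock and the fetch function are hypothetical, and replicas are represented by plain names already sorted from the closest to the farthest.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Toy model of reading one block with fallback to the next closest replica.
final class ReplicaFallbackSketch {

    // Tries each replica in the given order (closest first) until one of them
    // returns the block; an empty result means that every replica failed.
    static Optional<String> readBlock(List<String> replicasClosestFirst,
                                      Function<String, Optional<String>> fetch) {
        for (String replica : replicasClosestFirst) {
            Optional<String> content = fetch.apply(replica);
            if (content.isPresent()) {
                return content;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumption for the demo: only dn2 is able to serve the block.
        Map<String, String> servingNodes = new HashMap<>();
        servingNodes.put("dn2", "block-content");

        Optional<String> result = readBlock(
            Arrays.asList("dn1", "dn2", "dn3"),
            node -> Optional.ofNullable(servingNodes.get(node)));

        System.out.println(result.orElse("all replicas failed")); // block-content
    }
}
```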

Below are some examples of reading files with the HDFS Java API:

@Test
public void should_read_whole_file_content() throws IOException {
  Path fileToCreate = new Path("/test_2.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("sample");
  writeStream.writeUTF(" ");
  writeStream.writeUTF("content");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);

  String fileContent = readStream.readUTF() + readStream.readUTF() + readStream.readUTF();
  assertThat(fileContent).isEqualTo("sample content");
}

@Test
public void should_seek_file_content() throws IOException {
  Path fileToCreate = new Path("/test_4.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("abcdefghijklmnoprst");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);

  readStream.seek(7L);
  int nextByte;
  String fileContent = "";
  while((nextByte = readStream.read()) != -1) {
      fileContent += (char)nextByte;
  }
  // seek(7L) skips the 2-byte length prefix written by writeUTF and the first
  // 5 characters, so the file is read from the letter 'f'
  assertThat(fileContent).isEqualTo("fghijklmnoprst");
}

@Test(expected = EOFException.class)
public void should_fail_with_seek_and_fully_read() throws IOException {
  Path fileToCreate = new Path("/test_5.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("abcdefghijklmnoprst");
  writeStream.close();

  FSDataInputStream readStream = fileSystem.open(fileToCreate);

  readStream.seek(7L);
  readStream.readUTF();
}
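
Both seek results above follow from the layout produced by writeUTF, and they can be reproduced on a plain byte array. In the sketch below, DataInputStream.skipBytes stands in for FSDataInputStream.seek, which is an assumption made to keep the example cluster-free. SeekAfterWriteUtfSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Shows why reading from offset 7 starts at 'f' and why readUTF fails there.
final class SeekAfterWriteUtfSketch {

    // Encodes the text exactly as writeUTF does: 2-byte length + characters.
    static byte[] encode(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Skips `offset` bytes, then reads the rest byte by byte.
    static String readFrom(byte[] data, int offset) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.skipBytes(offset);
            StringBuilder content = new StringBuilder();
            int nextByte;
            while ((nextByte = in.read()) != -1) {
                content.append((char) nextByte);
            }
            return content.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // After the skip, readUTF interprets the next 2 bytes ('f' and 'g') as a
    // length, which is far larger than the remaining data.
    static boolean readUtfFailsAt(byte[] data, int offset) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.skipBytes(offset);
            in.readUTF();
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = encode("abcdefghijklmnoprst");
        System.out.println(data.length);             // 21
        System.out.println(readFrom(data, 7));       // fghijklmnoprst
        System.out.println(readUtfFailsAt(data, 7)); // true
    }
}
```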

@Test
public void should_get_file_stats() throws IOException {
  long beforeAccessTime = System.currentTimeMillis();
  Path fileToCreate = new Path("/test_6.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeUTF("1");
  writeStream.close();
  long afterWriteTime = System.currentTimeMillis();

  FileStatus fileStatus = fileSystem.getFileStatus(fileToCreate);

  short defaultReplication = Short.valueOf(HdfsConfiguration.get().get(DFS_REPLICATION_KEY));
  long blockSize = Long.valueOf(HdfsConfiguration.get().get(DFS_BLOCK_SIZE_KEY));
  assertThat(fileStatus.getReplication()).isEqualTo(defaultReplication);
  assertThat(fileStatus.isFile()).isTrue();
  assertThat(fileStatus.isDirectory()).isFalse();
  assertThat(fileStatus.isSymlink()).isFalse();
  assertThat(fileStatus.getBlockSize()).isEqualTo(blockSize);
  assertThat(fileStatus.getAccessTime())
    .isGreaterThan(beforeAccessTime).isLessThan(afterWriteTime);
  assertThat(fileStatus.getGroup()).isEqualTo("supergroup");
}

@Test
public void should_get_block_locations() throws IOException {
  Path fileToCreate = new Path("/test_7.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.writeBytes("1");
  writeStream.close();

  long fileBlockSize = fileSystem.getDefaultBlockSize(fileToCreate);
  BlockLocation[] blockLocations = 
    fileSystem.getFileBlockLocations(fileToCreate, 0L, fileBlockSize);

  assertThat(blockLocations).hasSize(1);
  assertThat(blockLocations[0].isCorrupt()).isFalse();
  assertThat(blockLocations[0].getLength()).isEqualTo(1L);
  assertThat(blockLocations[0].getHosts().length).isEqualTo(1);
  assertThat(blockLocations[0].getTopologyPaths()[0]).startsWith("/default-rack");
}

@Test
public void should_get_empty_block_locations_for_empty_file() throws IOException {
  Path fileToCreate = new Path("/test_8.txt");
  FSDataOutputStream writeStream = fileSystem.create(fileToCreate, OVERRIDE_FILE);
  writeStream.close();

  long fileBlockSize = fileSystem.getDefaultBlockSize(fileToCreate);
  BlockLocation[] blockLocations = 
   fileSystem.getFileBlockLocations(fileToCreate, 0L, fileBlockSize);

  assertThat(blockLocations.length).isEqualTo(0);
}

Deleting

Like a local file system, HDFS supports deletes, and like many other things in HDFS, they are configurable. By default, deleted files are removed from the file system immediately. But when fs.trash.interval is enabled (greater than 0), removed files are first moved to a trash directory and are permanently deleted only later, once they have stayed there longer than fs.trash.interval minutes. The configuration entry fs.trash.checkpoint.interval defines how often the trash is checked.

With the trash feature enabled, deleted files go to the trash directory. By default this directory is located at /user/${username}/.Trash. The fs.trash.interval property defines how long deleted files remain in this directory. The NameNode launches a cleaning thread every fs.trash.checkpoint.interval minutes. If the thread detects files older than fs.trash.interval, it physically deletes them from the file system.

The cleaning thread also performs another task: it creates checkpoint subdirectories in the trash. The most recently deleted files are placed in a trash subdirectory called Current. When the cleaning thread runs, it creates a checkpoint directory named after the current timestamp (/user/${username}/.Trash/${timestamp}) and moves all files from the Current subdirectory to this newly created timestamped location. This helps the cleaning thread detect which files to delete physically.
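
The interplay between Current, the timestamped checkpoints and the retention interval can be pictured with a small in-memory model. It is only a sketch under assumptions: TrashCheckpointSketch and its methods are hypothetical, time is counted in whole minutes, and each pass is assumed to first drop expired checkpoints and then create a new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of the trash directory; NOT the Hadoop trash implementation.
final class TrashCheckpointSketch {
    private final long retentionMinutes;                 // plays the role of fs.trash.interval
    private final List<String> current = new ArrayList<>();                  // .Trash/Current
    private final TreeMap<Long, List<String>> checkpoints = new TreeMap<>(); // .Trash/${timestamp}

    TrashCheckpointSketch(long retentionMinutes) {
        this.retentionMinutes = retentionMinutes;
    }

    // A delete only moves the file to Current.
    void delete(String file) {
        current.add(file);
    }

    // One pass of the cleaning thread at the given time (in minutes).
    void emptierPass(long nowMinutes) {
        // 1. Physically drop the checkpoints that reached the retention interval.
        checkpoints.headMap(nowMinutes - retentionMinutes, true).clear();
        // 2. Move everything from Current to a new timestamped checkpoint.
        if (!current.isEmpty()) {
            checkpoints.put(nowMinutes, new ArrayList<>(current));
            current.clear();
        }
    }

    List<String> currentFiles() {
        return new ArrayList<>(current);
    }

    int checkpointCount() {
        return checkpoints.size();
    }

    public static void main(String[] args) {
        TrashCheckpointSketch trash = new TrashCheckpointSketch(2);
        trash.delete("/test1");
        trash.delete("/test2");
        trash.emptierPass(1); // Current is moved to checkpoint "1"
        System.out.println(trash.currentFiles() + " " + trash.checkpointCount()); // [] 1
        trash.emptierPass(3); // checkpoint "1" is now 2 minutes old and is removed
        System.out.println(trash.checkpointCount()); // 0
    }
}
```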

Another point related to deletion concerns decreasing the replication factor. If this property is reduced, the NameNode chooses the replicas to delete. With the next heartbeat, the NameNode sends this information to the DataNode, which is in charge of removing the excess replicas.

Neither of these delete actions necessarily takes effect immediately, i.e. the space may be freed some time after the action is triggered.

Because showing the trash feature through the Java API is a little bit tricky, let's see it from the command line (fs.trash.interval = 2, fs.trash.checkpoint.interval = 1):

# First, we create 3 empty files 
bin/hadoop fs -touchz /test1
bin/hadoop fs -touchz /test2
bin/hadoop fs -touchz /test3
# Next, we check if files were created
bin/hadoop fs -ls /
Found 3 items
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /test1
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /test2
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /test3
# Now we can remove each of them
bin/hadoop fs -rm /test1
16/11/26 09:08:11 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test1' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current
bin/hadoop fs -rm /test2
16/11/26 09:08:13 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test2' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current
bin/hadoop fs -rm /test3
16/11/26 09:08:16 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 2 minutes, Emptier interval = 1 minutes.
Moved: 'hdfs://localhost:9000/test3' to trash at: hdfs://localhost:9000/user/bartosz/.Trash/Current

# We check immediately if files were moved to Current
bin/hadoop fs -ls /user/bartosz/.Trash/Current
Found 3 items
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test1
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test2
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/Current/test3

# 2 minutes later, we check if files
# were moved to timestamped directory
bin/hadoop fs -ls /user/bartosz/.Trash/Current
ls: `/user/bartosz/.Trash/Current': No such file or directory
bin/hadoop fs -ls /user/bartosz/.Trash/
Found 1 items
drwx------   - bartosz supergroup          0 2016-11-26 09:08 /user/bartosz/.Trash/14801480590
bin/hadoop fs -ls /user/bartosz/.Trash/14801480590
Found 3 items
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test1
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test2
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:07 /user/bartosz/.Trash/14801480590/test3

# 2 minutes later, when cleaning thread passes again,
# we check if timestamp directory was removed
bin/hadoop fs -ls /user/bartosz/.Trash/14801480590
ls: `/user/bartosz/.Trash/14801480590': No such file or directory

Below you can find what happens when the trash is disabled:

# As previously, create file and delete it
bin/hadoop fs -touchz /test1
bin/hadoop fs -ls /
Found 1 items
-rw-r--r--   1 bartosz supergroup          0 2016-11-26 09:24 /test1
bin/hadoop fs -rm /test1
16/11/26 09:24:41 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /test1

A simple delete can be shown through the Java API:

@Test
public void should_delete_file_in_hdfs() throws IOException {
  Path fileToDeletePath = new Path("/test_1.txt");
  FSDataOutputStream helloWorldFile = fileSystem.create(fileToDeletePath, OVERRIDE_FILE);
  helloWorldFile.close();
  boolean deleteRecursive = false;
  assertThat(fileSystem.exists(fileToDeletePath)).isTrue();

  boolean fileDeleted = fileSystem.delete(fileToDeletePath, deleteRecursive);
  assertThat(fileDeleted).isTrue();
  assertThat(fileSystem.exists(fileToDeletePath)).isFalse();
}

@Test
public void should_delete_file_moved_from_local_file_system_to_hdfs() throws IOException {
  // Here we expect only HDFS file to be deleted
  File localFile = new File("/tmp/hdfs_file_to_copy.txt");
  localFile.createNewFile();
  boolean deleteSourceFile = false;
  boolean overrideTargetFile = true;
  Path copiedFilePath = new Path(localFile.getAbsolutePath());
  Path[] sourceFiles = new Path[] {copiedFilePath};
  Path targetPath = new Path("/");

  fileSystem.copyFromLocalFile(deleteSourceFile, overrideTargetFile, 
     sourceFiles, targetPath);

  assertThat(localFile).exists();
  Path hdfsPath = new Path("/hdfs_file_to_copy.txt");
  assertThat(fileSystem.exists(hdfsPath)).isTrue();
  boolean recursiveDelete = false;
  assertThat(fileSystem.delete(hdfsPath, recursiveDelete)).isTrue();
  assertThat(fileSystem.exists(hdfsPath)).isFalse();
  assertThat(localFile).exists();
}

This post showed 3 file operations that are present in file systems: write, read and delete. The first two are orchestrated by the NameNode, which tells the client which DataNodes to contact to write or read a given file. The delete can be extended with a trash feature. If it's enabled, deleted files first go to a trash directory, from where they're deleted after a configured time interval.
