Data representation in Spark - RDD

on waitingforcode.com

The first post about Spark internals concerns Resilient Distributed Dataset (RDD), an abstraction used to represent processed data.

We start by describing what RDDs are. The second part presents the types of RDD and how they can be created. The third part shows how to manipulate them through the Java API of Spark 2.0.0.

What is RDD?

An RDD is a key abstraction used to represent the data manipulated by Spark computations. To define it more precisely, let's list some of its characteristics:

  • immutable - once created, a given RDD can't be changed. Every transformation produces a new RDD.
  • partitioned - the data stored in an RDD is split into partitions. Internally, Spark distributes these partitions across the available workers.
  • lazy-loaded - an RDD is not loaded at declaration time. Its data is materialized only when an action (for example collect()) is executed on it.
  • parallelized - thanks to the partitions, data processing can be parallelized, i.e. different partitions can be processed at the same time.
  • fault-tolerant - if one worker fails to finish its computation on a part of the RDD, for example because of a RuntimeException, another worker is able to make the same computation. Fault tolerance is based on a concept called lineage. Every time an operation is applied to an RDD, Spark records this operation. By doing that, Spark knows how a given RDD was produced (which operations were applied in the past). If one partition is lost, Spark uses this stored information to recompute the lost partition elsewhere and continue data processing.
  • defined by - 5 properties define one specific RDD:
    • list of partitions
    • function used to compute each split (partition), for example: a function reading the lines of the file block that backs the partition
    • list of dependencies on other RDDs, for example: parent RDD from which this one was transformed
    • (optional) a partitioner used to handle RDD's data distribution over partitions
    • (optional) list of preferred locations to compute each split on
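
To make immutability, laziness and lineage more concrete, here is a minimal sketch in plain Java. It is a toy model and not Spark's implementation: the class LazyLineageSketch and all its methods are hypothetical names invented for this illustration. Each transformation returns a new object that only remembers how to compute itself from its parent, and nothing runs until collect() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Toy model (NOT Spark code): a dataset that only stores the recipe used to
// build it and evaluates this recipe when an action is called.
class LazyLineageSketch<T> {
  private final String description;        // human-readable lineage
  private final Supplier<List<T>> compute; // recipe producing the data

  private LazyLineageSketch(String description, Supplier<List<T>> compute) {
    this.description = description;
    this.compute = compute;
  }

  static <T> LazyLineageSketch<T> of(List<T> source) {
    List<T> copy = new ArrayList<T>(source);
    return new LazyLineageSketch<T>("source", () -> new ArrayList<T>(copy));
  }

  // Transformation: returns a NEW dataset, the current one is left untouched
  <R> LazyLineageSketch<R> map(Function<T, R> mapper) {
    return new LazyLineageSketch<R>(description + " -> map", () -> {
      List<R> result = new ArrayList<R>();
      for (T element : compute.get()) {
        result.add(mapper.apply(element));
      }
      return result;
    });
  }

  // Transformation: also lazy, it only extends the recorded lineage
  LazyLineageSketch<T> filter(Predicate<T> predicate) {
    return new LazyLineageSketch<T>(description + " -> filter", () -> {
      List<T> result = new ArrayList<T>();
      for (T element : compute.get()) {
        if (predicate.test(element)) {
          result.add(element);
        }
      }
      return result;
    });
  }

  // Action: the whole chain is executed only now. Calling it again replays
  // the lineage, which is the idea used to rebuild lost data.
  List<T> collect() {
    return compute.get();
  }

  String lineage() {
    return description;
  }

  public static void main(String[] args) {
    List<String> executed = new ArrayList<String>();
    LazyLineageSketch<Integer> numbers = LazyLineageSketch.of(Arrays.asList(1, 2, 3, 4))
      .map(n -> { executed.add("map " + n); return n * 2; })
      .filter(n -> n > 4);

    System.out.println(executed.isEmpty()); // true: nothing was computed yet
    System.out.println(numbers.lineage());  // source -> map -> filter
    System.out.println(numbers.collect());  // [6, 8]
  }
}
```

Real RDDs keep the lineage per partition, so only the lost partitions have to be recomputed. This sketch recomputes everything.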

In addition, an RDD can be cached and manually partitioned. Caching is especially useful when a given RDD must be used several times. We can cache an RDD in RAM or on disk. If the available cache memory is full, Spark uses the Least Recently Used (LRU) algorithm. The idea of this algorithm is to evict the entries that were used least recently. So, if the cache is full, Spark removes old entries until there is enough room for the fresh data. As for manual partitioning, it's important to balance partitions correctly. Generally, more and smaller partitions distribute the RDD's data more evenly, among more executors. On the other hand, fewer partitions mean fewer tasks to schedule, which can let the work finish earlier.
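
The LRU eviction described above can be sketched with the JDK alone. The class below is a hypothetical illustration (LruCacheSketch is not a Spark class) built on java.util.LinkedHashMap in access-order mode. It only shows the eviction policy and says nothing about how Spark sizes or stores its cached blocks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy LRU cache (NOT Spark code): the entry that was accessed least recently
// is dropped as soon as the capacity is exceeded.
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;
  private final int maxEntries;

  LruCacheSketch(int maxEntries) {
    // The third argument (accessOrder = true) orders entries from the least
    // to the most recently accessed one
    super(16, 0.75f, true);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Called by LinkedHashMap after each insertion
    return size() > maxEntries;
  }

  public static void main(String[] args) {
    LruCacheSketch<String, String> cache = new LruCacheSketch<String, String>(2);
    cache.put("rdd_1", "blocks of RDD 1");
    cache.put("rdd_2", "blocks of RDD 2");
    cache.get("rdd_1");                    // rdd_1 becomes the most recently used
    cache.put("rdd_3", "blocks of RDD 3"); // cache is full: rdd_2 is evicted

    System.out.println(cache.keySet());    // [rdd_1, rdd_3]
  }
}
```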

RDD types

So, how can an RDD be created? The first and maybe the simplest solution consists of parallelizing Java objects. But parallelized objects must respect one main rule: they must implement the Serializable interface. Another method is to load data from a file, which is automatically converted to an instance of an RDD class. As already mentioned, RDDs can also be created from other RDDs or even from a relational database.
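
The Serializable rule can be checked without Spark. The sketch below assumes the default Java serialization and uses two hypothetical classes, Person and Connection, invented for this example. It simply tries to write an object to an ObjectOutputStream, which is roughly what has to succeed before an object can be shipped to another JVM.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Illustration only: checks whether an object survives Java serialization.
class SerializationCheckSketch {

  // Hypothetical value class that could be parallelized
  static class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    Person(String name) {
      this.name = name;
    }
  }

  // Hypothetical class that does NOT implement Serializable
  static class Connection {
    final String url;

    Connection(String url) {
      this.url = url;
    }
  }

  static boolean canBeSerialized(Object candidate) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return true;
    } catch (NotSerializableException e) {
      // Thrown when the object (or one of its fields) is not Serializable
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(canBeSerialized(new Person("Ann")));               // true
    System.out.println(canBeSerialized(new Connection("jdbc:h2:mem:x"))); // false
  }
}
```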

Naturally, because RDDs can be initialized from different sources, in the API they're represented by different objects. The most basic RDD is represented by the org.apache.spark.api.java.JavaRDD class. It's used to represent a dataset of single values, such as String, Integer or any other object. The next type, org.apache.spark.api.java.JavaPairRDD, handles pairs of values, for example <String, Integer> where the String represents the name of a person and the Integer the person's age. These two are used in the tests below. But there are some other RDD types, representing: data coming from a database (JdbcRDD), whole text files (WholeTextFileRDD), binary data (BinaryFileRDD) or collections (ParallelCollectionRDD).
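
When a collection is parallelized with an explicit number of partitions, its elements have to be cut into that many slices. The following plain Java sketch shows one simple way of doing it, an even and contiguous split. PartitionSlicingSketch and slice() are hypothetical names, and the slicing logic used by Spark itself may differ in details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: cuts a list into numPartitions contiguous slices whose
// sizes differ by at most one element.
class PartitionSlicingSketch {

  static <T> List<List<T>> slice(List<T> data, int numPartitions) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("At least one partition is required");
    }
    List<List<T>> partitions = new ArrayList<List<T>>();
    for (int i = 0; i < numPartitions; i++) {
      // long arithmetic avoids an int overflow for big collections
      int start = (int) ((long) i * data.size() / numPartitions);
      int end = (int) ((long) (i + 1) * data.size() / numPartitions);
      partitions.add(new ArrayList<T>(data.subList(start, end)));
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<Boolean> votes = Arrays.asList(true, false, true, true, true, false);

    // 6 elements in 3 partitions: 2 elements per partition
    System.out.println(slice(votes, 3)); // [[true, false], [true, true], [true, false]]
  }
}
```

With more partitions than elements, some slices are simply empty, which is one reason why the number of partitions should be chosen in relation to the data size.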

RDD example in Java API

Below you can find some test cases showing how to work with RDDs:

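// Imports assumed from the usage below (JUnit 4, AssertJ, Guava, Hadoop and Spark 2.0.0)
// FileHelper, CONTEXT, CONFIGURATION and RDDTest.integerFromString belong to the
// test project and are not shown here
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.spark.Partition;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;

private static final String NUMBERS_FILE = FileHelper.readFileFromClasspath("files/numbers.txt").getPath();
private static final String LOGS_FILE = FileHelper.readFileFromClasspath("files/log_entries.txt").getPath();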

@Test(expected = SparkException.class)
public void should_fail_only_guessing_type_for_not_sequence_file() {
  JavaRDD<Integer> numbers = CONTEXT.objectFile(NUMBERS_FILE);
  // We expect an error because the analyzed file is not a Hadoop SequenceFile
  numbers.collect();
}

@Test
public void should_convert_string_to_integers() {
  // The same file as in the previous test but this time we read it as text
  // and use a mapper to convert Strings to Integers
  JavaRDD<String> numbersStringRDD = CONTEXT.textFile(NUMBERS_FILE);
  // Next we use a transformation function which creates a new RDD holding Integer objects
  // The call can be chained with the previous initialization but for better
  // readability, we prefer to have 2 distinct declarations
  // We can't simply collect the mapped data because the dataset contains a badly
  // formatted value (XYZ, which makes the parsing throw NumberFormatException).
  // That's the reason why additional filtering on not null values is needed
  JavaRDD<Integer> numbersRDD = numbersStringRDD.map(RDDTest::integerFromString)
    .filter(nb -> nb != null);

  List<Integer> numbersList = numbersRDD.collect();

  assertThat(numbersList).hasSize(12);
  assertThat(numbersList).containsExactly(1, 2, 3, 10, 111, 145, 180, 200, 201, 239, 250, 290);
}

@Test
public void should_not_fail_on_creating_rdd_from_not_existent_file() {
  // RDDs are lazy-evaluated. It means that the file is not loaded until the moment
  // when it's really used (an action applied)
  // So this code is completely valid because we don't execute any operation on
  // the RDD built from this not existent file
  CONTEXT.textFile(LOGS_FILE+"_not_existent");
}

@Test(expected = InvalidInputException.class)
public void should_fail_on_applying_actions_for_not_existent_file() {
  JavaRDD<String> notExistentLogsRdd = CONTEXT.textFile(LOGS_FILE+"_not_existent");
  // This time we build an RDD from a not existent file and execute
  // an action on it
  notExistentLogsRdd.collect();
}

@Test
public void should_create_rdd_from_objects_collection() {
  List<Boolean> votesList = Lists.newArrayList(true, false, true, true, true, false);

  JavaRDD<Boolean> votesRDD = CONTEXT.parallelize(votesList);
  Collection<Boolean> votesFor = votesRDD.filter(vote -> vote).collect();

  assertThat(votesFor).hasSize(4);
  assertThat(votesFor).containsOnly(true);
}

@Test
public void should_save_rdd_after_action_application() throws IOException {
  String outputName = "./info_logs_dir";
  try {
    JavaRDD<String> infoLogsRDD = CONTEXT.textFile(LOGS_FILE)
      .filter(log -> log.contains("INFO"));

    // The output is a directory containing the RDD's part files
    // That's why the whole directory is removed in the finally block
    infoLogsRDD.saveAsTextFile(outputName);

    // Read the file
    assertThat(new File(outputName).exists()).isTrue();
    List<String> infoLogs = Files.readAllLines(Paths.get(outputName + "/part-00000"));
    assertThat(infoLogs).hasSize(4);
    infoLogs.forEach(log -> assertThat(log).contains("INFO"));
  } finally {
    FileHelper.cleanDir(outputName);
  }
}

@Test
public void should_correctly_create_rdd_with_manually_defined_partitions() {
  List<Boolean> boolsList = Lists.newArrayList(true, false, true, true, true, false);

  // Explicitly, we want to store booleans in 3 partitions
  JavaRDD<Boolean> boolsRdd = CONTEXT.parallelize(boolsList, 3);

  // A partition is identified by an index
  List<Partition> partitions = boolsRdd.partitions();
  assertThat(partitions).hasSize(3);
  // In addition, when toDebugString() is called, the first number "(n)" corresponds
  // to the number of partitions
  System.out.println("> "+boolsRdd.toDebugString());
}

@Test
public void should_correctly_cache_rdd() {
  JavaSparkContext closeableContext = new JavaSparkContext(CONFIGURATION);
  List<String> textList = Lists.newArrayList("Txt1", "Txt2", "2", "3", "Txt3", "xt4", "Txt4", "Txt5", "Txt6");

  JavaRDD<String> textRdd = closeableContext.parallelize(textList);

  assertThat(textRdd.getStorageLevel().useMemory()).isFalse();
  System.out.println("> "+textRdd.toDebugString());

  // Caching is based on the StorageLevel kept inside each RDD. At the beginning,
  // this level is equal to NONE. It simply means that the RDD is not persisted to
  // cache. On the other hand, when the cache() (or persist()) method is called on
  // the RDD, the StorageLevel changes to the configured one. Here, we use the default
  // level which stores data only in memory. That's the reason why we made a test on
  // useMemory()
  textRdd.cache();
  assertThat(textRdd.getStorageLevel().useMemory()).isTrue();

  // However, the cached data is strictly bound to the given SparkContext
  // So if the SparkContext dies, the cache is naturally lost
  // SparkContext holds, as TimeStampedWeakValueHashMap, all persistent RDDs.
  closeableContext.close();
  try {
    textRdd.count();
    fail("Should throw an exception when SparkContext is closed, even if RDD is cached");
  } catch (IllegalStateException ise) {
    // "SparkContext has been shutdown" shows not only that the context is not running
    // but also that all operations are passed from the RDD to the SparkContext
    assertThat(ise.getMessage()).isEqualTo("SparkContext has been shutdown");
  }
}

@Test
public void should_correctly_create_empty_rdd() {
  // as an oddity, we can also create an empty RDD
  JavaRDD<String> emptyRdd = CONTEXT.emptyRDD();

  assertThat(emptyRdd.collect()).isEmpty();
}

@Test
public void should_persist_rdd_into_memory() {
  JavaRDD<String> infoLogsRDD = CONTEXT.textFile(LOGS_FILE);
  // A similar call could be done with the .cache() method
  // It makes an RDD persistent with the default storage level (MEMORY_ONLY)
  infoLogsRDD.persist(StorageLevel.MEMORY_ONLY());
  assertThat(infoLogsRDD.toDebugString()).contains("Memory Deserialized");

  infoLogsRDD.unpersist(true);
  assertThat(infoLogsRDD.toDebugString()).doesNotContain("Memory Deserialized");
}

@Test
public void should_persist_rdd_into_file() throws InterruptedException {
  JavaRDD<String> warnLogsRDD = CONTEXT.textFile(LOGS_FILE);
  // RDDs are persisted in a lazy fashion
  // persist() only marks the RDD with the requested storage level,
  // the data is physically written when an action computes the RDD
  warnLogsRDD.persist(StorageLevel.DISK_ONLY());
  warnLogsRDD.filter(log -> log.contains("WARN"));
  // Once computed, the RDD is persisted in the directory
  // specified in the spark.local.dir property (/tmp is the default value)
  // When an RDD is marked as serialized on disk, its debug info should
  // contain "Disk Serialized"
  // If you take a look at the RDD.toDebugString method, you'll
  // see that if there is no persistence defined, the output doesn't contain
  // any info about it:
  // val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
  assertThat(warnLogsRDD.toDebugString()).contains("Disk Serialized");
} 
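
The test converting strings to integers relies on RDDTest::integerFromString, which is not shown in this post. A minimal sketch of what such a helper could look like is given below. This is an assumption based on the comments of that test (a badly formatted value gives null, which is filtered out afterwards), not the original implementation. To stay runnable without a Spark context, the example applies the same map and filter steps to a Java stream instead of an RDD.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical reconstruction of the conversion helper used by the test
class IntegerParsingSketch {

  // Returns the parsed number or null when the value is not a valid integer
  static Integer integerFromString(String value) {
    try {
      return Integer.valueOf(value.trim());
    } catch (NumberFormatException nfe) {
      return null;
    }
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("1", "2", "XYZ", "10");

    // Same steps as in the Spark test: map to Integer, then drop the nulls
    List<Integer> numbers = lines.stream()
      .map(IntegerParsingSketch::integerFromString)
      .filter(Objects::nonNull)
      .collect(Collectors.toList());

    System.out.println(numbers); // [1, 2, 10]
  }
}
```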

The article covers a very important point for Spark applications: the RDD. The first part shows, through several characteristics, that an RDD is an immutable, fault-tolerant and distributed dataset. In addition, it's partitioned to make parallel processing easier. The second part describes how to create RDDs and presents some of the basic types. The last part shows some RDD manipulations through tests.
