Cache in Spark

on waitingforcode.com

A cache is a valuable tool when an expensive computation generates a lot of data. Spark uses it to better handle RDDs that are costly to generate (for example, because they need a database connection or fetch data from external web services).

This post focuses on the cache in Spark. The first part gives some general information about caching. The second part goes into more detail and describes the types of cache available in Spark. The last part shows how to use the cache through tests written with Spark's Java API.

Persisted RDDs

Caching is similar to checkpointing, described in the post about checkpoint in Spark. Both aim to avoid repeating expensive computations of RDDs that are reused multiple times in the code. This occurs especially in some Machine Learning algorithms because of their iterative nature. The main difference from checkpointing concerns the persisted data. Unlike a checkpoint, the cache keeps the targeted RDD together with all its dependencies (its lineage). So it stays fault-tolerant: when an executor fails, the lost cached partitions can be recomputed from these dependencies.
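The benefit of reusing a computed result can be illustrated without Spark at all. The sketch below is only a toy model: LazyDataset and RecomputationDemo are hypothetical names invented for this illustration, and the "expensive" step is simulated with a counter. It shows that, without a cache, every action reruns the computation, while with a cache the second action reuses the stored result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy stand-in for a lazily evaluated dataset; this is not Spark code. */
final class LazyDataset {
    private final Supplier<List<Integer>> computation;
    private boolean cacheEnabled;
    private List<Integer> cached; // filled by the first action executed after cache()

    LazyDataset(Supplier<List<Integer>> computation) {
        this.computation = computation;
    }

    /** Only marks the dataset as cacheable; nothing is computed at this point. */
    LazyDataset cache() {
        cacheEnabled = true;
        return this;
    }

    /** An "action": runs the computation unless a cached result already exists. */
    int count() {
        if (cached != null) {
            return cached.size();
        }
        List<Integer> result = computation.get();
        if (cacheEnabled) {
            cached = result;
        }
        return result.size();
    }
}

final class RecomputationDemo {
    /** Executes two actions and returns how many times the expensive step ran. */
    static int computationsFor(boolean useCache) {
        AtomicInteger runs = new AtomicInteger();
        LazyDataset dataset = new LazyDataset(() -> {
            runs.incrementAndGet(); // pretend this is a slow database call
            return Arrays.asList(1, 2, 3, 4, 5);
        });
        if (useCache) {
            dataset.cache();
        }
        dataset.count();
        dataset.count();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs without cache: " + computationsFor(false));
        System.out.println("runs with cache: " + computationsFor(true));
    }
}
```

As in Spark, the call to cache() is lazy here: the result is stored only when the first action is executed.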

Since the cache is stored by the executors (note that it can also be stored outside of them, see the next part), it's strictly tied to the Spark application lifecycle. When the program stops, the cache is automatically removed. This is the second difference from a checkpoint, which can remain available even for new Spark programs.

When the cache is stored only in memory, it has a certain amount of reserved space. When this space is full and we want to cache more RDDs, the least recently used ones are evicted, according to the LRU (Least Recently Used) policy.
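The LRU policy can be sketched with a plain java.util.LinkedHashMap in access order. This is a simplified model and not Spark's actual memory store: the LruBlockStore class, the block names and the byte sizes are all assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy LRU store keyed by block name, with sizes in bytes; not Spark's memory store. */
final class LruBlockStore {
    private final long capacityBytes;
    private long usedBytes;
    // accessOrder = true: iteration starts with the least recently used entry
    private final LinkedHashMap<String, Long> blocks = new LinkedHashMap<>(16, 0.75f, true);

    LruBlockStore(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** Stores a block, evicting least recently used blocks until it fits. Returns the evicted ids. */
    List<String> put(String blockId, long sizeBytes) {
        List<String> evicted = new ArrayList<>();
        if (sizeBytes > capacityBytes) {
            return evicted; // the block can never fit, so it is simply not cached
        }
        Long previousSize = blocks.remove(blockId);
        if (previousSize != null) {
            usedBytes -= previousSize;
        }
        Iterator<Map.Entry<String, Long>> iterator = blocks.entrySet().iterator();
        while (usedBytes + sizeBytes > capacityBytes && iterator.hasNext()) {
            Map.Entry<String, Long> leastRecentlyUsed = iterator.next();
            usedBytes -= leastRecentlyUsed.getValue();
            evicted.add(leastRecentlyUsed.getKey());
            iterator.remove();
        }
        blocks.put(blockId, sizeBytes);
        usedBytes += sizeBytes;
        return evicted;
    }

    /** Reading a block marks it as recently used. */
    boolean read(String blockId) {
        return blocks.get(blockId) != null;
    }

    /** Checks presence without changing the usage order. */
    boolean contains(String blockId) {
        return blocks.containsKey(blockId);
    }

    public static void main(String[] args) {
        LruBlockStore store = new LruBlockStore(100);
        store.put("rdd_1", 40);
        store.put("rdd_2", 40);
        store.read("rdd_1"); // rdd_2 becomes the least recently used block
        System.out.println("evicted: " + store.put("rdd_3", 40));
    }
}
```

In this model, reading rdd_1 before adding rdd_3 protects it: the block that goes away is the one that was not touched for the longest time, not simply the one that was inserted first.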

The amount of memory available for the cache is determined either dynamically or statically in the configuration. Internally, Spark uses the heap for 2 things: code execution and storage (so caching). When storage needs more memory than execution, storage can take more space (and inversely). But code execution takes priority over storage: if it lacks space, it can take back the space borrowed by storage. The amount of memory exclusively reserved for storage can be defined through the spark.memory.storageFraction property. Cached data is never evicted from the space delimited by this value, even if execution needs more memory.
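To get a feeling for the sizes involved, the arithmetic can be sketched as below. This is a back-of-the-envelope model, not Spark code. It assumes (these numbers are not stated in this post) that about 300 MB of the heap is reserved, that spark.memory.fraction is 0.6 and that spark.memory.storageFraction is 0.5. The UnifiedMemoryEstimate class and the 4096 MB heap are hypothetical.

```java
/** Back-of-the-envelope model of the shared execution/storage memory; not Spark code. */
final class UnifiedMemoryEstimate {
    // Assumption: memory set aside before the shared region is computed
    static final double RESERVED_MB = 300.0;

    /** Memory shared by execution and storage, in MB. */
    static double unifiedMb(double heapMb, double memoryFraction) {
        return Math.max(0.0, heapMb - RESERVED_MB) * memoryFraction;
    }

    /** Part of the shared region where cached blocks are safe from execution. */
    static double protectedStorageMb(double heapMb, double memoryFraction, double storageFraction) {
        return unifiedMb(heapMb, memoryFraction) * storageFraction;
    }

    /** Cached data that execution may reclaim: only what exceeds the protected part. */
    static double evictableByExecutionMb(double cachedMb, double protectedMb) {
        return Math.max(0.0, cachedMb - protectedMb);
    }

    public static void main(String[] args) {
        double heapMb = 4096.0;        // hypothetical executor heap
        double memoryFraction = 0.6;   // assumed value of spark.memory.fraction
        double storageFraction = 0.5;  // assumed value of spark.memory.storageFraction

        double protectedMb = protectedStorageMb(heapMb, memoryFraction, storageFraction);
        System.out.println("shared region (MB): " + unifiedMb(heapMb, memoryFraction));
        System.out.println("protected storage (MB): " + protectedMb);
        System.out.println("evictable when 1500 MB are cached (MB): "
            + evictableByExecutionMb(1500.0, protectedMb));
    }
}
```

Under these assumptions, roughly 1139 MB of cached data is out of reach for execution, and only the cached data above that threshold can be pushed out when execution needs room.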

Cache types

Spark's cache is manipulated through the persist(StorageLevel) method. As you can see, it takes one parameter representing the storage method. The available methods are enumerated in the StorageLevel object. The places where the cache can be stored are:

  • disk (DISK_ONLY, DISK_ONLY_2) - data is stored only on disk. Note that the _2 suffix of the second option means that cached partitions are replicated on 2 different cluster nodes.
  • memory (MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2) - the SER flag means that Java objects are cached in serialized form. Cache generation is slower because of the added serialization step. On the other hand, the Garbage Collector can do its job quicker because each partition is cached as a single serialized buffer and not as multiple different objects.
  • disk and memory (MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2)
  • off-heap (OFF_HEAP) - special case, still experimental (Spark 2.0). RDDs are cached in an in-memory store such as Tachyon. Thanks to that, different executors can share data. It has several advantages: the cache can be shared, the garbage collection impact is reduced and cached data is not lost when an executor crashes. To make it work, besides an available in-memory store, several configuration entries must be set: spark.memory.offHeap.enabled and spark.memory.offHeap.size.

These predefined levels aren't the only ones available. Custom levels can be created through the StorageLevel.apply(...) factory method. Its parameters cover the same points as the existing levels: disk use, memory use, off-heap use, deserialized storage and replication factor.
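The relation between these five parameters and the label printed between '[]' in the debug text can be sketched with a small model. StorageLevelSketch is a hypothetical class written for this illustration and not the real org.apache.spark.storage.StorageLevel. The flag values given to the predefined levels are an assumption based on the level names, and off-heap use is deliberately not reflected in the label.

```java
/** Toy model of the five storage-level flags; not the real Spark StorageLevel class. */
final class StorageLevelSketch {
    final boolean useDisk;
    final boolean useMemory;
    final boolean useOffHeap;
    final boolean deserialized;
    final int replication;

    StorageLevelSketch(boolean useDisk, boolean useMemory, boolean useOffHeap,
                       boolean deserialized, int replication) {
        this.useDisk = useDisk;
        this.useMemory = useMemory;
        this.useOffHeap = useOffHeap;
        this.deserialized = deserialized;
        this.replication = replication;
    }

    // Assumed flag combinations of some predefined levels
    static final StorageLevelSketch DISK_ONLY = new StorageLevelSketch(true, false, false, false, 1);
    static final StorageLevelSketch DISK_ONLY_2 = new StorageLevelSketch(true, false, false, false, 2);
    static final StorageLevelSketch MEMORY_ONLY = new StorageLevelSketch(false, true, false, true, 1);
    static final StorageLevelSketch MEMORY_ONLY_SER = new StorageLevelSketch(false, true, false, false, 1);
    static final StorageLevelSketch MEMORY_AND_DISK = new StorageLevelSketch(true, true, false, true, 1);

    /** Builds a label similar to the one shown between '[]' in the debug text. */
    String description() {
        StringBuilder label = new StringBuilder();
        if (useDisk) {
            label.append("Disk ");
        }
        if (useMemory) {
            label.append("Memory ");
        }
        label.append(deserialized ? "Deserialized " : "Serialized ");
        label.append(replication).append("x Replicated");
        return label.toString();
    }

    public static void main(String[] args) {
        // A custom combination with no predefined equivalent: deserialized and disk only
        StorageLevelSketch custom = new StorageLevelSketch(true, false, false, true, 1);
        System.out.println(DISK_ONLY_2.description());
        System.out.println(custom.description());
    }
}
```

Seen this way, the _2 and _SER suffixes are only shortcuts for a replication factor of 2 and for deserialized = false.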

Spark cache examples

After discovering some important points about Spark cache, it's a good moment to see them working in real code:

private JavaRDD<Integer> testedNumersRDD =
  CONTEXT.parallelize(IntStream.rangeClosed(1, 5).boxed().collect(Collectors.toList()), 2);

@Test
public void should_correctly_cache_simple_rdd_on_disk() {
  JavaRDD<Integer> filteredRDD = testedNumersRDD.filter(number -> number > 50);

  filteredRDD.persist(StorageLevel.DISK_ONLY());

  filteredRDD.count();

  // To check whether a given RDD was cached, we can look for CachedPartitions in the debug text.
  // Information about the cache method (disk/memory only or both, replicated or not, etc.)
  // can be found in '[]', at the end of each line, for example:
  // MapPartitionsRDD[1] at filter at CacheTest.java:26 [Disk Serialized 1x Replicated]
  assertThat(filteredRDD.toDebugString())
    .contains("CachedPartitions: 2; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 8.0 B",
              "[Disk Serialized 1x Replicated]");
}

@Test
public void should_unpersist_rdd_cached_on_disk() {
  JavaRDD<Integer> filteredRDD = testedNumersRDD.filter(number -> number > 50);

  filteredRDD.persist(StorageLevel.DISK_ONLY());

  filteredRDD.count();

  // To check whether a given RDD was cached, we can look for CachedPartitions in the debug text.
  // Information about the cache method (disk/memory only or both, replicated or not, etc.)
  // can be found in '[]', at the end of each line, for example:
  // MapPartitionsRDD[1] at filter at CacheTest.java:26 [Disk Serialized 1x Replicated]
  assertThat(filteredRDD.toDebugString())
    .contains("CachedPartitions: 2; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 8.0 B",
              "[Disk Serialized 1x Replicated]");

  // When the RDD is removed from the cache, we can see it in the logs,
  // more particularly by looking for the following entries:
  // INFO Removing RDD 1 from persistence list (org.apache.spark.rdd.MapPartitionsRDD:54)
  // DEBUG removing RDD 1 (org.apache.spark.storage.BlockManagerSlaveEndpoint:58)
  // This is a blocking operation. It stops the thread until all blocks
  // are removed from the cache.
  filteredRDD.unpersist();

  assertThat(filteredRDD.toDebugString())
    .doesNotContain("CachedPartitions: 2; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 8.0 B")
    .doesNotContain("[Disk Serialized 1x Replicated]");
}

@Test
public void should_correctly_cache_simple_rdd_on_disk_with_replicates() {
  JavaRDD<Integer> filteredRDD = testedNumersRDD.filter(number -> number > 50);

  filteredRDD.persist(StorageLevel.DISK_ONLY_2());

  filteredRDD.count();

  // To check whether a given RDD was cached, we can look for CachedPartitions in the debug text.
  // Information about the cache method (disk/memory only or both, replicated or not, etc.)
  // can be found in '[]', at the end of each line, for example:
  // MapPartitionsRDD[1] at filter at CacheTest.java:26 [Disk Serialized 2x Replicated]
  // The replication factor of DISK_ONLY_2 is 2, so the label mentions "2x Replicated".
  assertThat(filteredRDD.toDebugString())
    .contains("CachedPartitions: 2; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 8.0 B",
              "[Disk Serialized 2x Replicated]");
}

@Test
public void should_correctly_cache_rdd_with_custom_storage_level() {
  JavaRDD<Integer> filteredRDD = testedNumersRDD.filter(number -> number > 50);

  // We create a storage level which doesn't exist among the predefined ones:
  // deserialized and disk only (predefined disk levels are flagged as serialized)
  boolean useDisk = true, useMemory = false, deserialized = true, useOffHeap = false;
  int replicationFactor = 1;
  filteredRDD.persist(StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replicationFactor));

  filteredRDD.collect();

  // To check whether a given RDD was cached, we can look for CachedPartitions in the debug text.
  // Information about the cache method (disk/memory only or both, replicated or not, etc.)
  // can be found in '[]', at the end of each line, for example:
  // MapPartitionsRDD[1] at filter at CacheTest.java:26 [Disk Deserialized 1x Replicated]
  assertThat(filteredRDD.toDebugString())
    .contains("CachedPartitions: 2; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 8.0 B",
              "[Disk Deserialized 1x Replicated]");
}

@Test
public void should_correctly_cache_simple_rdd_on_memory_only() {
  JavaRDD<Integer> filteredRDD = testedNumersRDD.filter(number -> number > 50);

  // cache() corresponds to the use of persist() which
  // calls persist(StorageLevel.MEMORY_ONLY)
  filteredRDD.cache();

  // action to execute transformations
  filteredRDD.count();

  // To check whether a given RDD was cached, we can look for CachedPartitions in the debug text.
  // Information about the cache method (disk/memory only or both, replicated or not, etc.)
  // can be found in '[]', at the end of each line, for example:
  // ParallelCollectionRDD[0] at parallelize at CacheTest.java:19 [Memory Deserialized 1x Replicated]
  assertThat(filteredRDD.toDebugString())
    .contains("CachedPartitions: 2; MemorySize: 32.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B",
              "[Memory Deserialized 1x Replicated]");
}

This article presents the cache feature in Spark. The first part talks about the cache and how it differs from checkpointing. The second part explains where the cache can be stored. The last part shows cache manipulation through Spark's Java API.