Memory management in Spark

Versions: Spark 2.0.0

Memory management in Spark went through some changes. In the first versions, the allocation had a fixed size. Only the 1.6 release changed it to a more dynamic behavior. This change is the main topic of this post.

This post describes memory use in Spark. The first part explains how it's divided among the different parts of an application. The second one describes the formulas used to compute memory for each part. The last part briefly shows how Spark estimates the size of objects.

Memory use in Spark

The memory used by Spark can be specified either in the spark.driver.memory property or as a --driver-memory parameter for scripts. Similarly, we can define the executor memory (with the spark.executor.memory property or the --executor-memory argument). At this point, we should note 2 things. The first is that setting spark.driver.memory during the construction of the SparkConf object is useless - the JVM is already running when this setting is applied. A solution is to use the --driver-memory flag instead. The second thing is that in local mode Spark's executor is located within the driver, so setting the executor memory won't have any effect.
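
As a quick, minimal sketch of this (the application name, master URL and memory values are arbitrary assumptions, not taken from a real deployment), the executor memory can be set programmatically while the driver memory has to be fixed before the JVM starts, for instance through the --driver-memory flag:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MemoryConfigurationExample {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("memory-configuration-example")
      .setMaster("spark://localhost:7077")
      // effective because executors are launched after the driver's JVM;
      // in local mode this setting has no effect
      .set("spark.executor.memory", "2g");
    // setting spark.driver.memory here would be useless - the driver's JVM
    // is already running; pass --driver-memory to the launch script instead

    JavaSparkContext sparkContext = new JavaSparkContext(conf);
    // ... job definition ...
    sparkContext.stop();
  }
}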

The defined memory is not fully reserved to the Spark application. In fact, a part of it is reserved for user data structures, Spark internal metadata and a safeguard against unpredictable Out-Of-Memory errors. This segment is often called user memory. The rest of the space is shared between 2 activities, execution and storage, and can be identified as Spark memory. Execution represents all transformations and actions made by Spark. Storage concerns caching and moving data internally across the cluster. The remaining heap (300MB in Spark 2.0) is reserved by the system and doesn't participate in the Spark application.

The interesting thing is that since Spark 1.6 execution and storage share the same memory space. Previously a fixed amount of memory was specified for each of them. This sometimes led to wasted space, especially when RDDs weren't cached (= small storage charge) and execution needed additional memory. Spark 1.6 changed this approach and allowed both activities to live in the same space. Thanks to that, execution can evict cached records and use the freed memory for its own purposes. The inverse is not possible.

Memory size

However, there exists a threshold representing the amount of memory reserved to storage.

To discover how this threshold is specified, we should begin with the general formula used to compute the available memory. It looks like below:

JVM_total - total size of the heap assigned to Spark
Reserved_memory - memory reserved to the Spark system (hardcoded 300MB)
Fraction - fraction of memory which will be shared, defined in 
           the spark.memory.fraction property

# shared memory between execution and storage
Shared_memory = (JVM_total - Reserved_memory) * Fraction 

Knowing Shared_memory, we can compute the space reserved for the storage activity as follows:

Storage_fraction - fraction of the shared memory reserved to 
           storage, defined in the spark.memory.storageFraction
           property

Storage_memory = Shared_memory * Storage_fraction
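
To make these formulas concrete, here is a small, self-contained sketch in Java. The 4GB heap is an arbitrary assumption; 0.6 and 0.5 are the default values of spark.memory.fraction and spark.memory.storageFraction in Spark 2.0, and the user memory line simply takes the part of the usable heap not covered by Shared_memory:

public class MemoryFormulasExample {

  public static void main(String[] args) {
    double jvmTotalMb = 4096d;        // assumed heap assigned to Spark (arbitrary)
    double reservedMemoryMb = 300d;   // hardcoded system reservation
    double fraction = 0.6d;           // default spark.memory.fraction in Spark 2.0
    double storageFraction = 0.5d;    // default spark.memory.storageFraction

    // shared memory between execution and storage
    double sharedMemoryMb = (jvmTotalMb - reservedMemoryMb) * fraction;
    // part of the shared memory protected for storage
    double storageMemoryMb = sharedMemoryMb * storageFraction;
    // the rest of the usable heap is left for user data structures and metadata
    double userMemoryMb = (jvmTotalMb - reservedMemoryMb) * (1 - fraction);

    System.out.println("Shared_memory  = " + sharedMemoryMb + " MB");   // 2277.6
    System.out.println("Storage_memory = " + storageMemoryMb + " MB");  // 1138.8
    System.out.println("User_memory    = " + userMemoryMb + " MB");     // 1518.4
  }
}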

If Storage_memory is too big, it can force execution to spill data to disk. Evidently, this can degrade performance. You can read more about spilling data in the post about shuffling in Spark.

Computing memory size in Spark

One of the methods used to determine the memory used by Spark objects is the SizeEstimator object. It's used internally by Spark's SizeTracker to determine the size of objects. Its use can be detected when some memory-related errors occur. When that happens, logs similar to the one below appear:

java.lang.OutOfMemoryError: Java heap space 
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469) 
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445) 
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159) 
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229) 
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194) 
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186) 
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54) 
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)  
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) 

Below you can find some simple code showing how to use the estimator:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.spark.util.SizeEstimator;
import org.junit.Test;

@Test
public void should_correctly_estimate_long_size() {
  // a boxed Long is estimated at 24 bytes (object header + 8 bytes of data)
  Long age = new Long(30);

  assertThat(SizeEstimator.estimate(age)).isEqualTo(24L);
}

@Test
public void should_estimate_string_object_size() {
  String letter = "a";

  assertThat(SizeEstimator.estimate(letter)).isEqualTo(48L);
}

@Test
public void should_correctly_estimate_bean_size() {
  Person person = new Person("a", 30L);

  assertThat(SizeEstimator.estimate(person)).isEqualTo(72L);
}

private static class Person {
  private long age;
  private String letter;

  private Person(String letter, long age) {
    this.letter = letter;
    this.age = age;
  } 
}

The first part of this post shows how memory is divided in Spark applications. We can see that a part of the heap (heap size minus 300MB) is split between Spark memory and user memory, while the remaining 300MB is reserved to the system. The Spark part is itself shared, this time between execution and storage operations. We can reserve a fixed amount of memory for storage; the remaining memory is used jointly by execution and storage. The second part shows the formulas used to determine this memory division. The last part shows, through some simple tests, which object is used by Spark to estimate object sizes.