Apache Spark and off-heap memory

Versions: Apache Spark 2.4.0

In data-intensive applications such as streaming ones, bad memory management can add long GC pauses. Luckily, we can reduce this impact by writing memory-optimized code and by using storage outside the heap, called off-heap.

In this post we'll focus on the off-heap memory in Apache Spark. The post is organized in 4 parts. The first one shows where off-heap memory is used in Apache Spark. The second one focuses on Project Tungsten and its revolutionary row-based format. The next part explains some internal details about off-heap memory management, while the last one shows a test made on a standalone YARN cluster.

Use cases in Apache Spark

Off-heap storage is not managed by the JVM's Garbage Collector. Hence, it must be handled explicitly by the application. Another difference with on-heap space concerns the storage format. On-heap, the data lives as ordinary JVM objects handled automatically by the runtime, whereas off-heap, the application must take care of this itself and convert the data to an array of bytes. If you want to know a little bit more about that topic, you can read the On-heap vs off-heap storage post.
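To picture this difference, here is a minimal, framework-independent sketch (plain JVM code, not Spark's internals), where the off-heap copy is held in a direct ByteBuffer:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object OnHeapVsOffHeap {
  def main(args: Array[String]): Unit = {
    // On-heap: a plain JVM object, allocated, tracked and moved by the Garbage Collector.
    val onHeapValue: String = "off-heap demo"

    // Off-heap: the application serializes the value to bytes and copies them into a
    // direct buffer whose backing memory lives outside the GC-managed heap.
    val bytes = onHeapValue.getBytes(StandardCharsets.UTF_8)
    val offHeapBuffer = ByteBuffer.allocateDirect(bytes.length)
    offHeapBuffer.put(bytes)

    // Reading the data back requires the reverse translation into a JVM object, which is
    // the extra (de)serialization cost mentioned above.
    offHeapBuffer.flip()
    val readBack = new Array[Byte](offHeapBuffer.remaining())
    offHeapBuffer.get(readBack)
    println(new String(readBack, StandardCharsets.UTF_8))
  }
}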

Off-heap memory is used in Apache Spark for storage and for execution data. The former use case is caching. The persist method accepts a parameter that is an instance of the StorageLevel class. Its constructor takes a _useOffHeap parameter defining whether the data will be stored off-heap or not. To test off-heap caching quickly we can use the already defined StorageLevel.OFF_HEAP:

  "a RDD" should "not be cached in off-heap memory because of misconfiguration" in {
    val conf = new SparkConf().setAppName("[OFFH] Cache").setMaster("local[4]")
    val sparkContext = SparkContext.getOrCreate(conf)
    val rddOfNumbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
    val logAppender = InMemoryLogAppender.createLogAppender(Seq("offheap, 1 replicas"))

    rddOfNumbers.persist(StorageLevel.OFF_HEAP)

    rddOfNumbers.count()

    logAppender.getMessagesText() should have size 4
    logAppender.getMessagesText() should contain allOf("Level for block rdd_0_0 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_1 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_2 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_3 is StorageLevel(disk, memory, offheap, 1 replicas)")
  }

Internally, the engine uses the def useOffHeap: Boolean = _useOffHeap method to detect the type of storage memory. However, the above snippet won't cache the data in off-heap memory. If we look carefully at the logs, we can find entries like:

INFO Will not store rdd_0_1 as the required space (1048576 bytes) exceeds our memory limit (0 bytes) (org.apache.spark.memory.UnifiedMemoryManager:54)
TRACE Checking for hosts with no recent heartbeats in HeartbeatReceiver. (org.apache.spark.HeartbeatReceiver:62)
INFO Will not store rdd_0_3 as the required space (1048576 bytes) exceeds our memory limit (0 bytes) (org.apache.spark.memory.UnifiedMemoryManager:54)
WARN Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_0_1 in memory. (org.apache.spark.storage.memory.MemoryStore:66)
WARN Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_0_3 in memory. (org.apache.spark.storage.memory.MemoryStore:66)
INFO Will not store rdd_0_2 as the required space (1048576 bytes) exceeds our memory limit (0 bytes) (org.apache.spark.memory.UnifiedMemoryManager:54)
WARN Not enough space to cache rdd_0_1 in memory! (computed 0.0 B so far) (org.apache.spark.storage.memory.MemoryStore:66)
WARN Not enough space to cache rdd_0_3 in memory! (computed 0.0 B so far) (org.apache.spark.storage.memory.MemoryStore:66)
WARN Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_0_2 in memory. (org.apache.spark.storage.memory.MemoryStore:66)
WARN Not enough space to cache rdd_0_2 in memory! (computed 0.0 B so far) (org.apache.spark.storage.memory.MemoryStore:66)
INFO Will not store rdd_0_0 as the required space (1048576 bytes) exceeds our memory limit (0 bytes) (org.apache.spark.memory.UnifiedMemoryManager:54)
WARN Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_0_0 in memory. (org.apache.spark.storage.memory.MemoryStore:66)
WARN Not enough space to cache rdd_0_0 in memory! (computed 0.0 B so far) (org.apache.spark.storage.memory.MemoryStore:66)
INFO Memory use = 2.2 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 2.2 KB. Storage limit = 1981.2 MB. (org.apache.spark.storage.memory.MemoryStore:54)
INFO Memory use = 2.2 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 2.2 KB. Storage limit = 1981.2 MB. (org.apache.spark.storage.memory.MemoryStore:54)
INFO Memory use = 2.2 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 2.2 KB. Storage limit = 1981.2 MB. (org.apache.spark.storage.memory.MemoryStore:54)
INFO Memory use = 2.2 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 2.2 KB. Storage limit = 1981.2 MB. (org.apache.spark.storage.memory.MemoryStore:54)
WARN Persisting block rdd_0_2 to disk instead. (org.apache.spark.storage.BlockManager:66)

As you can see, the blocks were stored directly on disk. It's because we didn't define the amount of off-heap memory available to our application. In order to make it work, we need to explicitly enable off-heap storage with spark.memory.offHeap.enabled and also specify the amount of off-heap memory in spark.memory.offHeap.size. After doing that, we can launch the following test:

  "a RDD" should "be cached in off-heap memory when 2 required configuration entries are defined" in {
    val conf = new SparkConf().setAppName("[OFFH] Cache").setMaster("local[4]")
      .set("spark.memory.offHeap.enabled", "true").set("spark.memory.offHeap.size", "3048576")
    val sparkContext = SparkContext.getOrCreate(conf)
    val rddOfNumbers = sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
    val logAppender = InMemoryLogAppender.createLogAppender(Seq("offheap, 1 replicas", "Will not store rdd"))

    rddOfNumbers.persist(StorageLevel.OFF_HEAP)

    rddOfNumbers.count()

    logAppender.getMessagesText() should have size 4
    logAppender.getMessagesText() should contain allOf("Level for block rdd_0_0 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_1 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_2 is StorageLevel(disk, memory, offheap, 1 replicas)",
      "Level for block rdd_0_3 is StorageLevel(disk, memory, offheap, 1 replicas)")
  }

When an RDD is cached in off-heap memory, the transformation from objects into an array of bytes is delegated to the BlockManager and its putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode) method. The translation is made by SerializedValuesHolder, which resolves the allocator from the memory mode this way:

val allocator = memoryMode match {
  case MemoryMode.ON_HEAP => ByteBuffer.allocate _
  case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}

Another use case is execution memory. A task may need some memory from the execution pool in order to store intermediate results. For example, the following snippet tries to use RowBasedKeyValueBatch to prepare data for aggregation:

  "too small available off-heap memory" should "fail the task" in {
    val memory1gb = "1000000000"
    val TestSparkSession = SparkSession.builder().appName("[OFFH] Execution memory in Dataset").master("local[*]")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", memory1gb)
      .getOrCreate()
    val logAppender = InMemoryLogAppender.createLogAppender(Seq("VariableLengthRowBasedKeyValueBatch"))
    import TestSparkSession.implicits._
    val TestedDataSet = Seq(
      ("pl", "Scala", "Team1", 10), ("pl", "Java", "Team1", 1), ("pl", "C++", "Team2", 2),
      ("us", "Scala", "Team2",15), ("us", "Java", "Team2",3),
      ("fr", "Scala", "Team2",5), ("fr", "Java", "Team2",9)
    ).toDF("country", "language", "team", "projects_number")

    TestedDataSet.rollup($"country", $"language", $"team").sum("projects_number").collect()

    val executionReservedMemory = logAppender.getMessagesText()
      .filter(message => message.contains("Task") && message.contains("acquired") &&
        message.contains("for org.apache.spark.sql.catalyst.expressions.VariableLengthRowBasedKeyValueBatch"))
    executionReservedMemory should not be empty
    val executionReleasedMemory = logAppender.getMessagesText()
      .filter(message => message.contains("Task") && message.contains("release") &&
        message.contains("from org.apache.spark.sql.catalyst.expressions.VariableLengthRowBasedKeyValueBatch"))
    executionReleasedMemory should not be empty
  }

However, defining the use of off-heap memory explicitly doesn't mean that Apache Spark will use only it. The framework also reserves on-heap memory; in the previous examples it's used, for instance, by the closures defining the processing logic. In such a case, and at least for local mode (cluster mode will be detailed in the last part), the amount of on-heap memory is computed directly from the runtime memory, as:

  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    // (the validation of system and executor memory against minSystemMemory is omitted here)
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

    // in UnifiedMemoryManager.apply(conf: SparkConf, numCores: Int)
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
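To make the formula more concrete, here is a small back-of-the-envelope computation; the 4 GB heap is an assumed value for illustration, not the one used in the tests above:

object UnifiedMemoryManagerArithmetic {
  def main(args: Array[String]): Unit = {
    val systemMemory = 4L * 1024 * 1024 * 1024             // Runtime.getRuntime.maxMemory, assumed 4 GB
    val reservedMemory = 300L * 1024 * 1024                // RESERVED_SYSTEM_MEMORY_BYTES
    val usableMemory = systemMemory - reservedMemory
    val maxMemory = (usableMemory * 0.6).toLong            // spark.memory.fraction default
    val onHeapStorageRegionSize = (maxMemory * 0.5).toLong // spark.memory.storageFraction default

    println(s"unified on-heap pool (storage + execution): ${maxMemory / 1024 / 1024} MB")
    println(s"initial on-heap storage region: ${onHeapStorageRegionSize / 1024 / 1024} MB")
    // With these assumptions: roughly 2277 MB for the unified pool and 1138 MB for storage.
  }
}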

The reasons to use off-heap memory rather than on-heap are the same as in any JVM-based application: it helps to reduce GC overhead, to share data between 2 different processes, and to keep cached data ready to use even after a task restart. However, it doesn't come for free. On the flip side, off-heap increases CPU usage because of the extra translation from arrays of bytes into the expected JVM objects. Off-heap also has a trap: even though we manage to store JVM objects off-heap, when they're read back to be used in the program they may be allocated on-heap again, and will therefore still need to be garbage collected. Thus, in the Apache Spark context, in my opinion, it makes the most sense to use off-heap for SQL or Structured Streaming because they don't need to deserialize the data back from the array of bytes. Off-heap can still be useful in RDD-based programs, but its use should be studied with a little bit more care. Nonetheless, please notice that Project Tungsten's format was designed to be efficient on on-heap memory too. Hence, to decide between on-heap and off-heap, we should always run a benchmark and pick the more complicated solution only when the difference between them is big. Otherwise, it's always good to keep things simple and make them more complicated only when important performance problems appear.

Off-heap memory and Project Tungsten

First and foremost, for me most of the confusion about off-heap and on-heap memory was introduced by Project Tungsten's revolutionary storage format. A Dataset stores the data not as Java or Kryo-serialized objects but as arrays of bytes. Since this storage is intuitively related to off-heap memory, we could suppose that it natively uses off-heap. But that's not true. The modules based on Project Tungsten, i.e. Apache Spark SQL and Apache Spark Structured Streaming, will use off-heap memory if and only if it's explicitly enabled and supported by the executor's JVM. The array-based storage format helps to reduce GC overhead even on-heap, though, because the compact binary format rarely needs to be deserialized back into objects.
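A small way to observe this format from the public API is to look at the physical rows of a Dataset; the content below is an arbitrary example of mine, but the printed class should be Tungsten's UnsafeRow whatever the memory mode:

import org.apache.spark.sql.SparkSession

object TungstenFormatDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("[OFFH] Tungsten format")
      .master("local[1]").getOrCreate()
    import session.implicits._

    val dataset = Seq(("a", 1), ("b", 2)).toDF("letter", "number")

    // Physically the rows are UnsafeRow instances, i.e. Tungsten's compact binary format,
    // regardless of whether the backing memory is allocated on-heap or off-heap.
    val firstInternalRow = dataset.queryExecution.toRdd.first()
    println(firstInternalRow.getClass.getName)
    // expected: org.apache.spark.sql.catalyst.expressions.UnsafeRow

    session.stop()
  }
}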

The logic activating off-heap is defined in the MemoryManager class:

final val tungstenMemoryMode: MemoryMode = {
  if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
    require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    require(Platform.unaligned(),
      "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}

The resolved mode then determines how the memory is allocated, either by HeapMemoryAllocator (on-heap) or UnsafeMemoryAllocator (off-heap).
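The mapping between the resolved mode and the concrete allocator is kept in MemoryManager's tungstenMemoryAllocator field, which looks roughly like this (quoted from memory, so treat it as a sketch rather than the exact source):

private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
  tungstenMemoryMode match {
    case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
    case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
  }
}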

MemoryManager - off-heap management details

To see how off-heap memory is used, we can go directly to one of the MemoryManager implementations: StaticMemoryManager or UnifiedMemoryManager. The former is a legacy memory manager and it doesn't support off-heap; it materializes that by setting the size of the off-heap memory pools to 0. On the other side, UnifiedMemoryManager is able to handle off-heap storage.

By the way, MemoryManager shows what we can use off-heap for. The class has 4 memory pool fields. They represent the memory pools for storage use (on-heap and off-heap) and for execution use (on-heap and off-heap). The amount of off-heap storage memory is computed as maxOffHeapMemory * spark.memory.storageFraction. The remaining value is reserved for execution memory, i.e. the data produced by tasks, for instance during shuffle operations.
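As an illustration of that split, assuming 3 GB of spark.memory.offHeap.size (the same value used in the spark-submit command shown later) and the default spark.memory.storageFraction of 0.5, the pools would be sized like this; a back-of-the-envelope sketch, not Spark's own code:

// Mirrors UnifiedMemoryManager's off-heap split; the 3 GB value is only an example.
val maxOffHeapMemory = 3L * 1024 * 1024 * 1024                          // spark.memory.offHeap.size
val storageFraction = 0.5                                               // spark.memory.storageFraction default
val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong  // 1.5 GB for cached blocks
val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory    // 1.5 GB for execution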

The allocation of the memory is handled by a UnsafeMemoryAllocator instance and its allocate(long size) method. Under the hood, it manipulates off-heap memory with the help of the sun.misc.Unsafe class. The same allocator handles deallocation with its free(MemoryBlock memory) method. UnsafeMemoryAllocator is invoked by TaskMemoryManager's allocatePage(long size, MemoryConsumer consumer) method.
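To see the two allocators side by side, here is a small sketch playing directly with Spark's internal unsafe memory API; it's shown for illustration only (the written long value is my own example), and relying on this internal API in application code isn't recommended:

import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.memory.MemoryAllocator

object AllocatorsDemo {
  def main(args: Array[String]): Unit = {
    // Off-heap: backed by sun.misc.Unsafe; the base object is null and only the address matters.
    val offHeapBlock = MemoryAllocator.UNSAFE.allocate(1024)
    Platform.putLong(offHeapBlock.getBaseObject, offHeapBlock.getBaseOffset, 42L)
    println(Platform.getLong(offHeapBlock.getBaseObject, offHeapBlock.getBaseOffset))
    MemoryAllocator.UNSAFE.free(offHeapBlock)

    // On-heap: backed by a long[] array that stays under the Garbage Collector's control.
    val onHeapBlock = MemoryAllocator.HEAP.allocate(1024)
    Platform.putLong(onHeapBlock.getBaseObject, onHeapBlock.getBaseOffset, 42L)
    MemoryAllocator.HEAP.free(onHeapBlock)
  }
}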

Off-heap in cluster mode

This post is another one inspired by a discussion on my GitHub. It raised an interesting question about the off-heap behavior in cluster mode. The question was about defining the executor memory property together with off-heap:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --conf "spark.eventLog.dir=hdfs://172.18.0.20:9000/shared/spark-logs" --conf "spark.eventLog.enabled=true"  --conf "spark.memory.offHeap.enabled=true" --conf "spark.memory.offHeap.size=3G" --num-executors 1  --executor-cores 1 ~/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples*.jar 100000 

To get the answer and confirm my initial supposition, I did some research and found a good hint in a presentation by Yoshiyasu Saeki on SlideShare. On slide 14 we can clearly see what happens when both memory properties are defined. According to the slide, in such a case the resource manager allocates the amount of on-heap memory defined in the executor-memory property and is not aware of the off-heap memory defined in the Spark configuration.

But since I don't understand Japanese, I wanted to confirm my deduction by making a small test with my spark-docker-yarn Docker image:

bartosz:~/projects/spark-docker$ docker-compose up   --scale slave=2
Starting spark-docker_master_1 ... done
Starting spark-docker_slave_1  ... done
Starting spark-docker_slave_2  ... done
Attaching to spark-docker_master_1, spark-docker_slave_1, spark-docker_slave_2

The test consisted of executing spark-submit commands and observing the impact on memory during job execution. The following screencast shows the results of that experiment:

As you can see, the amount of memory shown in the YARN UI was the same for both tested scenarios. However, it was different for each Spark application. And it's quite logical, because executor-memory tells the resource manager how much memory it should allocate to each of Spark's executors, but the resource manager is unaware of the purely application-level off-heap setting, which makes our executor actually use: executor memory + off-heap memory + overhead.

Asking the resource manager for less memory than the application really needs (executor-memory < off-heap memory) is dangerous. In such a situation, the resource manager is unaware of the whole memory consumption and it can mistakenly schedule new applications even though there is no physical memory left.
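A common way to keep the resource manager informed, at least in the Spark 2.x line where the off-heap size isn't automatically added to the container request, is to account for it in the executor memory overhead yourself. The values below are only illustrative, and spark.executor.memoryOverhead is the Spark 2.3+ property name (older releases used spark.yarn.executor.memoryOverhead):

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster \
  --driver-memory 1g --executor-memory 1g \
  --conf "spark.memory.offHeap.enabled=true" --conf "spark.memory.offHeap.size=3G" \
  --conf "spark.executor.memoryOverhead=4g" \
  --num-executors 1 --executor-cores 1 \
  ~/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples*.jar 100000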

Off-heap memory is a great way to reduce GC pauses because it's outside the GC's scope. However, it brings an overhead of serialization and deserialization. The latter, in its turn, means that off-heap data can sometimes be put back onto the heap and hence become exposed to GC again. Also, the new data format brought by Project Tungsten (arrays of bytes) already helps to reduce the GC overhead. For these 2 reasons, the use of off-heap memory in Apache Spark applications should be carefully planned and, especially, tested. As we saw in the last part's tests, defining off-heap memory makes the submit process more difficult: resource managers aren't aware of this application-specific configuration and, in the case of misconfiguration, it can lead to OOM problems that are difficult to debug. To sum up, as with every optimization, the use of off-heap must be tested and compared against the same pipeline executed on-heap. If there is no big difference, it's better to keep things simple (KISS principle) and stay with on-heap memory.