Memory and Apache Spark classes

Versions: Apache Spark 2.4.0

In previous posts about memory in Apache Spark, I explored the memory behavior of Apache Spark when the input files are much bigger than the allocated memory. After that, it's a good moment to sum up the topic in a post dedicated to the classes involved in memory-related tasks.

This post is composed of 2 sections. The first one presents the classes involved in memory consumption in Apache Spark programs. The second one shows a short video illustrating the impact of the described classes on memory.

Memory-related classes

The first important memory-related classes are the implementations of org.apache.spark.memory.MemoryManager: StaticMemoryManager and UnifiedMemoryManager. They represent, respectively, the static and dynamic memory management strategies. The former statically divides the on-heap space into storage and execution regions. Each of them is independent and cannot borrow space from the other. UnifiedMemoryManager, on the other hand, is the opposite - the boundary between storage and execution is soft. It means that, when needed, one can borrow some space from the other. Both managers store the information about memory use in memory pools: ExecutionMemoryPool and StorageMemoryPool. The pools persist the information about memory use in simple data structures: a HashMap (per task) for the execution pool and a Long field for the storage pool.
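Since both managers are internal, the choice between them is made through configuration rather than code. Below is a minimal configuration sketch, assuming Spark 2.4 defaults; the property values are only examples:

import org.apache.spark.sql.SparkSession

// UnifiedMemoryManager is the default; StaticMemoryManager is only used
// when spark.memory.useLegacyMode is set to true
val unifiedSession = SparkSession.builder()
  .appName("unified-memory-manager").master("local[*]")
  .config("spark.memory.fraction", "0.6")        // (execution + storage) share of the usable heap
  .config("spark.memory.storageFraction", "0.5") // storage part protected against eviction by execution
  .getOrCreate()

// To observe the static strategy instead, a separate application could set:
//   spark.memory.useLegacyMode   = true
//   spark.storage.memoryFraction = 0.6   // fixed storage region
//   spark.shuffle.memoryFraction = 0.2   // fixed execution region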

Every time a task needs to acquire some execution memory, it does so through the intermediary of org.apache.spark.memory.TaskMemoryManager and its acquireExecutionMemory method. This function is invoked by one of the implementations of the org.apache.spark.memory.MemoryConsumer abstract class, such as LongToUnsafeRowMap, ShuffleExternalSorter, ExternalSorter or ExternalAppendOnlyMap. They're needed principally when the data must be aggregated or shuffled before the next step in the processing.
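To make the contract clearer, here is a purely hypothetical consumer (not a class you will find in Spark) sketching how a MemoryConsumer implementation interacts with TaskMemoryManager:

import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

// Illustration only: acquireMemory delegates to
// TaskMemoryManager.acquireExecutionMemory(size, this) and spill is the
// callback invoked when other consumers put pressure on execution memory
class IllustrativeConsumer(taskMemoryManager: TaskMemoryManager)
  extends MemoryConsumer(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP) {

  private var reservedBytes = 0L

  def reserve(bytes: Long): Long = {
    val granted = acquireMemory(bytes) // may be less than asked
    reservedBytes += granted
    granted
  }

  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    // a real consumer (e.g. ExternalSorter) would flush its in-memory structures to disk here
    val released = math.min(size, reservedBytes)
    freeMemory(released)
    reservedBytes -= released
    released
  }
}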

Physically, the processed data is stored as blocks in org.apache.spark.storage.memory.MemoryStore. The blocks are represented there either as arrays of deserialized Java objects or as serialized ByteBuffers, both kept in a local LinkedHashMap. The entries are managed by BlockManager (you can find more about blocks in the Apache Spark blocks explained post). MemoryStore also handles the concept of unroll memory, which is a space reserved to deserialize block data into memory. We can observe one of its uses in MemoryStore with iterator-based puts, whose values are "unrolled" one by one - as long as there is enough unroll space. If all values are correctly read, the block is fully stored in memory and the memory it took during unrolling is transferred to the storage space. Otherwise, only a part of it is persisted.
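A simple way to see the unroll mechanism at work is to cache more data than the storage memory can hold. The sketch below reuses the testSparkSession and BasePath defined in the test code later in this post, and assumes a deliberately small heap (e.g. local mode with --driver-memory 512m):

import org.apache.spark.storage.StorageLevel

// With MEMORY_ONLY, each partition is unrolled value by value in MemoryStore;
// a partition that cannot be fully unrolled is not cached and will be recomputed
val inflatedNumbers = testSparkSession.sparkContext.textFile(BasePath)
  .flatMap(line => Seq.fill(100)(line)) // inflate the data to exceed the storage memory
  .persist(StorageLevel.MEMORY_ONLY)

inflatedNumbers.count() // look in the logs for warnings similar to "Not enough space to cache rdd_..."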

In all described classes we can find a reference to 2 kinds of memory: on-heap and off-heap. The available memory in MemoryStore is expressed as the sum of both. MemoryManager also has off-heap and on-heap pools for storage and execution requirements. How does the engine decide which memory type to use? It's not the MemoryManager that takes that decision but its caller, MemoryStore. And MemoryStore gets the instructions from BlockManager, which in its turn gets the information from the block's org.apache.spark.storage.StorageLevel.
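The storage level is defined at persist() time, so that's ultimately where the on-heap vs off-heap decision comes from. A quick sketch with 2 hypothetical RDDs (firstRdd, secondRdd); note that off-heap storage also requires spark.memory.offHeap.enabled=true and a positive spark.memory.offHeap.size:

import org.apache.spark.storage.StorageLevel

val onHeapData  = firstRdd.persist(StorageLevel.MEMORY_ONLY) // on-heap, deserialized entries
val offHeapData = secondRdd.persist(StorageLevel.OFF_HEAP)   // off-heap, serialized entries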

Among the Apache Spark SQL classes participating in memory use we can find org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder. It manages the data buffer for a DataFrame's row. The buffer is filled up by different unsafe row writers. In the SQL category there is also CacheManager, responsible for caching query results. It's directly invoked by Dataset's cache or persist method calls.
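A short sketch with a hypothetical someDataset shows the entry point; both cache() and persist() delegate to the session's CacheManager (reachable as sparkSession.sharedState.cacheManager), and the data is only materialized at the first action:

import org.apache.spark.storage.StorageLevel

val cachedNumbers = someDataset.persist(StorageLevel.MEMORY_ONLY) // cache() would use MEMORY_AND_DISK
cachedNumbers.count()    // first action materializes the cached plan
someDataset.unpersist()  // removes the corresponding entry from CacheManager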

Memory use visualization

The list of memory-impacting classes is now known. But you don't have to take my word for it. The following 2 test cases, executed with Java VisualVM connected and the classes' activity decorated, will show that the classes from the previous section really impact the memory. Let's begin with the RDD-based test:

import java.io.File

import org.apache.commons.io.FileUtils

package object memoryclasses {

  private[memoryclasses] val BasePath = "/tmp/memory_classes"

  private def fileName(fileIndex: Int): String = s"${BasePath}/file_${fileIndex}.json"

  // generates <files> JSON Lines files of <linesPerFile> entries each, only if they don't exist yet
  private[memoryclasses] def createTestFilesIfMissing(linesPerFile: Int, files: Int): Unit = {
    for (fileIndex <- 1 to files) {
      val fileToWrite = new File(fileName(fileIndex))
      if (!fileToWrite.exists) {
        val testDataset = (1 to linesPerFile).map(nr => s"""{"id": ${nr}, "labels": {"long":"Number ${nr}/${fileIndex}", "short": "#${nr}/${fileIndex}"}, "creation_time_ms": ${System.currentTimeMillis()}}""")
          .mkString("\n")
        FileUtils.writeStringToFile(fileToWrite, testDataset)
      }
    }
  }

}

case class NumberContent(id: Int, label: NumberContentLabels, creationTimeMs: Long)
case class NumberContentLabels(long: String, short: String)

import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSON

object RddPipeline {

  SparkMemoryClassesDecorators.decoratePuts().toClass
  SparkMemoryClassesDecorators.decorateTaskMemoryManager().toClass
  SparkMemoryClassesDecorators.decorateBufferHolder().toClass
  SparkMemoryClassesDecorators.decorateMemoryConsumer().toClass
  SparkMemoryClassesDecorators.decorateMemoryStore().toClass
  SparkMemoryClassesDecorators.decorateUnsafeRow.toClass
  SparkMemoryClassesDecorators.decorateHadoopRDD.toClass
  SparkMemoryClassesDecorators.decorateExternalAppendOnlyMap.toClass
  SparkMemoryClassesDecorators.decorateAcquireStorageMemory(
    "org.apache.spark.memory.StaticMemoryManager", "acquireStorageMemory").toClass
  SparkMemoryClassesDecorators.decorateAcquireStorageMemory(
    "org.apache.spark.memory.UnifiedMemoryManager", "acquireStorageMemory").toClass

  createTestFilesIfMissing(100000, 3)

  def main(args: Array[String]): Unit = {
    println("Waiting 10 seconds before getting connection")
    Thread.sleep(10000L)

    val testSparkSession = SparkSession.builder().appName("RDD pipeline - memory classes").master("local[*]").getOrCreate()

    val jsonLineRdd = testSparkSession.sparkContext.textFile(BasePath)
    val mappedJsonToNumberContents = jsonLineRdd.map(json => {
      val jsonContent = JSON.parseFull(json)
      val jsonMap = jsonContent.get.asInstanceOf[Map[String, Any]]
      val labels = jsonMap("labels").asInstanceOf[Map[String, String]]
      NumberContent(jsonMap("id").asInstanceOf[Double].toInt,
        NumberContentLabels(labels("long"), labels("short")), jsonMap("creation_time_ms").asInstanceOf[Double].toLong)
    })

    val groupedByIdDigitsCount = mappedJsonToNumberContents.groupBy(numberContent => numberContent.id.toString.length)
    groupedByIdDigitsCount.sortBy {
      case (digitsCount, _) => digitsCount
    }.foreachPartition(partitionDataIterator => {
      partitionDataIterator.foreach {
        case (digitsCount, _) =>
          println(s"[APP] Got digitsCount=${digitsCount}")
      }
    })

  }


}

In this video we can observe that the HadoopRDD#compute method (the "Computing partition..." message) is very memory-intensive. Only when it finishes is the memory freed (2:39') and the message "Freeing task resources" appears. However, not for long, because TaskMemoryManager's consumer (ExternalAppendOnlyMap) acquires memory to perform the grouping operation. But it's only a short peak in memory use. Let's check whether we can observe the same things in the case of Datasets:

import org.apache.spark.sql.SparkSession

object DatasetPipeline {

  SparkMemoryClassesDecorators.decoratePuts().toClass
  SparkMemoryClassesDecorators.decorateTaskMemoryManager().toClass
  SparkMemoryClassesDecorators.decorateBufferHolder().toClass
  SparkMemoryClassesDecorators.decorateMemoryConsumer().toClass
  SparkMemoryClassesDecorators.decorateMemoryStore().toClass
  SparkMemoryClassesDecorators.decorateUnsafeRow.toClass
  SparkMemoryClassesDecorators.decorateHadoopRDD.toClass
  SparkMemoryClassesDecorators.decorateExternalAppendOnlyMap.toClass
  SparkMemoryClassesDecorators.decorateAcquireStorageMemory(
    "org.apache.spark.memory.StaticMemoryManager", "acquireStorageMemory").toClass
  SparkMemoryClassesDecorators.decorateAcquireStorageMemory(
    "org.apache.spark.memory.UnifiedMemoryManager", "acquireStorageMemory").toClass

  createTestFilesIfMissing(100000, 3)


  def main(args: Array[String]): Unit = {
    println("Waiting 10 seconds before getting connection")
    Thread.sleep(10000L)

    val testSparkSession = SparkSession.builder().appName("Dataset pipeline - memory classes").master("local[*]").getOrCreate()
    import testSparkSession.implicits._

    val newLineJsonData = testSparkSession.read.json(BasePath)
    val groupedByIdDigitsCount = newLineJsonData.groupByKey(row => row.getAs[Long]("id").toString.length)
    groupedByIdDigitsCount.count().foreachPartition(countersById => {
      countersById.foreach {
        case (id, count) => println(s"[APP] Got ${id} and ${count}")
      }
    })
  }


}

In the video we can see a pattern very similar to the previous one. Once again, a big memory pressure comes from the data preparation task. It's generated by UnsafeRow and BufferHolder, the 2 classes used during data materialization. The pressure stays pretty high until 0:52. At this moment the memory use decreases, followed by a message announcing a new peak caused by the allocation attempt of the FixedLengthRowBasedKeyValueBatch memory consumer. From this point the memory use increases and once again we can find plenty of messages showing the participation of UnsafeRow and BufferHolder in this process.

This post ends a short series about Apache Spark and its memory. After showing what happens under the hood when the data source is much bigger than the available memory, we finished here with an explanation of the classes involved in memory management. Decorating them can be another debugging option in case of memory problems in Apache Spark workloads.