Apache Spark and data bigger than the memory


Unlike Hadoop Map/Reduce, Apache Spark uses the power of memory to speed up data processing. But does it mean that we can't process datasets bigger than the memory limits? The small survey below will try to answer that question.

This post gives some pointers about Apache Spark's behavior when the files to process and the data to cache are bigger than the available memory. It focuses on the low-level RDD abstraction; one of the next posts will do the same exercises for the Dataset and streaming data sources. The first part of this post gives a general insight into memory use in Apache Spark. The second part focuses on one of its use cases - file data processing. The last section talks about the cache feature and its impact on memory. The sections contain some examples showing Apache Spark's behavior under some specific "size" conditions.

Apache Spark and memory

Capacity planning is one of the hardest tasks in data processing preparation. Very often we think that only the dataset size matters in this operation but it's not true. There is always an extra memory overhead coming from the framework code and the data processing logic. Apache Spark is no exception since it also needs some space to run its code and to handle other memory-impacting components such as:

  • cache - if a given piece of data is reused in different places, it's often worth caching it to avoid time-consuming recomputation. The storage used for the cache is defined by the storage level (org.apache.spark.storage.StorageLevel). Among the native choices we can find: memory, disk or memory + disk. Each of these levels can be either serialized or deserialized, and replicated or not (see the sketch after this list). In this post we'll focus on the memory and memory + disk storage.
  • broadcast variables - these objects are sent to the executors only once. The executors keep them in memory during the whole data processing operation (also illustrated in the sketch after this list).
  • tasks - the data processing logic also needs memory to work. We can see it in log entries similar to:
          DEBUG org.apache.spark.memory.TaskMemoryManager: Task 6 acquired 5.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@57fe7437
          # or
          INFO org.apache.spark.memory.TaskMemoryManager: Memory used in task 6317340
          INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@2c98b15b: 32.0 KB
          INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@566144b7: 64.0 KB
          INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@ea479ad: 13.3 GB
          

    TaskMemoryManager allocates the memory needed for the data used in the executed task. It does it for hash-based aggregations (see HashAggregateExec, TungstenAggregationIterator), sort-based shuffle (ShuffleExternalSorter) or other sorters (UnsafeExternalSorter). It's also widely used in the data structures related to the tasks. We can find them by looking for the implementations of the MemoryConsumer abstract class.
  • processed data - obviously, the processed data is put into memory according to the defined partitioning. Hence if a partition is too big to be kept in memory, it can lead to out-of-memory problems.
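
To make the cache and broadcast points more concrete, here is a minimal sketch (not code from the article's repository) showing a storage level and a broadcast variable in action; the input path and the lookup map are made up for the illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheAndBroadcastSketch {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("Cache and broadcast sketch").master("local[1]")
      .getOrCreate()

    // Hypothetical input file - any local text file will do
    val textRdd = session.sparkContext.textFile("/tmp/spark-memory-illustration/some_file.txt")

    // cache: the storage level decides where the cached blocks land
    textRdd.persist(StorageLevel.MEMORY_ONLY)
    // alternative levels: StorageLevel.MEMORY_AND_DISK, StorageLevel.DISK_ONLY, ...

    // broadcast variable: shipped to the executors once and kept in their memory
    val enrichmentLabels = session.sparkContext.broadcast(Map("a" -> "label A", "b" -> "label B"))
    val processedLines = textRdd.map(line => enrichmentLabels.value.getOrElse(line, line)).count()
    println(s"Processed ${processedLines} lines")

    session.stop()
  }

}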

It's also important to keep in mind that the user-defined processing logic impacts the memory too. A pretty clear example of that is code doing some extra caching of enrichment data, as in the sketch below.
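
As an illustration, the hypothetical sketch below (again, not code from the article's repository) keeps a per-partition enrichment cache inside the user function; that map lives on the executor heap on top of everything Apache Spark allocates itself:

import org.apache.spark.rdd.RDD

// Hypothetical example of user code adding its own memory overhead: every partition
// builds a local enrichment cache that stays on the heap while the partition is
// processed and that Spark's memory manager knows nothing about
def enrichWithLocalCache(lines: RDD[String]): RDD[String] = {
  lines.mapPartitions { linesOfPartition =>
    val enrichmentCache = scala.collection.mutable.Map[String, String]()
    linesOfPartition.map { line =>
      // getOrElseUpdate simulates a lookup against some enrichment source
      enrichmentCache.getOrElseUpdate(line, line.toUpperCase)
    }
  }
}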

Let's now execute a small example to see what happens in the logs and thus observe which Apache Spark components use memory:

import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// To generate tested files, go to /src/test/resources/memory_impact
val OneGbInBytes = 1073741824L

def sequentialProcessingSession = SparkSession.builder()
  .appName("Spark memory types").master("local[1]")
  .getOrCreate()

object GeneralMemoryIllustration {

  def main(args: Array[String]): Unit = {
    val testDirectory = "/tmp/spark-memory-illustration"
    val fileName = "generated_file_500_mb.txt"
    Files.createDirectory(new File(testDirectory).toPath)
    Files.copy(getClass.getResourceAsStream(s"/memory_impact/${fileName}"),
      new File(s"${testDirectory}/${fileName}").toPath)
    val textRdd = sequentialProcessingSession.sparkContext.textFile(s"${testDirectory}/${fileName}")
    textRdd.persist(StorageLevel.MEMORY_ONLY)
    // processTextRdd (a simple identity map + empty foreach) is defined in the next section
    processTextRdd(textRdd)
  }

}

After the code execution we can find some memory use indicators in the logs:

INFO Added rdd_1_2 in memory on 192.168.0.11:39758 (size: 190.7 MB, free: 474.7 MB) (org.apache.spark.storage.BlockManagerInfo:54)
INFO Block rdd_1_3 stored as values in memory (estimated size 16.0 B, free 474.5 MB) (org.apache.spark.storage.memory.MemoryStore:54)
INFO Block broadcast_2 stored as values in memory (estimated size 6.5 KB, free 283.8 MB) (org.apache.spark.storage.memory.MemoryStore:54)
DEBUG Task 15 release 0.0 B from org.apache.spark.util.collection.ExternalAppendOnlyMap@db18c1e (org.apache.spark.memory.TaskMemoryManager:231)

If you analyze the logs carefully you can notice the presence of the "as values" block storage. In fact the block manager - Apache Spark's component responsible for block storage - is able to persist the blocks either as values or as bytes. The former format keeps the deserialized objects as an array of Java objects. The latter one keeps the serialized data in chunks of bytes.
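
The format follows from the chosen storage level. Below is a short sketch, assuming the same 500MB test file as in the previous example: the deserialized MEMORY_ONLY level is what produces the "stored as values" entries, while its serialized counterpart MEMORY_ONLY_SER keeps the blocks as bytes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val session = SparkSession.builder()
  .appName("Values vs bytes storage").master("local[1]")
  .getOrCreate()
val textRdd = session.sparkContext.textFile("/tmp/spark-memory-illustration/generated_file_500_mb.txt")

// deserialized level: the block manager keeps each cached partition as an array of
// Java objects - this is what appears as "stored as values in memory" in the logs
textRdd.persist(StorageLevel.MEMORY_ONLY)

// serialized alternative: the cached partitions would be kept as chunks of serialized
// bytes instead - more compact but with a (de)serialization cost on every access
// textRdd.persist(StorageLevel.MEMORY_ONLY_SER)

textRdd.count()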

Apache Spark, memory and data processing

Let's try to figure out what happens with the application when the source file is much bigger than the available memory. The memory in the tests below is limited to 900MB with the -Xms900m -Xmx900m options. Naively we could think that processing a file bigger than the available memory will fail with an OOM error. And this supposition is true:

def parallelProcessingSession = SparkSession.builder()
  .appName("Spark memory impact").master("local[3]")
  .getOrCreate()

// The processing logic is deliberately a no-op (identity map + empty foreach) to observe
// Apache Spark's behavior without any impact from the user code
def processTextRdd(textRdd: RDD[String]) = {
  textRdd.map(txt => txt)
    .foreach(txt => {})
}

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val textRdd = parallelProcessingSession.sparkContext.textFile(Test1GbFile)
  processTextRdd(textRdd)
}
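
The prepareTestEnvironment helper and the Test1GbFile constant come from the test project and aren't shown in the article. A plausible sketch, assuming they mirror the directory setup of the first example and the file path visible in the later log messages, could look like this:

import java.io.{File, PrintWriter}

// Hypothetical reconstruction of the helpers used in the tests; the real implementations
// live in the article's companion repository
val Test1GbFile = "/tmp/spark-memory-impact/generated_file_1_gb.txt"

def prepareTestEnvironment(): Unit = {
  val testFile = new File(Test1GbFile)
  if (!testFile.exists()) {
    testFile.getParentFile.mkdirs()
    val writer = new PrintWriter(testFile)
    try {
      // ~1GB of 100-byte text lines; the exact content doesn't matter for the tests
      val line = "a" * 99 + "\n"
      (1 to 10 * 1024 * 1024).foreach(_ => writer.write(line))
    } finally {
      writer.close()
    }
  }
}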

Unsurprisingly, the code fails with this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
  // ...
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOfRange(Arrays.java:3664)
  // ...

But is it a rule? Will the processing of a 1GB file with 900MB of available memory fail every time? After some tests with different Apache Spark configurations I found a small "exception" (see later why it's in quotation marks):

def sequentialProcessingSession = SparkSession.builder()
  .appName("Spark memory impact").master("local[1]")
  .getOrCreate()

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = sequentialProcessingSession
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (OneGbInBytes/100).toString) // 10MB
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (OneGbInBytes/100).toString) // 10MB
  val textRdd = sparkSession.sparkContext.textFile(Test1GbFile)
  processTextRdd(textRdd)
}

A simple map & foreach operation works with that configuration. Two things changed. The first one is the sequential Apache Spark context. In my case it means 1 thread available for processing, so after processing a chunk of data the occupied memory can be freed. Another important difference is the use of the mapreduce.input.fileinputformat.split.{minsize, maxsize} properties. They define how much data from the processed file Apache Spark reads per split - in the tested case only about 10MB. Obviously it leads to a bigger number of tasks and, together with the sequential SparkSession, slows down the whole processing. That's why I put "exception" in quotation marks - technically it's still possible to process files bigger than the available memory, but by doing so Apache Spark loses all its interest since the file is processed sequentially. Moreover, in the case of more complicated operations even sequential processing won't work:

def processTextRddWithGroupBy(textRdd: RDD[String]) = {
  textRdd.map(txt => txt)
    .groupBy(key => key)
    .foreach(txt => {})
}

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = sequentialProcessingSession
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (OneGbInBytes/100).toString) // 10MB
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (OneGbInBytes/100).toString) // 10MB
  val textRdd = sparkSession.sparkContext.textFile(Test1GbFile)
  processTextRddWithGroupBy(textRdd)
}

The code leads to an OOM error happening in the groupBy transformation:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 50, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
  // ...
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
  at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
  at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:90)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Finally, just to have a clear conscience, let's see if parallel processing works with the simple map & foreach operation and the split sizes defined, in both local and standalone mode:

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = parallelProcessingSession
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (OneGbInBytes/100).toString) // 10MB
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (OneGbInBytes/100).toString) // 10MB
  val textRdd = sparkSession.sparkContext.textFile(Test1GbFile)
  processTextRdd(textRdd)
}

It doesn't work in local mode:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
	at java.nio.CharBuffer.allocate(CharBuffer.java:335)

Does it change something in standalone mode? Let's first start it:

./start-master.sh --host localhost
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_MEMORY=1G
export SPARK_WORKER_CORES=2
./start-slave.sh  spark://localhost:7077

The processing code is:

def standaloneSession = SparkSession.builder()
  .appName("Spark memory impact").master("spark://localhost:7077")
  .config("spark.executor.memory", "900m")
  .config("spark.executor.extraClassPath", sys.props("java.class.path"))
  .getOrCreate()

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = standaloneSession
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (OneGbInBytes/100).toString) // 10MB
  sparkSession.sparkContext
    .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (OneGbInBytes/100).toString) // 10MB
  val textRdd = sparkSession.sparkContext.textFile(Test3GbFile)
  processTextRdd(textRdd)
}

Here, even though the split size is limited to 10MB, the processing still fails:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 29 in stage 0.0 failed 4 times, most recent failure: Lost task 29.3 in stage 0.0 (TID 43, 192.168.0.11, executor 5): java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
	at java.nio.CharBuffer.allocate(CharBuffer.java:335)
	at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
	at org.apache.hadoop.io.Text.decode(Text.java:412)
	at org.apache.hadoop.io.Text.decode(Text.java:389)
	at org.apache.hadoop.io.Text.toString(Text.java:280)
	at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$8.apply(SparkContext.scala:824)

Apache Spark, memory and cache

After analyzing what happens with the data, let's do a similar analysis for RDD caching. Just to recall, caching is useful when a given dataset is used more than once in the same processing logic. Thanks to it the data won't be recomputed every time. And to keep the execution performant, there is nothing better than putting the cache in memory, like below:

"memory-only cache" should "not fail when there is too few memory in sequential processing" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("Will not store", "Not enough space to cache"))

  val textRdd = parallelProcessingSession.sparkContext.textFile(Test1GbFile)

  textRdd.persist(StorageLevel.MEMORY_ONLY)
  processTextRdd(textRdd)

  // In the logs we can see what happens when we ask Spark to cache the data and there is not enough memory
  // The messages we can find are:
  // Will not store rdd_1_8
  // Not enough space to cache rdd_1_8 in memory! (computed 190.7 MB so far)
  // Will not store rdd_1_11
  // Not enough space to cache rdd_1_11 in memory! (computed 190.7 MB so far)
  logAppender.getMessagesText().find(message => message.startsWith("Will not store rdd_1_")) shouldBe defined
  logAppender.getMessagesText().find(message => message.startsWith("Not enough space to cache rdd_") &&
    message.contains(" in memory! (computed ")) shouldBe defined
}

As you can see in the log messages, when there is not enough memory to put the RDD into the cache, the framework simply ignores the operation ("Will not store rdd_1"). The next test shows that an RDD used 2 times is recomputed for every use:

"failed memory cache" should "recompute RDD if used again" in {
  // Same code as the first test but with a double call to processTextRdd to see what happens in the logs
  // when we're unable to cache and despite that we use the same RDD twice
  // Locally it was the rdd_1_26 block that wasn't cached. It may be different but for the test's simplicity
  // I keep the hardcoded id
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("rdd_1_26", "872415232+33554432"))
  val sequentialSession = sequentialProcessingSession

  val textRdd = sequentialSession.sparkContext.textFile(Test1GbFile)

  textRdd.persist(StorageLevel.MEMORY_ONLY)

  processTextRdd(textRdd)
  processTextRdd(textRdd)

  // As you can observe through the log events, some RDD blocks aren't cached because of insufficient free space
  // in memory. In such a case Spark simply ignores it, as we saw earlier in the tests, and instead recomputes
  // the data
  // You can notice the recomputation by the presence of the log starting with "Input split". For cached blocks,
  // such as the first one, we'll find only 1 entry for
  // "Input split: file:/tmp/spark-memory-impact/generated_file_1_gb.txt:872415232+33554432"
  val groupedLogMessages = logAppender.getMessagesText().groupBy(message => message)
  groupedLogMessages("Task 59 trying to put rdd_1_26") should have size 1
  groupedLogMessages("Task 26 trying to put rdd_1_26") should have size 1
  groupedLogMessages("Block rdd_1_26 could not be removed as it was not found on disk or in memory") should have size 2
  groupedLogMessages("Input split: file:/tmp/spark-memory-impact/generated_file_1_gb.txt:872415232+33554432") should have size 2
}

Since the recomputation is not ideal, we may want to mitigate this by using the memory + disk level:

"memory-disk cache" should "store some data in memory and the rest on disk" in {
  val logAppender = InMemoryLogAppender.createLogAppender(Seq("stored as values in memory", "Not enough space to cache",
    "to disk instead", "on disk in", "on disk on"))

  val textRdd = sequentialProcessingSession.sparkContext.textFile(Test1GbFile)

  textRdd.persist(StorageLevel.MEMORY_AND_DISK)
  processTextRdd(textRdd)
  val rddCacheStoredInMemory = logAppender.getMessagesText().find(message => message.startsWith("Block rdd_1_0 stored as values in memory"))
  rddCacheStoredInMemory shouldBe defined
  val warnNotEnoughMemoryToCache = logAppender.getMessagesText().find(message => message.startsWith("Not enough space to cache rdd_"))
  warnNotEnoughMemoryToCache shouldBe defined
  val attemptToDiskCache = logAppender.getMessagesText().find(message => message.contains("Persisting block")
    && message.endsWith("to disk instead."))
  attemptToDiskCache shouldBe defined
  val successfulDiskCache = logAppender.getMessagesText().find(message => message.startsWith("Block rdd_1") && message.contains("file on disk in"))
  successfulDiskCache shouldBe defined
}

Here, when the memory is insufficient, Apache Spark tries to persist the cached block on disk (the "Persisting block ... to disk instead" message). It can lead to a situation where a part of the cached data is stored in memory and the other part on disk.
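
One way to observe this split, sketched below under the assumption that the @DeveloperApi getRDDStorageInfo method is available in your Apache Spark version, is to inspect the storage information right after the action:

// hedged sketch: getRDDStorageInfo reports, for every persisted RDD, how much of the
// cache ended up in memory and how much was written to disk
val sparkContext = sequentialProcessingSession.sparkContext
sparkContext.getRDDStorageInfo.foreach { rddInfo =>
  println(s"RDD ${rddInfo.name}: ${rddInfo.numCachedPartitions}/${rddInfo.numPartitions} cached partitions, " +
    s"${rddInfo.memSize} bytes in memory, ${rddInfo.diskSize} bytes on disk")
}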

This post presented Apache Spark's behavior with data bigger than the memory size. As we could see, too much data is problematic since the processing will fail most of the time. Sometimes however it's possible to process such a file, but very often only at the price of sacrificing performance. Regarding the cache, it's less problematic than the input data. As shown in the last section, even if the cached RDD is too big to fit in memory, it's either partially spilled to disk or the caching is simply ignored. In the coming weeks I'd like to publish a post about the internal classes involved in memory-consuming operations. If you want to be informed about this post, please follow me on Twitter.
