https://github.com/bartosz25/spark-...ala/com/waitingforcode/memoryimpact
Unlike Hadoop Map/Reduce, Apache Spark uses the power of memory to speed up data processing. But does it mean that we can't process datasets bigger than the memory limits? The small survey below will try to answer that question.
This post gives some insight into Apache Spark's behavior when the files to process and the data to cache are bigger than the available memory. It focuses on the low-level RDD abstraction; one of the next posts will do the same exercise for the Dataset and streaming data sources (DataFrame and file bigger than available memory). The first part of this post gives a general overview of memory use in Apache Spark. The second part focuses on one of its use cases - file data processing. The last section talks about the cache feature and its impact on memory. The sections contain examples showing Apache Spark's behavior under specific "size" conditions, namely files with a few very long lines (100MB each).
Apache Spark and memory
Capacity planning is one of the hardest tasks in data processing preparation. Very often we think that only the dataset size matters in this operation, but it's not true. There is always an extra memory overhead coming from the framework code and the data processing logic. Apache Spark is no exception since it also requires some space to run the code and to execute other memory-impacting components such as:
- cache - if a given dataset is reused in different places, it's often worth caching it to avoid time-consuming recomputation. The storage used for the cache is defined by the storage level (org.apache.spark.storage.StorageLevel). Among the native choices we can find: memory, disk or memory + disk. Each of these levels can be either serialized or deserialized, and replicated or not. In this post we'll focus on the memory and memory + disk storage. A short sketch after this list shows the cache and broadcast variables in action.
- broadcast variables - these objects are sent to the executors only once. The executors then keep them in memory during the whole data processing operation.
- tasks - the data processing logic itself also needs memory to work. We can see it in log entries similar to:
DEBUG org.apache.spark.memory.TaskMemoryManager Task 6 acquired 5.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@57fe7437
# or
INFO org.apache.spark.memory.TaskMemoryManager: Memory used in task 6317340
INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@2c98b15b: 32.0 KB
INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@566144b7: 64.0 KB
INFO org.apache.spark.memory.TaskMemoryManager: Acquired by org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@ea479ad: 13.3 GB
TaskMemoryManager allocates the memory needed for the data used in the executed task. It does so for hash-based aggregations (see HashAggregateExec, TungstenAggregationIterator), sort-based shuffle (ShuffleExternalSorter) or other sorters (UnsafeExternalSorter). It's also widely used in the data structures related to the tasks. We can find them by looking for the implementations of the MemoryConsumer abstract class.
- processed data - obviously, the processed data is put into memory according to the defined partitioning. Hence if a partition is too big to be kept in memory, it can lead to out-of-memory problems.
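To make the list above more concrete, here is a minimal sketch (names and sizes are illustrative, not taken from the repository) combining a cached RDD with an explicit storage level, a broadcast variable and some task-level processing logic:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object MemoryConsumersSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("Memory consumers sketch").master("local[2]")
      .getOrCreate()
    val sparkContext = session.sparkContext

    // broadcast variable - sent once to the executors and kept in their memory
    val enrichmentDictionary = sparkContext.broadcast(Map("a" -> 1, "b" -> 2))

    // cached RDD - the storage level decides whether memory, disk or both are used
    val numbersRdd = sparkContext.parallelize(1 to 1000000)
    numbersRdd.persist(StorageLevel.MEMORY_ONLY) // or StorageLevel.MEMORY_AND_DISK

    // the task's processing logic also acquires execution memory (managed by TaskMemoryManager)
    val multiplied = numbersRdd.map(number => number * enrichmentDictionary.value.size)
    println(multiplied.count())

    session.stop()
  }
}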
It's also important to keep in mind that the user-defined processing logic also impacts the memory. A pretty clear example of that is code doing some extra caching of enrichment data.
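As an illustration, here is a minimal, purely hypothetical sketch of such user code: a per-partition map caching enrichment data, which grows on the executor's heap on top of Apache Spark's own memory needs (enrichOrders and the lookup logic are invented for the example):

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Hypothetical enrichment: every processed key is resolved once and kept in a
// per-partition map - this map lives on the executor's heap, in addition to the
// memory already used by Apache Spark itself
def enrichOrders(ordersRdd: RDD[(String, Double)]): RDD[(String, Double, String)] = {
  ordersRdd.mapPartitions { orders =>
    val customerCache = mutable.Map[String, String]()
    orders.map { case (customerId, amount) =>
      // in real code the default value would come from an external lookup call
      val customerName = customerCache.getOrElseUpdate(customerId, s"customer-${customerId}")
      (customerId, amount, customerName)
    }
  }
}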
Let's now execute a small example to see what happens in the logs and observe which Apache Spark components use memory:
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// To generate tested files, go to /src/test/resources/memory_impact
val OneGbInBytes = 1073741824L

def sequentialProcessingSession = SparkSession.builder()
  .appName("Spark memory types").master("local[1]")
  .getOrCreate()

object GeneralMemoryIllustration {
  def main(args: Array[String]): Unit = {
    val testDirectory = "/tmp/spark-memory-illustration"
    val fileName = "generated_file_500_mb.txt"
    Files.createDirectory(new File(testDirectory).toPath)
    Files.copy(getClass.getResourceAsStream(s"/memory_impact/${fileName}"),
      new File(s"${testDirectory}/${fileName}").toPath)
    val textRdd = sequentialProcessingSession.sparkContext.textFile(s"${testDirectory}/${fileName}")
    textRdd.persist(StorageLevel.MEMORY_ONLY)
    // processTextRdd is defined later in this post
    processTextRdd(textRdd)
  }
}
After the code execution we can find some memory use indicators in the logs:
INFO Added rdd_1_2 in memory on 192.168.0.11:39758 (size: 190.7 MB, free: 474.7 MB) (org.apache.spark.storage.BlockManagerInfo:54)
INFO Block rdd_1_3 stored as values in memory (estimated size 16.0 B, free 474.5 MB) (org.apache.spark.storage.memory.MemoryStore:54)
INFO Block broadcast_2 stored as values in memory (estimated size 6.5 KB, free 283.8 MB) (org.apache.spark.storage.memory.MemoryStore:54)
DEBUG Task 15 release 0.0 B from org.apache.spark.util.collection.ExternalAppendOnlyMap@db18c1e (org.apache.spark.memory.TaskMemoryManager:231)
If you analyze the logs carefully, you can notice the presence of the "as values" block storage. In fact the block manager - Apache Spark's component responsible for block storage - is able to persist the blocks either as values or as bytes. The former keeps the data as an array of deserialized Java objects while the latter keeps it serialized, in multiple chunks of bytes.
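From the user's point of view, the closest we can get to this distinction is the choice between deserialized and serialized storage levels. The sketch below is only an illustration and assumes the usual mapping of MEMORY_ONLY to the "as values" format and MEMORY_ONLY_SER to the "as bytes" one:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val session = SparkSession.builder()
  .appName("Values vs bytes storage").master("local[1]")
  .getOrCreate()

val lettersRdd = session.sparkContext.parallelize(Seq("a", "b", "c"))
// deserialized Java objects - should show up in the logs as "stored as values in memory"
lettersRdd.persist(StorageLevel.MEMORY_ONLY)
lettersRdd.count()

val digitsRdd = session.sparkContext.parallelize(Seq("1", "2", "3"))
// serialized form - should show up as "stored as bytes in memory"
digitsRdd.persist(StorageLevel.MEMORY_ONLY_SER)
digitsRdd.count()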
Some clarification
This article may wrongly suggest that it's impossible to process files bigger than the available memory. The real reason for the failed processing that I highlighted in the "DataFrame and file bigger than available memory" post was the length of each individual line. I explained it briefly in the "Test on shorter lines" section of that follow-up post. Please keep this in mind when reading the rest of this article.
If you are interested in a more detailed explanation, you can read Phil's follow-up post about Apache Spark and memory.
Apache Spark, memory and data processing
Let's try to figure out what happens to the application when we have less memory than the input size. The memory in the tests below is limited to 900MB with the -Xms900m -Xmx900m options, which gives approximately 360MB of execution memory (120MB per task). Let's start with this snippet:
def parallelProcessingSession = SparkSession.builder()
  .appName("Spark memory impact").master("local[3]")
  .getOrCreate()

// The processing code is deliberately left empty to see what happens with
// Apache Spark not impacted by the user code
def processTextRdd(textRdd: RDD[String]) = {
  textRdd.map(txt => txt)
    .foreach(txt => {})
}

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val textRdd = parallelProcessingSession.sparkContext.textFile(Test1GbFile)
  processTextRdd(textRdd)
}
Executed locally, this code fails with the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
// ...
Caused by: java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOfRange(Arrays.java:3664)
  at
But is it a rule? After some "play-fail" tests with different Apache Spark configurations I found a small "exception" (see later why the quotation marks):
def sequentialProcessingSession = SparkSession.builder()
  .appName("Spark memory impact").master("local[1]")
  .getOrCreate()

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = sequentialProcessingSession
  val textRdd = sparkSession.sparkContext.textFile(Test1GbFile)
  processTextRdd(textRdd)
}
A simple map & foreach operation works with that configuration. As you can see, Apache Spark executes here sequentially. In my case it means 1 thread available for processing, so the whole 360MB of execution memory is allocated to it, which is enough to process my 100MB lines. Of course, this sequential SparkSession slows down the whole processing, and that's why I put "exception" in quotation marks. In addition, the configuration doesn't work in all cases:
def processTextRddWithGroupBy(textRdd: RDD[String]) = {
  textRdd.map(txt => txt)
    .groupBy(key => key)
    .foreach(txt => {})
}

def main(args: Array[String]): Unit = {
  prepareTestEnvironment()
  val sparkSession = sequentialProcessingSession
  val textRdd = sparkSession.sparkContext.textFile(Test1GbFile)
  processTextRddWithGroupBy(textRdd)
}
The code leads to an OOM error in the groupBy transformation:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 50, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
// ...
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
  at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
  at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:90)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
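For the record, the stack trace points at the shuffle reader buffering the grouped values per key in an ExternalAppendOnlyMap. With records of a normal size, an aggregation that combines values on the fly (e.g. reduceByKey) is usually much lighter on memory than groupBy - a minimal sketch below. It wouldn't help here though, since, as explained in the next section, the real problem is the size of a single line:

import org.apache.spark.rdd.RDD

// Instead of materializing all the lines sharing a key (groupBy), count or reduce them -
// the values are combined incrementally and never kept all together in memory.
def processTextRddWithReduceByKey(textRdd: RDD[String]) = {
  textRdd.map(txt => (txt, 1))
    .reduceByKey(_ + _)
    .foreach(keyWithCount => {})
}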
Record size on-heap
As stated in the "info" block before the previous section, the problem comes not from the global size of the file but from the length of each line. When Apache Spark reads a line into a String, it uses approximately 200MB to represent it in memory (100 million numbers per line, 2 bytes used for each character). When it tries to load the lines for 3 simultaneously running tasks, it fails, since the execution memory reserved for each task is only about 120MB. So it's possible to process files bigger than the available memory, but only when the length of each individual record isn't bigger than the memory allocated to each task.
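To see where these numbers come from, here is a back-of-the-envelope computation. The 300MB of reserved memory and the 0.6 memory fraction below are the defaults of the unified memory manager and are my assumptions, not values measured in the test:

// Rough check of the numbers used in this post
val heapSize = 900L * 1024 * 1024            // -Xms900m -Xmx900m
val reservedMemory = 300L * 1024 * 1024      // fixed reserved memory (assumed default)
val memoryFraction = 0.6                     // assumed default spark.memory.fraction
val unifiedMemory = ((heapSize - reservedMemory) * memoryFraction).toLong
println(unifiedMemory / (1024 * 1024))       // ~360 MB shared by execution and storage

val concurrentTasks = 3                      // local[3]
println(unifiedMemory / concurrentTasks / (1024 * 1024)) // ~120 MB of execution memory per task

val lineLength = 100L * 1024 * 1024          // one 100MB line = ~100 million characters
val bytesPerChar = 2                         // a Java String uses 2 bytes per character here
println(lineLength * bytesPerChar / (1024 * 1024)) // ~200 MB for a single line - more than the ~120 MB per task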
Once again, Phil's follow-up post explains it pretty well.
Apache Spark, memory and cache
After analyzing what happens with the data, let's do a similar analysis for RDD caching. Just to recall, caching is useful when a given dataset is used more than once in the same processing logic. Thanks to it, the data won't be recomputed every time. And to keep the execution performant, there is nothing better than putting the cache into memory, like below:
"memory-only cache" should "not fail when there is too few memory in sequential processing" in { val logAppender = InMemoryLogAppender.createLogAppender(Seq("Will not store", "Not enough space to cache")) val textRdd = parallelProcessingSession.sparkContext.textFile(Test1GbFile) textRdd.persist(StorageLevel.MEMORY_ONLY) processTextRdd(textRdd) // In the logs we can see what happens if we ask Spark to cache the data when there is not enough memory // The message we can find are: // Will not store rdd_1_8 // Not enough space to cache rdd_1_8 in memory! (computed 190.7 MB so far) // Will not store rdd_1_11 // Not enough space to cache rdd_1_11 in memory! (computed 190.7 MB so far) logAppender.getMessagesText().find(message => message.startsWith("Will not store rdd_1_")) shouldBe defined logAppender.getMessagesText().find(message => message.startsWith("Not enough space to cache rdd_") && message.contains(" in memory! (computed ")) shouldBe defined }
As you can see in the log messages, when there is not enough memory to put the RDD into the cache, the framework simply ignores the operation ("Will not store rdd_1"). The next test shows that an RDD used twice is recomputed for each use:
"failed memory cache" should "recompute RDD if used again" in { // Same code as the first test but with double call to processTextRdd to see what happens in the logs // when we're unable to cache and despite that we use the same RDD twice // Locally it was rdd_1_26 block that weren't cached. It may be different but for test's simplicity I keep // hardcoded id val logAppender = InMemoryLogAppender.createLogAppender(Seq("rdd_1_26", "872415232+33554432")) val sequentialSession = sequentialProcessingSession val textRdd = sequentialSession.sparkContext.textFile(Test1GbFile) textRdd.persist(StorageLevel.MEMORY_ONLY) processTextRdd(textRdd) processTextRdd(textRdd) // As you can observe through the log events, some RDD blocks aren't cached because of insufficient free space // in memory. In such case Spark simply ignores it, as we saw earlier in the tests, and instead recomputes the // data // You can notice the recomputation by the presence of the log starting with "Input split". For cached blocks // as for instance the first one, we'll find only 1 entry for // "Input split: file:/tmp/spark-memory-impact/generated_file_1_gb.txt:872415232+33554432" val groupedLogMessages = logAppender.getMessagesText().groupBy(message => message) groupedLogMessages("Task 59 trying to put rdd_1_26") should have size 1 groupedLogMessages("Task 26 trying to put rdd_1_26") should have size 1 groupedLogMessages("Block rdd_1_26 could not be removed as it was not found on disk or in memory") should have size 2 groupedLogMessages("Input split: file:/tmp/spark-memory-impact/generated_file_1_gb.txt:872415232+33554432") should have size 2 }
Since the recomputation is not ideal, we may want to mitigate it by using the memory + disk level:
"memory-disk cache" should "store some data in memory and the rest on disk" in { val logAppender = InMemoryLogAppender.createLogAppender(Seq("stored as values in memory", "Not enough space to cache", "to disk instead", "on disk in", "on disk on")) val textRdd = sequentialProcessingSession.sparkContext.textFile(Test1GbFile) textRdd.persist(StorageLevel.MEMORY_AND_DISK) processTextRdd(textRdd) val rddCacheStoredInMemory = logAppender.getMessagesText().find(message => message.startsWith("Block rdd_1_0 stored as values in memory")) rddCacheStoredInMemory shouldBe defined val warnNotEnoughMemoryToCache = logAppender.getMessagesText().find(message => message.startsWith("Not enough space to cache rdd_")) warnNotEnoughMemoryToCache shouldBe defined val attemptToDiskCache = logAppender.getMessagesText().find(message => message.contains("Persisting block") && message.endsWith("to disk instead.")) attemptToDiskCache shouldBe defined val successfulDiskCache = logAppender.getMessagesText().find(message => message.startsWith("Block rdd_1") && message.contains("file on disk in")) successfulDiskCache shouldBe defined }
Here, when the memory is insufficient, Apache Spark tries to persist the cached block on disk (the "Persisting block ... to disk instead" message). It can lead to a situation where part of the cached data is stored in memory and the rest on disk.
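If you prefer not to dig into the logs, the storage split can also be checked programmatically. The sketch below is only an illustration and assumes it runs right after the processing, with the same sequentialProcessingSession as in the test above; the exact sizes will obviously differ from one run to another:

// Prints, for every cached RDD, how much of it ended up in memory and how much on disk
sequentialProcessingSession.sparkContext.getRDDStorageInfo.foreach { rddInfo =>
  println(s"RDD ${rddInfo.name}: ${rddInfo.numCachedPartitions} cached partitions, " +
    s"${rddInfo.memSize} bytes in memory, ${rddInfo.diskSize} bytes on disk")
}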
This post presented Apache Spark's behavior with data bigger than the available memory. As we could see, when a record's size is bigger than the memory reserved for a task, the processing will fail - unless the data is processed with only 1 parallel task and the total memory is much bigger than the size of the biggest line. Regarding the cache, bigger lines are less problematic. As shown in the last section, even if the cached RDD is too big to fit in memory, it's either partially stored on disk or the caching is simply skipped.