DataFrame and file bigger than available memory

A few weeks ago I wrote a post about the impact of big files on RDD-based processing. That post revealed the difficulty of processing files bigger than the available memory. But as you'll see below, that conclusion is not completely true.

This post walks through some play-fail tests of Apache Spark SQL processing a file bigger than the available memory. The first section shows what happens if we use the same sequential code as in the post about Apache Spark and data bigger than the memory. The next one digs into some internal details of text file processing in DataFrames. The last part explains why the result obtained in the previous post was not completely true.

File reading rules for Datasets

I wanted to start the analysis as simply as possible, so I transformed the code of the SequentialProcessing class into a testable Dataset-based version. The goal was to check whether the same parameters help to force sequential reads and, hence, guarantee the processing of a file bigger than the available memory. After refactoring, the code looked like this:

  def main(args: Array[String]): Unit = {
    prepareTestEnvironment()
    val sparkSession = sequentialProcessingSession
    import sparkSession.implicits._
    // The same split-size properties as in the RDD-based test, forcing ~10 MB splits
    sparkSession.sparkContext
      .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", (OneGbInBytes/100).toString) // 10MB
    sparkSession.sparkContext
      .hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", (OneGbInBytes/100).toString) // 10MB

    val textDataset = sparkSession.read.textFile(Test1GbFile)

    textDataset.map(txt => txt)
      .foreach(txt => {})
  }
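
The sequentialProcessingSession helper and the file constants come from my test setup and aren't shown above. A minimal stand-in for the session could look like the sketch below (an assumption on my side, not the original helper): a local session restricted to a single core, so that the splits are consumed one after another.

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for sequentialProcessingSession: a single-core local
// session, so that the 10 MB splits are processed sequentially
val sequentialProcessingSession: SparkSession = SparkSession.builder()
  .appName("Dataset bigger than available memory")
  .master("local[1]")
  .getOrCreate()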

Once executed with the -Xms900m -Xmx900m memory limits, and unlike its RDD-based version, it failed with an OOM exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(generated.java:34)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

From that I deduced that either it's impossible to read files bigger than the memory sequentially in Apache Spark SQL, or that I was simply using the wrong configuration.

Apache Spark SQL and file-based data sources

After that error I decided to focus on the internal execution of text file processing. It mainly involves 3 classes. The first one is org.apache.spark.sql.execution.datasources.text.TextFileFormat, which represents the file being read. Inside it we can find a method called readToUnsafeMem(conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, wholeTextMode: Boolean) that returns a function taking a PartitionedFile instance as a parameter and generating an iterator of UnsafeRow.

And PartitionedFile instances turned out to be the answer to the question about the class used to parallelize the reads:

case class PartitionedFile(partitionValues: InternalRow, filePath: String, start: Long, length: Long, @transient locations: Array[String] = Array.empty)

As you can see, it mirrors the concept of splits described in the post about memory impact on RDDs. The size of each split is computed in FileSourceScanExec's createNonBucketedReadRDD(readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation) method, right here:

val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
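
To make the formula concrete, here is a quick back-of-the-envelope application for the 1 GB test file. It assumes the default values of the two properties (128 MB for spark.sql.files.maxPartitionBytes and 4 MB for spark.sql.files.openCostInBytes) and a single core; these numbers are my assumptions, not taken from the original run:

// Hypothetical computation with the defaults: 128 MB max partition size,
// 4 MB open cost, one core and a single 1 GB file
val defaultMaxSplitBytes = 128L * 1024 * 1024      // 134217728
val openCostInBytes = 4L * 1024 * 1024             // 4194304
val defaultParallelism = 1
val totalBytes = 1073741824L + openCostInBytes     // one 1 GB file + its open cost
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes == 134217728 bytes, so the 1 GB file is planned as 8 splits of 128 MB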

After digging a little into the SQLConf class we can figure out that the property determining the size of the chunks in Spark SQL is not the same as for the RDD-based API. The configuration entry to use is called spark.sql.files.maxPartitionBytes and, according to the documentation, it specifies "the maximum number of bytes to pack into a single partition when reading files". Unfortunately, the configuration change didn't help and the processing was still failing. The modified property was correctly applied though, because the following entry was generated in the logs:

[2018-10-23 19:35:43,102] org.apache.spark.internal.Logging$class INFO Planning scan with bin packing, max size: 10737418 bytes, open cost is considered as scanning 4194304 bytes. (org.apache.spark.sql.execution.FileSourceScanExec:54)
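
For reference, the override itself is a one-liner. The sketch below reuses the OneGbInBytes constant from the test code; OneGbInBytes / 100 gives exactly the 10737418 bytes reported in the log:

// spark.sql.files.maxPartitionBytes is a runtime SQL property, so it can be
// changed on an existing session before reading the file
sparkSession.conf.set("spark.sql.files.maxPartitionBytes", (OneGbInBytes / 100).toString)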

After that I also tried with much smaller chunks (1073741 bytes), still with the foreach loop. In both cases the code produced an OOM error:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(generated.java:34)

Since the foreach operation is costly, I also tried executing the code with .count() instead of the loop. But here too, the execution ended in failure:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume$(generated.java:152)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey$(generated.java:56)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:128)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

Test on shorter lines

A quick analysis of the above stack trace shows that the problem happens during line deserialization. It's probably because the processed file has only 10 lines, each of 100 MB. That's not a very “real-life" scenario, even though it helps to get an idea of the limits of processing files bigger than the available memory. I then decided to build a file with 1 MB per line and test it against the previous code. Surprisingly, it worked without problems, both for small (1 MB) and big (10 MB) splits, and both for the foreach loop and the count case.
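
For reproducibility, a file with roughly 1 MB lines can be generated with something like the sketch below; the path and the number of lines are arbitrary choices of mine, not the original test setup:

import java.io.{BufferedWriter, FileWriter}

// Writes 1000 lines of 1 MB each, i.e. roughly a 1 GB text file
val writer = new BufferedWriter(new FileWriter("/tmp/test_1mb_lines.txt"))
val oneMbLine = "a" * (1024 * 1024)
(1 to 1000).foreach { _ =>
  writer.write(oneMbLine)
  writer.newLine()
}
writer.close()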

The successful execution comes from the way Apache Spark processes text files. Initially, we could think it reads the whole content and stores it in memory. However, that's not the case, and we can see it pretty clearly by adding breakpoints in 3 important processing places: the already covered readToUnsafeMem method, in reader.map (source file reading); the .foreach(txt => {}) action in the processing code; and finally UnsafeRowWriter's write(int ordinal, UTF8String input) method, because it's invoked directly by the mapping transformation. In fact, the engine reads the file line by line and applies map and foreach directly at the line level. This explains why the file is processed even though it's bigger than the available memory. And it's also true for the failing tests from the previous post based on RDD processing.
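
Conceptually it's the same pattern as iterating over a plain text file outside Spark, where only the current line has to fit in memory, whatever the total size of the file. A simplified illustration, not Spark's actual reader code:

import scala.io.Source

// Only one line is materialized at a time, so the heap usage stays bounded
// by the longest line rather than by the file size
val source = Source.fromFile("/tmp/test_1mb_lines.txt")
try {
  source.getLines().foreach { line =>
    // the map + foreach logic would run here, one line at a time
    ()
  }
} finally {
  source.close()
}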

Since it worked correctly for a narrow transformation, it's worth checking whether it works for a wide transformation too. And the processing logic didn't fail there either:

    val mapped = textDataset.map(txt => s"abc${txt}")
      .groupBy($"value")
      .count()

Even better, the above code with groupBy also works for the file with 100 MB per line. That's because the aggregation operates on hashes that are much smaller than the original data. Moreover, the generated code has a protection against memory problems: it spills the hash map and falls back to sort-based aggregation, and throws an OOM only when the buffer still can't be allocated:

/* 093 */     if (true) {
/* 094 */       // try to get the buffer from hash map
/* 095 */       agg_unsafeRowAggBuffer =
/* 096 */       agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], agg_value2);
/* 097 */     }
/* 098 */     // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 099 */     // aggregation after processing all input rows.
/* 100 */     if (agg_unsafeRowAggBuffer == null) {
/* 101 */       if (agg_sorter == null) {
/* 102 */         agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 103 */       } else {
/* 104 */         agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 105 */       }
/* 106 */
/* 107 */       // the hash map had be spilled, it should have enough memory now,
/* 108 */       // try to allocate buffer again.
/* 109 */       agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(
/* 110 */         agg_mutableStateArray[0], agg_value2);
/* 111 */       if (agg_unsafeRowAggBuffer == null) {
/* 112 */         // failed to allocate the first page
/* 113 */         throw new OutOfMemoryError("No enough memory for aggregation");
/* 114 */       }
/* 115 */     }

The above tests behave similarly on my standalone Spark cluster, where the executors have 1 GB of memory. Unfortunately, the last tested code, transformed into its RDD version, still ends up with an OOM.

This post tried, through several play-fail examples, to explain what happens when the files processed by Apache Spark SQL are bigger than the available memory. As we could see, it really depends. When we try to process very big lines (100 MB) in a foreach loop, the processing fails with an OOM. On the other hand, it does not fail for a hash-based aggregation on the same data. We could also see that for smaller lines (1 MB), both the foreach and count operations succeeded. Hence we can deduce that the line length plays an important role in a DataFrame's ability to work with files bigger than the available memory. It has much more impact than the total size of the file, because of the internal logic of processing text file data sources highlighted in the second section. It's then quite possible to process files bigger than the available memory, even with parallel processing. But of course, the risk of failure is high, especially if we need to keep some extra content in memory. The tests presented above were simple cases, without any processing logic that might have skewed the final result a little.
