DataFrame and file bigger than available memory

on waitingforcode.com

A few weeks ago I wrote a post about the impact of files with long lines on RDD-based processing. The post revealed how difficult it is to process such files because of OOM errors. In this post I wanted to check how that applies to Datasets.

This post shows some trial-and-error tests of Apache Spark SQL processing a file bigger than the available memory. The first section shows what happens if we use the same sequential code as in the post about Apache Spark and data bigger than the memory. The next one shows some internal details about text file processing in DataFrames. The last part explains why the result obtained in the previous post was not completely true.

Reading files rules for Datasets

I wanted to start the analysis as simply as possible. That's why I transformed the code of the SequentialProcessing class into a testable Dataset-based version. The goal was to check whether the same parameters help to force sequential reads and, hence, guarantee the processing of a line bigger than the available memory. After refactoring, the code looked like this:

  def main(args: Array[String]): Unit = {
    prepareTestEnvironment()
    val sparkSession = sequentialProcessingSession
    import sparkSession.implicits._ 

    val textDataset = sparkSession.read.textFile(Test1GbFile)

    textDataset.map(txt => txt)
      .foreach(txt => {})
  }

Once executed with the -Xms900m -Xmx900m memory limits, and unlike its RDD-based version, it failed with an OOM exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(generated.java:34)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

Apache Spark SQL and file-based data sources

Starting from that error, I decided to focus on the internal execution of text file processing. It mainly involves 3 classes. The first one is org.apache.spark.sql.execution.datasources.text.TextFileFormat, which represents the file being read. Inside we can find a method called readToUnsafeMem(conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, wholeTextMode: Boolean) that returns a function taking a PartitionedFile instance as a parameter and producing an iterator of UnsafeRow.

And PartitionedFile turned out to be the answer to the question about the class used to parallelize reads:

case class PartitionedFile(partitionValues: InternalRow, filePath: String, start: Long, length: Long, @transient locations: Array[String] = Array.empty)

As you can see, it uses the concept of splits to load the lines. The size of each split is computed in FileSourceScanExec's createNonBucketedReadRDD(readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation) method:

val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
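To make the formula more concrete, here is a small worked example in plain Scala, with the same arithmetic extracted into a pure function. It's only a sketch: the object and function names are hypothetical, the 1 GB file and the parallelism of 1 are assumptions made to resemble the sequential test, and 134217728 (128 MB) and 4194304 (4 MB) are, to my knowledge, the default values of the two configuration entries.

```scala
object SplitSizeExample {
  // Same arithmetic as in the snippet above, without any Spark dependency.
  def maxSplitBytes(fileSizes: Seq[Long], defaultMaxSplitBytes: Long,
                    openCostInBytes: Long, defaultParallelism: Int): Long = {
    // every file is "charged" its length plus the cost of opening it
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val oneGb = 1024L * 1024L * 1024L // assumed size of the test file
    val openCost = 4194304L           // assumed default open cost (4 MB)

    // assumed default max partition bytes (128 MB): prints 134217728
    println(maxSplitBytes(Seq(oneGb), 134217728L, openCost, 1))
    // reduced max partition bytes: prints 10737418
    println(maxSplitBytes(Seq(oneGb), 10737418L, openCost, 1))
  }
}
```

With a single big file and one core, bytesPerCore is much larger than both other values, so the split size is simply capped by the configured maximum.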

After digging a little into the SQLConf class, we can figure out that the property determining the size of chunks in Spark SQL is not the same as for the RDD-based API. The configuration entry to use is called spark.sql.files.maxPartitionBytes and, according to the documentation, it specifies "the maximum number of bytes to pack into a single partition when reading files". Unfortunately, changing this configuration didn't help and the processing still failed (the read is line-based after all, so a whole line is materialized regardless of the split size). The modified property was correctly applied, as the following log entry shows:

[2018-10-23 19:35:43,102] org.apache.spark.internal.Logging$class INFO Planning scan with bin packing, max size: 10737418 bytes, open cost is considered as scanning 4194304 bytes. (org.apache.spark.sql.execution.FileSourceScanExec:54)
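For reference, one way to apply such a value is to set it when the session is created. This is a minimal sketch and not necessarily the setup used in the test: the object name, the application name and the local[1] master are assumptions, and only the spark.sql.files.maxPartitionBytes key and its value come from the text and the log above.

```scala
import org.apache.spark.sql.SparkSession

object MaxPartitionBytesSession {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("max-partition-bytes-test") // hypothetical name
      .master("local[1]")                  // assumption: a single local core
      .config("spark.sql.files.maxPartitionBytes", "10737418")
      .getOrCreate()

    // Read the value back to check that the session really uses it
    println(sparkSession.conf.get("spark.sql.files.maxPartitionBytes"))

    sparkSession.stop()
  }
}
```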

Afterwards I also tried much smaller chunks (1073741 bytes) with the foreach loop. In both cases the code produced an OOM error:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(generated.java:34)

Since the foreach operation is costly, I also tried to run the code with .count() instead of the loop. But here too, the execution ended in failure:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.lang.StringCoding.decode(StringCoding.java:215)
    at java.lang.String.<init>(String.java:463)
    at java.lang.String.<init>(String.java:515)
    at org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:1181)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume$(generated.java:152)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey$(generated.java:56)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:128)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

Test on shorter lines

After a quick analysis of the above stack traces, we can figure out that the problem happens when a line is deserialized. It's maybe because the processed file has only 10 lines of 100 MB each, which, once put into memory, reach the space available per executor. I agree, it's not a very "real-life" scenario, but it helps to get an idea of the limits of processing files bigger than the memory. I then decided to build a file with 1 MB per line and test it against the previous code. It worked without problems, for small (1 MB) and big (10 MB) splits alike, and for both the foreach loop and the count case.
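A file of that shape can be produced with a few lines of plain Scala. The generator below is a hypothetical sketch, not the helper used for the tests in this post: the object name, the output path, the chunk size and the choice of 1024 lines (to reach about 1 GB) are all assumptions.

```scala
import java.io.{BufferedWriter, File, FileWriter}

object LongLinesFileGenerator {
  // Writes `lines` lines of `lineLength` characters each. Every line is written
  // chunk by chunk, so the generator never holds a whole line in memory.
  def generate(target: File, lines: Int, lineLength: Int, chunkSize: Int = 8192): Unit = {
    val chunk = "a" * chunkSize
    val writer = new BufferedWriter(new FileWriter(target))
    try {
      for (_ <- 0 until lines) {
        var remaining = lineLength
        while (remaining > 0) {
          val toWrite = Math.min(remaining, chunkSize)
          writer.write(chunk, 0, toWrite)
          remaining -= toWrite
        }
        writer.newLine()
      }
    } finally {
      writer.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val oneMb = 1024 * 1024
    // 1024 lines of 1 MB each, so about 1 GB in total (hypothetical path)
    generate(new File("/tmp/test_1mb_lines.txt"), lines = 1024, lineLength = oneMb)
  }
}
```

Changing the arguments to 10 lines of 100 MB should give a file similar to the one that failed in the previous sections.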

The successful execution comes from the way Apache Spark processes text files. The framework doesn't read the whole content at once. We can see that pretty clearly by adding breakpoints in 3 important places: the already covered readToUnsafeMem method, in reader.map (source file reading); the .foreach(txt => {}) action in the processing code; and finally UnsafeRowWriter's write(int ordinal, UTF8String input) method, because it's invoked directly by the mapping transformation. From that we can see that the engine reads the file line by line and applies map and foreach directly at the line level. It explains why the file is processed even though it's bigger than the available executor memory. And with such shorter lines, the failing cases from the previous post exploring RDD-based processing (Apache Spark and data bigger than the memory) work as well.

So if it works correctly for a narrow transformation, it's worth checking whether it also works for a wide transformation. And the processing logic didn't fail there either:
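The same line-by-line behavior can be illustrated without Spark, with a plain Scala iterator. This is only an analogy under stated assumptions: Iterator.tabulate stands in for the file reader, and the object and method names are hypothetical. The recorded events show that each line goes through map and foreach before the next one is read.

```scala
import scala.collection.mutable.ListBuffer

object LineByLineProcessing {
  // Records the order of the events when a lazy iterator is mapped and consumed.
  def trace(lineCount: Int): List[String] = {
    val events = ListBuffer.empty[String]
    // Stand-in for the file reader: a line is produced only when it's requested
    val reader: Iterator[String] = Iterator.tabulate(lineCount) { i =>
      events += s"read $i"
      s"line $i"
    }
    reader
      .map { line => events += s"map $line"; line }
      .foreach { line => events += s"foreach $line" }
    events.toList
  }

  def main(args: Array[String]): Unit = {
    // Prints: read 0, map line 0, foreach line 0, read 1, map line 1, foreach line 1
    trace(2).foreach(println)
  }
}
```

Since only one line is alive at a time, the memory pressure depends on the size of a single line rather than on the size of the whole input.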

    val mapped = textDataset.map(txt => s"abc${txt}")
      .groupBy($"value")
      .count()

The above code with groupBy also worked. Moreover, the generated code has a protection against memory problems: it first spills the hash map and falls back to sort-based aggregation, and it throws an OOM only as a last resort:

    if (true) {
      // try to get the buffer from hash map
      agg_unsafeRowAggBuffer =
      agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], agg_value2);
    }
    // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
    // aggregation after processing all input rows.
    if (agg_unsafeRowAggBuffer == null) {
      if (agg_sorter == null) {
        agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
      } else {
        agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
      }

      // the hash map had be spilled, it should have enough memory now,
      // try to allocate buffer again.
      agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(
        agg_mutableStateArray[0], agg_value2);
      if (agg_unsafeRowAggBuffer == null) {
        // failed to allocate the first page
        throw new OutOfMemoryError("No enough memory for aggregation");
      }
    }
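The idea behind this generated code can be sketched in plain Scala. The snippet below is a deliberately simplified toy and not Spark's implementation: the names are hypothetical, the memory limit is modeled as a maximum number of keys, and the spilled runs are kept in memory instead of being written to disk.

```scala
import scala.collection.mutable

object SpillingCountAggregation {
  // Counts the occurrences of each key with a hash map limited to `maxKeys` entries.
  // When a new key doesn't fit, the map is "spilled" as a key-sorted run and emptied.
  def countByKey(input: Iterator[String], maxKeys: Int): Seq[(String, Long)] = {
    require(maxKeys > 0, "the map must be able to hold at least one key")
    val hashMap = mutable.HashMap.empty[String, Long]
    val spilledRuns = mutable.ListBuffer.empty[Seq[(String, Long)]]

    input.foreach { key =>
      if (!hashMap.contains(key) && hashMap.size >= maxKeys) {
        // No room for a new buffer: spill the map, then retry on the empty map
        spilledRuns += hashMap.toSeq.sortBy(_._1)
        hashMap.clear()
      }
      hashMap.update(key, hashMap.getOrElse(key, 0L) + 1L)
    }

    // Sort-based fallback: sort all partial counts by key and sum adjacent equal keys
    val sorted = (spilledRuns.flatten ++ hashMap.toSeq).sortBy(_._1)
    sorted.foldLeft(List.empty[(String, Long)]) {
      case ((lastKey, lastCount) :: tail, (key, count)) if lastKey == key =>
        (key, lastCount + count) :: tail
      case (acc, entry) => entry :: acc
    }.reverse
  }

  def main(args: Array[String]): Unit = {
    // With room for only 2 keys, the map is spilled twice for this input
    println(countByKey(Iterator("a", "b", "a", "c", "b", "a"), maxKeys = 2))
  }
}
```

The result is the same as with an unbounded map; only the way of getting there changes when the map runs out of room.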

Through several trial-and-error examples, this post tried to explain what happens when the files processed by Apache Spark SQL are bigger than the available memory. As we could see, it really depends. When we try to process very big lines (100 MB), the processing fails with an OOM simply because the lines reach the limit of the available memory. We could also see that for smaller lines (1 MB), everything works well. From that we can deduce that the line length plays an important role in the ability to work with files. It has much more impact than the total size of the file because of the internal logic of text file data sources, highlighted in the second section. It's then quite possible to process files bigger than the available memory, even in parallel, unless a single line reaches the memory limits.
