Apache Spark and data compression

Versions: Apache Spark 2.3.1

Compressed data takes less space and thus can be sent faster across the network. However, these advantages turn into drawbacks in parallel distributed data processing when the engine doesn't know how to split a compressed file for better parallelization. Fortunately, some compression formats can be split.

In the first section of this post we'll focus on compression and its splittable formats. In the next part we'll discover how Apache Spark deals with compressed files. Finally, in the last section, we'll see some examples of Apache Spark working on compressed files.

Compression for parallel processing

Compression is nothing more and nothing less than storing information in a less verbose format. It's especially useful for information with a lot of repeated patterns. For instance, a compressed version of the text "ababab" could simply be written as "3ab", which takes half the space of the full text. If you're interested in a more precise definition, I invite you to read the "Compression definition" section of the Compression in Parquet post.
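The "ababab" to "3ab" idea can be sketched in a few lines of Java. This is only a toy: the RepeatCodec class and its methods are hypothetical names invented for this illustration, the text is assumed to be non-empty and free of digits, and real codecs work very differently.

```java
// Toy illustration only: real codecs are far more sophisticated.
class RepeatCodec {

    // Repeats the unit `count` times (kept explicit to run on older Java versions).
    static String repeat(String unit, int count) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            builder.append(unit);
        }
        return builder.toString();
    }

    // Encodes a non-empty, digit-free text as "<count><unit>", where unit is the
    // shortest prefix whose repetition rebuilds the text ("ababab" -> "3ab").
    static String encode(String text) {
        for (int unitLength = 1; unitLength < text.length(); unitLength++) {
            if (text.length() % unitLength != 0) {
                continue;
            }
            String unit = text.substring(0, unitLength);
            int count = text.length() / unitLength;
            if (repeat(unit, count).equals(text)) {
                return count + unit;
            }
        }
        return "1" + text; // no repeated pattern found
    }

    // Reverses encode(): the leading digits are the count, the rest is the unit.
    static String decode(String encoded) {
        int digits = 0;
        while (digits < encoded.length() && Character.isDigit(encoded.charAt(digits))) {
            digits++;
        }
        int count = Integer.parseInt(encoded.substring(0, digits));
        return repeat(encoded.substring(digits), count);
    }
}
```

With this sketch, RepeatCodec.encode("ababab") gives "3ab" and RepeatCodec.decode("3ab") gives back "ababab". A text without any repeated pattern, such as "abc", is encoded as "1abc" and so becomes longer than the original.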

Naively, we could think that compressed files can't be read in parallel. After all, slicing them at arbitrary places would lead to corrupted chunks. For instance, if we produce 2 splits from our "compressed" 3ab content, we may get "3a" and "b", which is incorrect. Fortunately, this is not a universal truth and some compression formats are splittable.
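The corruption caused by arbitrary slicing is easy to reproduce with a real codec. The sketch below uses only the JDK's java.util.zip classes rather than Hadoop's codecs; GzipSliceDemo and its methods are hypothetical names created for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipSliceDemo {

    // Compresses the text into a single gzip stream.
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Returns true only if the chunk can be decompressed to its end without error.
    static boolean canDecompress(byte[] chunk) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(chunk))) {
            byte[] buffer = new byte[1024];
            while (in.read(buffer) != -1) {
                // drain the stream; only success or failure matters here
            }
            return true;
        } catch (IOException e) {
            // bad header, truncated stream or checksum mismatch
            return false;
        }
    }

    // Cuts the compressed bytes in the middle, as a naive splitter would do.
    static byte[][] sliceInHalf(byte[] compressed) {
        int middle = compressed.length / 2;
        return new byte[][] {
            Arrays.copyOfRange(compressed, 0, middle),
            Arrays.copyOfRange(compressed, middle, compressed.length)
        };
    }
}
```

The full stream decompresses fine, but neither half does: the first one is truncated and the second one starts without a gzip header. A reader must therefore consume a gzip file from its very first byte.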

Hadoop SequenceFile

The SequenceFile format is one example of container-like file formats whose compressed content is splittable. This format stores the data as key/value pairs and is splittable because its input format doesn't override the isSplitable method of the FileInputFormat abstract class, which returns true by default:

  // org.apache.hadoop.mapred.FileInputFormat
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
  // Unlike, for example, TextInputFormat, whose splittability
  // depends on the compression codec
  // org.apache.hadoop.mapred.TextInputFormat
  protected boolean isSplitable(FileSystem fs, Path file) {
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

Hence, a SequenceFile can be split when its records or blocks are compressed, even though the compression method is not splittable by itself.
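The reason a container can be split even with a non-splittable codec is that every record or block is compressed on its own. The sketch below illustrates that principle with the JDK's gzip classes. It is not the real SequenceFile layout, and BlockContainerSketch with its methods is a hypothetical name used only here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BlockContainerSketch {

    // Compresses one block as an independent gzip stream.
    static byte[] compress(String block) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(block.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decompresses one block without needing any byte of the other blocks.
    static String decompress(byte[] compressedBlock) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressedBlock))) {
            byte[] buffer = new byte[1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                bytes.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    // The "container": a list of blocks, each compressed separately.
    static List<byte[]> compressBlocks(List<String> blocks) {
        List<byte[]> compressed = new ArrayList<>();
        for (String block : blocks) {
            compressed.add(compress(block));
        }
        return compressed;
    }
}
```

Since the second block can be decompressed without reading the first one, two tasks can work on different blocks of the same file, even though gzip itself is not splittable.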

Compression and Apache Spark SQL

To deal with compressed files, Apache Spark uses the codecs provided by Hadoop. We can see that in the org.apache.spark.sql.catalyst.util.CompressionCodecs class, where the list of supported codecs is defined at the beginning:

private val shortCompressionCodecNames = Map(
  "none" -> null,
  "uncompressed" -> null,
  "bzip2" -> classOf[BZip2Codec].getName,
  "deflate" -> classOf[DeflateCodec].getName,
  "gzip" -> classOf[GzipCodec].getName,
  "lz4" -> classOf[Lz4Codec].getName,
  "snappy" -> classOf[SnappyCodec].getName)

Thus, the way the framework reads and processes data from compressed files comes from the registered codecs. Obviously, a file that is not splittable is processed by a single task, and thus on a single executor, unless the data is repartitioned after decompression. On the other hand, splittable compressed files are processed like any other parallelizable file. To discover whether a given format can be parallelized, Apache Spark uses another abstraction provided by Hadoop, the SplittableCompressionCodec interface. Apache Spark's org.apache.spark.sql.execution.datasources.FileFormat trait exposes a method called def isSplitable(sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean. Almost every file data source implements it, for instance the TextBasedFileFormat abstract class used by the JSON, CSV and text file data sources:

private var codecFactory: CompressionCodecFactory = _

override def isSplitable(
  sparkSession: SparkSession,
  options: Map[String, String],
  path: Path): Boolean = {
  if (codecFactory == null) {
    codecFactory = new CompressionCodecFactory(
      sparkSession.sessionState.newHadoopConfWithOptions(options))
  }
  val codec = codecFactory.getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}

As you can see, it looks up the codec matching the file. When no codec is found, the file is considered uncompressed and thus splittable. Otherwise the file is splittable only if the codec implements the SplittableCompressionCodec interface. Two elements of this interface give good insight into the way compressed files may be read:

public enum READ_MODE {CONTINUOUS, BYBLOCK};

SplitCompressionInputStream createInputStream(InputStream seekableIn,
    Decompressor decompressor, long start, long end, READ_MODE readMode)
    throws IOException;

The enum represents the 2 available read modes, splittable and not, exactly as explained in the previous section for the Bzip2 compression format. The method creating the input stream works only for seekable data (seekableIn must implement the Seekable interface), so we can freely move inside such a compressed file. The method uses the start and end parameters to find the positions marking the beginning and the end of the read.

The object using this mechanism is Hadoop's org.apache.hadoop.mapred.LineRecordReader class, invoked for instance inside the org.apache.spark.sql.execution.datasources.text.TextFileFormat#readToUnsafeMem(conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] method.

The two following examples show how a splittable format impacts partitioning:
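The role of the start and end parameters can be pictured with a toy model. It assumes a convention where a compressed block belongs to the split containing its first byte, which I believe is close in spirit to block-based reading but is not Hadoop's actual implementation. SplitAlignmentSketch and blocksForSplit are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class SplitAlignmentSketch {

    // Returns the indexes of the blocks owned by the split [start, end):
    // a block belongs to the split in which its first byte falls.
    static List<Integer> blocksForSplit(long[] blockStarts, long start, long end) {
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < blockStarts.length; i++) {
            if (blockStarts[i] >= start && blockStarts[i] < end) {
                owned.add(i);
            }
        }
        return owned;
    }
}
```

For blocks starting at offsets 0, 10, 25 and 40, the splits [0, 20), [20, 40) and [40, 50) own respectively the blocks {0, 1}, {2} and {3}. Every block is read exactly once, even though the split boundaries fall in the middle of blocks.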

private val ProcessingSparkSession = SparkSession.builder()
  .appName("Spark SQL compression test").master("local[*]")
  // Defining this option is mandatory. Without it the file, much smaller than the default value,
  // would always end up in a single partition - even though bzip2 is splittable
  .config("spark.sql.files.maxPartitionBytes", "5")
  .getOrCreate()

"not splittable format" should "have only 1 partition" in {
  val gzipPartitions = ProcessingSparkSession.read.option("compression", "gzip")
    .format("text").load(GzipPath.getAbsolutePath+"/gzip_compression1.gz").rdd.getNumPartitions

  gzipPartitions shouldEqual 1
}

"splittable format" should "have more than 1 partition" in {
  val bzipPartitions = ProcessingSparkSession.read.option("compression", "bzip2")
    .format("text")
    .load(BzipPath.getAbsolutePath+"/bzip_compression1.bz2").rdd.getNumPartitions

  bzipPartitions shouldEqual 9
}

Did you notice something interesting? Yes, we must specify the spark.sql.files.maxPartitionBytes entry in order to see our small file partitioned. As you will see later, it's not required when reading through the RDD API. Here this configuration entry caps the number of bytes packed into a single partition and thus produces evenly balanced splits of the file. Since our file has 42 bytes and the limit is 5 bytes, Apache Spark splits it into 9 partitions (8 of 5 bytes and a last one of 2 bytes).
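The arithmetic behind these 9 partitions can be sketched as below. It is a simplified model for a single file, based on my reading of the file scan planning code in Spark 2.3 (FileSourceScanExec). FilePartitionSketch and its methods are hypothetical names, all sizes are assumed to be strictly positive, and 4 MB is the default value of spark.sql.files.openCostInBytes.

```java
import java.util.ArrayList;
import java.util.List;

class FilePartitionSketch {

    // Split size derived from spark.sql.files.maxPartitionBytes, the cost of
    // opening a file (spark.sql.files.openCostInBytes) and the parallelism.
    static long maxSplitBytes(long fileLength, long maxPartitionBytes,
                              long openCostInBytes, int defaultParallelism) {
        long bytesPerCore = (fileLength + openCostInBytes) / defaultParallelism;
        return Math.min(maxPartitionBytes, Math.max(openCostInBytes, bytesPerCore));
    }

    // Counts the partitions created for a single file.
    static int partitionCount(long fileLength, boolean splittable, long maxPartitionBytes,
                              long openCostInBytes, int defaultParallelism) {
        long splitBytes = maxSplitBytes(fileLength, maxPartitionBytes,
            openCostInBytes, defaultParallelism);
        // Step 1: cut the file into chunks; a non-splittable file stays whole.
        List<Long> chunks = new ArrayList<>();
        if (splittable) {
            for (long offset = 0; offset < fileLength; offset += splitBytes) {
                chunks.add(Math.min(splitBytes, fileLength - offset));
            }
        } else {
            chunks.add(fileLength);
        }
        // Step 2: pack the chunks into partitions; every chunk also "weighs"
        // the open cost, so a partition fills up quickly.
        int partitions = 0;
        long currentSize = 0;
        boolean partitionOpen = false;
        for (long chunk : chunks) {
            if (partitionOpen && currentSize + chunk > splitBytes) {
                partitions++; // close the current partition
                currentSize = 0;
            }
            currentSize += chunk + openCostInBytes;
            partitionOpen = true;
        }
        return partitionOpen ? partitions + 1 : partitions;
    }
}
```

With a 42-byte splittable file, a limit of 5 bytes and the 4 MB open cost, the sketch returns 9. With the default limit of 128 MB it returns 1, which is why the small test file needs the custom configuration.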

Compression and RDD

The logic of the low-level RDD API is similar to the one described above. It also uses Hadoop's input formats to detect whether a given file can be partitioned at reading. For instance, SparkContext's textFile(String source) method creates an instance of HadoopRDD. One of its fields contains the list of underlying partitions. Each of them is an instance of HadoopPartition with an assigned split of the file. And as you can deduce, the check on the splittable format is done when the input splits used to build the HadoopPartition instances are computed:

val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
  allInputSplits.filter(_.getLength > 0)
} else {
  allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
  array(i) = new HadoopPartition(id, i, inputSplits(i))
}

In the case of a text-based source, the splits are retrieved from the FileInputFormat class, which internally checks whether the format is splittable:

if (isSplitable(fs, path)) {
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  // ...
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length-bytesRemaining, splitSize, clusterMap);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
        splitHosts[0], splitHosts[1]));
    bytesRemaining -= splitSize;
  }
  // ...
} else {
  String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
  splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
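The split size arithmetic of this loop can be reproduced in isolation. The sketch below follows the formula used by Hadoop's computeSplitSize and the 1.1 value of SPLIT_SLOP, but it is a simplified model for a single non-empty file. HadoopSplitSketch and countSplits are hypothetical names, and minSize is assumed to be at least 1.

```java
class HadoopSplitSketch {

    // The last split may be up to 10% bigger than the others.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Counts the splits created for a single non-empty file.
    static int countSplits(long length, boolean splittable, int requestedSplits,
                           long minSize, long blockSize) {
        if (!splittable) {
            return 1; // the whole file goes into one split
        }
        long goalSize = length / Math.max(requestedSplits, 1);
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        int splits = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the remaining bytes form the last split
        }
        return splits;
    }
}
```

For a 42-byte splittable file and 2 requested splits, the goal size is 21 bytes and the sketch returns 2. For 100 bytes and 3 requested splits it returns 3, the last split holding 34 bytes thanks to SPLIT_SLOP. The number of requested splits corresponds to the minPartitions value passed to getSplits in the previous snippet.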

The examples below show whether Apache Spark is able to parallelize Gzip and Bzip2 files:

private val BzipPath = new File("/tmp/bzip_test")
private val GzipPath = new File("/tmp/gzip_test")

"not splittable format" should "have only 1 partition" in {
  val gzipPartitions = sparkContext.textFile(GzipPath.getAbsolutePath).getNumPartitions

  gzipPartitions shouldEqual 1
}

"splittable format" should "have more than 1 partition" in {
  val bzipPartitions = sparkContext.textFile(BzipPath.getAbsolutePath).getNumPartitions

  bzipPartitions > 1 shouldBe true
}

Compression is a powerful tool in Big Data, where the data needs to be moved between nodes very often. It reduces file sizes and helps to make such transfers faster. On the flip side, it also comes with some drawbacks, and the difficulty of parallelizing the work is one of them. Fortunately, as shown in this post, several splittable compression formats exist. Some of them, like Bzip2, are natively splittable. Others, like LZO, became splittable thanks to the work of the Open Source community. The last family of splittable files are container file formats such as Hadoop's SequenceFile. Their content, compressed with any compression method, even one that is not natively splittable, can be processed in parallel by Apache Spark without any problem.