Apache Spark and data compression

on waitingforcode.com

Compressed data takes less space and can therefore be sent faster across the network. However, these advantages turn into drawbacks in parallel distributed data processing when the engine doesn't know how to split the compressed data for better parallelization. Fortunately, some compression formats can be split.

In the first section of this post we'll focus on compression and its splittable formats. In the next part we'll discover how Apache Spark deals with compressed files. Finally, in the last section we'll see some examples of Apache Spark working on compressed files.

Compression for parallel processing

Compression is nothing more, nothing less than storing information in a less verbose format, and it's especially useful for information with a lot of repeated patterns. For instance, a compressed version of the "ababab" text could simply be written as "3ab" and, as you can see, it takes half the space of the full text. If you're interested in a more precise definition, I invite you to read the "Compression definition" section of the Compression in Parquet post.
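
To make the "3ab" idea concrete, here is a minimal Java sketch of such a toy scheme. The ToyPatternCodec class and its methods are made up for this illustration and have nothing to do with a real codec; the sketch also assumes that the text itself contains no digits. Its last line shows what a reader gets when it decodes two fragments of the compressed text separately.

```java
// Hypothetical toy codec, only meant to illustrate the "ababab" -> "3ab" example.
final class ToyPatternCodec {

    // Writes a text made of one repeated pattern as "<count><pattern>".
    // Returns the input unchanged when it is not a repetition of a shorter pattern.
    static String compress(String text) {
        int n = text.length();
        for (int len = 1; len <= n / 2; len++) {
            if (n % len != 0) {
                continue;
            }
            String pattern = text.substring(0, len);
            if (pattern.repeat(n / len).equals(text)) {
                return (n / len) + pattern;
            }
        }
        return text;
    }

    // Reads the leading digits as the repetition count and repeats the rest.
    // A text without leading digits is considered as not compressed.
    static String decompress(String compressed) {
        int i = 0;
        while (i < compressed.length() && Character.isDigit(compressed.charAt(i))) {
            i++;
        }
        if (i == 0) {
            return compressed;
        }
        int count = Integer.parseInt(compressed.substring(0, i));
        return compressed.substring(i).repeat(count);
    }

    public static void main(String[] args) {
        System.out.println(compress("ababab"));                 // 3ab
        System.out.println(decompress("3ab"));                  // ababab
        // Decoding the fragments "3a" and "b" separately gives a wrong text
        System.out.println(decompress("3a") + decompress("b")); // aaab
    }
}
```

The decoder needs the whole "3ab" string to rebuild the text. Cut in the middle, each fragment is still decodable but the result is no longer the original content.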

Naively, we could think that compressed files can't be read in parallel. After all, slicing them at arbitrary places would lead to corrupted chunks. For instance, if we produce 2 splits for our "compressed" 3ab content, we may receive "3a" and "b", which is incorrect. Fortunately, it's not a universal truth and some compression formats are splittable:

  • Bzip2 - This first splittable format provides a good compression ratio. However, it doesn't come without cost because the compression process is very CPU-intensive.
    Why is Bzip2 splittable? We can find a part of the answer in the org.apache.hadoop.io.compress.BZip2Codec class used, among others, by Apache Spark to handle compressed files. Bzip2-compressed data can be read in 2 modes: continuous and by block. Only the latter is suited to parallelized reads at arbitrary places in the file. The ability to parallelize reads comes from the internal storage of compressed data, which is organized in blocks of 100-900 kB. In the Hadoop framework we can see the use of the blocks in the initBlock() method of the CBZip2InputStream class.
  • LZO - This compression format provides a good trade-off between speed and compression size. Unfortunately, its native implementation is not splittable, despite the fact that it stores compressed data in blocks of approximately 256 KiB. But thanks to this block-oriented format, Twitter engineers proposed a splittable LZO format on Hadoop. In this project every LZO file has a corresponding .lzo.index file with the mapping between compressed blocks and their byte offsets.
  • Snappy - I put it here because of an interesting point I found during my research. Native Snappy, like LZO, isn't splittable. However, when it's used with a container file format like SequenceFile, Snappy becomes "splittable". But the difference with BZip2 and LZO is that the rule applies to the container's content and not to the container itself.
    Thus, if we compress a text file with Snappy, we'll be unable to split it. On the other hand, dealing with compressed blocks of a plain SequenceFile, as SequenceFile(block1=compressedData1, block2=compressedData2,...), enables splitting. But it comes from the fact that it's the SequenceFile that is splittable and not Snappy. Thinking that way, we could also say that GZip or any other non-splittable format can be split within a SequenceFile or any other container format.
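
The "blocks plus index" idea behind the splittable LZO files can be sketched in a few lines of Java. This is only an illustration under assumptions: it uses the Deflate implementation shipped with the JDK instead of LZO, keeps the index in memory rather than in a separate .lzo.index file, and the IndexedBlocks class is a hypothetical name. What matters is that every block is compressed independently, so a reader knowing the offsets can start at any block.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical illustration: independently compressed blocks + index of their byte offsets.
final class IndexedBlocks {
    final byte[] data;            // all compressed blocks, one after another
    final List<Integer> offsets;  // offsets.get(i) = position of block i inside data

    private IndexedBlocks(byte[] data, List<Integer> offsets) {
        this.data = data;
        this.offsets = offsets;
    }

    // Compresses every blockSize-byte chunk on its own and records where it starts.
    static IndexedBlocks compress(byte[] input, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        List<Integer> offsets = new ArrayList<>();
        byte[] buffer = new byte[1024];
        for (int start = 0; start < input.length; start += blockSize) {
            offsets.add(out.size());
            Deflater deflater = new Deflater();
            deflater.setInput(input, start, Math.min(blockSize, input.length - start));
            deflater.finish();
            while (!deflater.finished()) {
                out.write(buffer, 0, deflater.deflate(buffer));
            }
            deflater.end();
        }
        return new IndexedBlocks(out.toByteArray(), offsets);
    }

    // Decompresses a single block without touching the blocks stored before it.
    byte[] readBlock(int index) {
        int start = offsets.get(index);
        int end = index + 1 < offsets.size() ? offsets.get(index + 1) : data.length;
        Inflater inflater = new Inflater();
        inflater.setInput(data, start, end - start);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && inflater.needsInput()) {
                    throw new IllegalStateException("Truncated block " + index);
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupted block " + index, e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] input = "line-one\nline-two\nline-three\n".getBytes(StandardCharsets.UTF_8);
        IndexedBlocks blocks = compress(input, 9);
        // Reads the second block directly, as a task working on its own split would do
        System.out.print(new String(blocks.readBlock(1), StandardCharsets.UTF_8));
    }
}
```

Without the offsets, a reader would have to decompress the data from the very beginning to find where the second block starts, which is exactly the problem of the non-splittable formats.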

Hadoop SequenceFile

The SequenceFile format is one example of container-like file formats whose compressed content is splittable. This format stores the data as key/value pairs and is splittable because internally its input format doesn't override the isSplitable method of the FileInputFormat abstract class, which returns true by default:

  // org.apache.hadoop.mapred.FileInputFormat
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
  // Unlike, for example, TextInputFormat, whose splittable
  // character depends on the compression codec
  // org.apache.hadoop.mapred.TextInputFormat
  protected boolean isSplitable(FileSystem fs, Path file) {
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
  

Hence, a SequenceFile may be split when its records or blocks are compressed, even though the compression method is not splittable by itself.

Compression and Apache Spark SQL

To deal with compressed files, Apache Spark uses the codecs provided by Hadoop. We can see that in the org.apache.spark.sql.catalyst.util.CompressionCodecs class, where the list of supported codecs is defined at the beginning:

private val shortCompressionCodecNames = Map(
  "none" -> null,
  "uncompressed" -> null,
  "bzip2" -> classOf[BZip2Codec].getName,
  "deflate" -> classOf[DeflateCodec].getName,
  "gzip" -> classOf[GzipCodec].getName,
  "lz4" -> classOf[Lz4Codec].getName,
  "snappy" -> classOf[SnappyCodec].getName)

Thus, the way the framework reads and processes data from compressed files comes from the registered codecs. Obviously, a non-splittable file is processed by a single task, and so by a single executor, unless the data, after decompression, is repartitioned. On the other hand, splittable compressed files are processed as any parallelizable file. To discover whether a given format may be parallelized, Apache Spark uses another abstraction provided by Hadoop, the SplittableCompressionCodec interface. Apache Spark's org.apache.spark.sql.execution.datasources.FileFormat trait exposes a method called def isSplitable(sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean. Almost every file data source implements it, as for instance the TextBasedFileFormat abstract class used by the JSON, CSV and text file data sources:

private var codecFactory: CompressionCodecFactory = _

override def isSplitable(
  sparkSession: SparkSession,
  options: Map[String, String],
  path: Path): Boolean = {
  if (codecFactory == null) {
    codecFactory = new CompressionCodecFactory(
      sparkSession.sessionState.newHadoopConfWithOptions(options))
  }
  val codec = codecFactory.getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}

As you can see, it looks up the codec matching the file. When there is none, the file is not compressed and can be split. Otherwise, the file is splittable only if the codec supports splits by implementing the SplittableCompressionCodec interface. Two elements of this interface give a good insight into the way compressed files may be read:

public enum READ_MODE {CONTINUOUS, BYBLOCK};

SplitCompressionInputStream createInputStream(InputStream seekableIn,
    Decompressor decompressor, long start, long end, READ_MODE readMode)
    throws IOException;

The enum represents the 2 available read modes, the splittable one (BYBLOCK) and the non-splittable one (CONTINUOUS), exactly as explained in the previous section for the Bzip2 compression format. The method creating the input stream works only for seekable data (= seekableIn must implement the Seekable interface). Thus we can freely move inside such a compressed file. The method uses the start and end parameters to find the positions marking the beginning and the end of the read.

One object using this mechanism is Hadoop's org.apache.hadoop.mapred.LineRecordReader class, invoked for instance inside the org.apache.spark.sql.execution.datasources.text.TextFileFormat#readToUnsafeMem(conf: Broadcast[SerializableConfiguration], requiredSchema: StructType, wholeTextMode: Boolean): (PartitionedFile) => Iterator[UnsafeRow] method.
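
To get an intuition of how a line reader can honour arbitrary start and end offsets, here is a simplified Java sketch. It's not Hadoop's code: it works on uncompressed bytes held in memory and the SplitLineReader class is a hypothetical name. It follows the convention that, to my understanding, LineRecordReader applies: every split except the first one throws away its first line, and every split may read one line past its end, so that each line is returned exactly once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a line reader working on a byte range [start, end].
final class SplitLineReader {

    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Not the first split: the line found at `start` belongs to the previous
            // split, which reads one extra line past its own end.
            pos = skipLine(data, pos);
        }
        List<String> lines = new ArrayList<>();
        // `<=` lets the reader consume the line that starts exactly at `end`
        while (pos <= end && pos < data.length) {
            int next = skipLine(data, pos);
            int lineEnd = data[next - 1] == '\n' ? next - 1 : next;
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    // Returns the offset just after the next '\n' found at or after `from`,
    // or data.length when there is no more newline.
    private static int skipLine(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    public static void main(String[] args) {
        byte[] data = "aa\nbb\ncc\n".getBytes(StandardCharsets.UTF_8);
        // The boundary at byte 4 falls in the middle of the "bb" line
        System.out.println(readSplit(data, 0, 4)); // [aa, bb]
        System.out.println(readSplit(data, 4, 9)); // [cc]
    }
}
```

For a splittable compressed file the same reasoning applies, except that the reader first has to align itself on a compressed block boundary, which is the role of the BYBLOCK read mode.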

The two following examples show how a splittable format impacts partitioning:

private val ProcessingSparkSession = SparkSession.builder()
  .appName("Spark SQL compression test").master("local[*]")
  // Defining this option is mandatory. The file is much smaller than the default value and it always ends up with
  // a single partition - even though the bzip2 is splittable
  .config("spark.sql.files.maxPartitionBytes", "5")
  .getOrCreate()

"not splittable format" should "has only 1 partition" in {
  val gzipPartitions = ProcessingSparkSession.read.option("compression", "gzip")
    .format("text").load(GzipPath.getAbsolutePath+"/gzip_compression1.gz").rdd.getNumPartitions

  gzipPartitions shouldEqual 1
}

"splittable format" should "has more than 1 partition" in {
  val bzipPartitions = ProcessingSparkSession.read.option("compression", "bzip2")
    .format("text")
    .load(BzipPath.getAbsolutePath+"/bzip_compression1.bz2").rdd.getNumPartitions

  bzipPartitions shouldEqual 9
}

Did you notice something interesting? Yes, we must specify the spark.sql.files.maxPartitionBytes entry in order to see our small file partitioned. As you will see later, it's not required in the case of an RDD read. But here this configuration entry defines the maximum number of bytes packed into a single partition and thus produces evenly balanced splits of the file. Since our file has 42 bytes and the limit is 5 bytes, Apache Spark will split it into 9 partitions (8 of 5 bytes and 1 of 2 bytes).
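
The arithmetic behind these 9 partitions can be checked with a small Java sketch. It's a deliberately simplified model: the MaxPartitionBytesMath class is a hypothetical name and the code ignores the other parameters Spark takes into account when it plans file partitions (like the cost of opening a file). It only reproduces the cutting of a 42-byte file into chunks of at most 5 bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the arithmetic of the example, not Spark's planner.
final class MaxPartitionBytesMath {

    // Returns {offset, length} pairs covering a file of fileLength bytes
    // with chunks of at most maxSplitBytes bytes.
    static List<long[]> splitRanges(long fileLength, long maxSplitBytes) {
        if (maxSplitBytes <= 0) {
            throw new IllegalArgumentException("maxSplitBytes must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += maxSplitBytes) {
            long length = Math.min(maxSplitBytes, fileLength - offset);
            ranges.add(new long[] {offset, length});
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<long[]> ranges = splitRanges(42, 5);
        System.out.println(ranges.size()); // 9
        long[] last = ranges.get(ranges.size() - 1);
        System.out.println(last[0] + " " + last[1]); // 40 2
    }
}
```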

Compression and RDD

The logic of the low-level RDD API is similar to the one described above. It also uses Hadoop's input formats to detect whether a given file may be partitioned at reading. For instance, for SparkContext's textFile(String source) method an instance of HadoopRDD is created. One of its fields contains the list of underlying partitions. Each of them is an instance of HadoopPartition with an assigned split of the file. And as you can deduce, the check on the splittable format is done when these splits are computed, just before the HadoopPartition construction:

val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
  allInputSplits.filter(_.getLength > 0)
} else {
  allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
  array(i) = new HadoopPartition(id, i, inputSplits(i))
}

In the case of a text-based source, the splits are retrieved from the FileInputFormat class, which internally checks if the format is splittable:

if (isSplitable(fs, path)) {
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  // ...
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length-bytesRemaining, splitSize, clusterMap);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
        splitHosts[0], splitHosts[1]));
    bytesRemaining -= splitSize;
  }
  // ...
} else {
  String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
  splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
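
To see what this loop produces, we can replay it on plain numbers. The Java sketch below is a standalone simulation and not Hadoop's class: FileSplitSketch is a hypothetical name, hosts and block locations are left out, and zero-length files are not handled. The computeSplitSize formula and the SPLIT_SLOP value of 1.1 are written as I remember them from Hadoop's FileInputFormat, so consider them as assumptions to verify against your Hadoop version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the split computation, without hosts and block locations.
final class FileSplitSketch {
    // Assumed value: the last split may be up to 10% bigger than the others
    static final double SPLIT_SLOP = 1.1;

    // Assumed formula: the goal size is capped by the block size and floored by minSize
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Returns {offset, length} pairs for a file of `length` bytes (length > 0).
    static List<long[]> getSplits(long length, boolean splittable,
                                  long goalSize, long minSize, long blockSize) {
        List<long[]> splits = new ArrayList<>();
        if (!splittable) {
            // Non-splittable codec: one split covering the whole file
            splits.add(new long[] {0, length});
            return splits;
        }
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        if (splitSize <= 0) {
            throw new IllegalArgumentException("split size must be positive");
        }
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // The remainder, possibly slightly bigger than splitSize, is the last split
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        // 100-byte file, goal of 50 bytes per split, 128-byte blocks
        System.out.println(getSplits(100, true, 50, 1, 128).size());  // 2
        System.out.println(getSplits(100, false, 50, 1, 128).size()); // 1
    }
}
```

The second call shows the gzip case: whatever the goal size is, a file written with a non-splittable codec ends up in a single split and thus in a single partition.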

The examples below show whether Apache Spark is able to parallelize GZIP and BZip2 files:

private val BzipPath = new File("/tmp/bzip_test")
private val GzipPath = new File("/tmp/gzip_test")

"not splittable format" should "has only 1 partition" in {
  val gzipPartitions = sparkContext.textFile(GzipPath.getAbsolutePath).getNumPartitions

  gzipPartitions shouldEqual 1
}

"splittable format" should "has more than 1 partition" in {
  val bzipPartitions = sparkContext.textFile(BzipPath.getAbsolutePath).getNumPartitions

  bzipPartitions > 1 shouldBe true
}

Compression is a powerful tool in Big Data, where the data needs to be moved between nodes very often. It reduces file sizes and helps to make such transfers faster. On the flip side, it also comes with some drawbacks. The difficulty to parallelize the work is one of them. Fortunately, as shown in this post, we can find several splittable compression formats. Some of them, like Bzip2, are natively splittable. Others became splittable thanks to the work of the Open Source community, and LZO is one example of this family. The last family of splittable files are container file formats such as Hadoop's SequenceFile. Their content, compressed with any compression method, even one that is not natively splittable, can be processed in parallel by Apache Spark without any problem.
