Big Data patterns implemented - data size reduction

Versions: Apache Spark 2.4.0

After several weeks of inactivity, the series about data engineering patterns is back. In this comeback article, I will present a pattern called dataset reduction.

In the first section of the post, I will list several areas that are affected by the dataset size. In the next one, I will give some examples of how to reduce it.

Data size impact

Since data processing is nowadays mostly distributed, the size of the processed dataset has an important impact. The first affected layer is storage. Even though storage is cheaper than it was in the past, storing more data still costs more. Storage is also where the limits of many cloud managed NoSQL stores show up. Some of them cap the maximum size of a stored item, so you can't store anything you want. For example, as of this writing, an item (row) in DynamoDB can be at most 400 KB.

Another affected part is the network. Since most of the time you will need to move the data to your computation logic, the size of the dataset matters. The more data you have to transfer, the longer it will take; it's as simple as that.

Methods

The simplest method to reduce dataset size is projection. If you know that your consumers won't need the whole record, you can select only the fields they will likely use. In Apache Spark SQL, it is a .select("....") statement:

  "dataset projection" should "reduce the size of stored data" in {
    val dataset = Seq(
      (1, "a", "A"),
      (2, "b", "B"),
      (3, "c", "C"),
      (4, "d", "D")
    ).toDF("nr", "lower_case_letter", "upper_case_letter")

    val reducedDataset = dataset.select("nr", "lower_case_letter")

    reducedDataset.collect().map(row => s"${row.getAs[Int]("nr")}-${row.getAs[String]("lower_case_letter")}") should
      contain allOf("1-a", "2-b", "3-c", "4-d")
  }

Another method, more complicated than the previous one, is vertical partitioning. Unlike horizontal partitioning, which distributes entire records, vertical partitioning distributes different parts of a record. For instance, if you partition a record Customer{first_name, last_name, address, hobby, liked_products} vertically, you will split its 5 fields into multiple datasets, depending on their purpose. In this example, you can consider the first 3 fields (first_name, last_name, address) as mandatory administrative information and the last 2 as values useful for Machine Learning algorithms. The example below applies the same idea to a simpler dataset, writing each letter column to a separate key/value collection:

  "vertical partitioning" should "reduce the size of dataset in a K/V store" in {
    val dataset = Seq(
      (1, "a", "A"),
      (2, "b", "B"),
      (3, "c", "C"),
      (4, "d", "D")
    ).toDF("nr", "lower_case_letter", "upper_case_letter")

    dataset.foreachPartition(rows => {
      // Always prefer batch operations - less network communication, data store queried only once for n keys
      rows.grouped(100)
        .foreach(rowsBatch => {
          val mappedLowerCases = rowsBatch.map(row => (row.getAs[Int]("nr"), row.getAs[String]("lower_case_letter")))
          val mappedUpperCases = rowsBatch.map(row => (row.getAs[Int]("nr"), row.getAs[String]("upper_case_letter")))
          KeyValueStore.Letters.addAllLowerCase(mappedLowerCases)
          KeyValueStore.Letters.addAllUpperCase(mappedUpperCases)
        })
    })

    KeyValueStore.Letters.allLowerCase should contain allOf (1 -> "a", 2 -> "b", 3 -> "c", 4 -> "d")
    KeyValueStore.Letters.allUpperCase should contain allOf (1 -> "A", 2 -> "B", 3 -> "C", 4 -> "D")
  }
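
The Customer record described before the test can be split the same way. The snippet below is only a minimal sketch in plain Scala, without Spark: the id field, the field types and every class and method name are assumptions added for illustration, since the article only names the 5 fields. It keeps the key in both parts so the full record can be rebuilt when needed.

```scala
// Hypothetical record based on the Customer example; the id key and the field types are assumptions.
final case class Customer(id: Int, firstName: String, lastName: String, address: String,
                          hobby: String, likedProducts: Seq[String])

// The two vertical partitions. Both keep the id so that the parts can be joined back.
final case class CustomerAdministrative(id: Int, firstName: String, lastName: String, address: String)
final case class CustomerPreferences(id: Int, hobby: String, likedProducts: Seq[String])

object VerticalPartitioningSketch {
  // Splits one record into its administrative part and its Machine Learning part.
  def split(customer: Customer): (CustomerAdministrative, CustomerPreferences) = (
    CustomerAdministrative(customer.id, customer.firstName, customer.lastName, customer.address),
    CustomerPreferences(customer.id, customer.hobby, customer.likedProducts)
  )

  // Rebuilds the full record; fails if the two parts belong to different customers.
  def merge(admin: CustomerAdministrative, prefs: CustomerPreferences): Customer = {
    require(admin.id == prefs.id, "both parts must describe the same customer")
    Customer(admin.id, admin.firstName, admin.lastName, admin.address, prefs.hobby, prefs.likedProducts)
  }
}
```

A consumer interested only in the administrative data reads the smaller CustomerAdministrative part and never pays for the transfer of hobby and likedProducts.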

The next dataset reduction method uses serialization frameworks. Often the data is represented as JSON since this format gives more flexibility and doesn't require any special framework. On the other hand, it has an important drawback: the field names are repeated alongside the values in every record. To overcome that, you can use one of the popular serialization formats (Apache Avro, Protobuf, …) that keep the schema apart from the values. You may not see the difference for a small number of records but it may be noticeable for thousands of them. I prepared an example to show the difference better:

  "a serialization framework" should "reduce the size of dataset" in {
    val dataset = (0 to 1000).map(nr => (nr, s"number${nr}")).toDF("nr", "label")

    val avroOutput = "/tmp/dataset-avro"
    val jsonOutput = "/tmp/dataset-json"
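    // In Spark 2.4 the "avro" format comes from the external spark-avro module, which must be on the classpath
    dataset.write.format("avro").mode(SaveMode.Overwrite).save(avroOutput)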
    dataset.write.format("json").mode(SaveMode.Overwrite).save(jsonOutput)

    val avroFilesSize = FileUtils.sizeOfDirectory(new File(avroOutput))
    val jsonFilesSize = FileUtils.sizeOfDirectory(new File(jsonOutput))
    println(s"avro=${avroFilesSize} vs json=${jsonFilesSize}")
    jsonFilesSize should be > avroFilesSize
  }
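
To see where the difference comes from, the sketch below encodes the same (nr, label) records twice in plain Scala, without Spark. The binary layout is a simplified stand-in written with java.io.DataOutputStream and it is not Avro's actual encoding; the object and method names are hypothetical. It only illustrates the effect of not repeating the field names in every record.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

object EncodingSizeSketch {
  // Same records as in the test above: (nr, label)
  val records: Seq[(Int, String)] = (0 to 1000).map(nr => (nr, s"number$nr"))

  // JSON Lines: the field names "nr" and "label" are written again for every record.
  def jsonSize(rows: Seq[(Int, String)]): Int =
    rows.map { case (nr, label) => s"""{"nr":$nr,"label":"$label"}""" + "\n" }
      .map(_.getBytes(StandardCharsets.UTF_8).length)
      .sum

  // Values only: a 4-byte int followed by the label with a 2-byte length prefix (writeUTF).
  // The reader has to know the schema (Int, String) from somewhere else.
  def binarySize(rows: Seq[(Int, String)]): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    rows.foreach { case (nr, label) =>
      out.writeInt(nr)
      out.writeUTF(label)
    }
    out.flush()
    buffer.size()
  }
}
```

Comparing EncodingSizeSketch.jsonSize(EncodingSizeSketch.records) with EncodingSizeSketch.binarySize(EncodingSizeSketch.records) shows the cost of carrying the field names with each value.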

And finally, the most obvious way to reduce the dataset size: compression. Simply put, compression represents repetitive patterns with fewer bits. On the other hand, it may complicate the reading side since few compression codecs are splittable, so a compressed file often can't be processed in parallel. The example below compares the size of the same dataset written with and without compression:

  "compressed files" should "reduce the size of dataset" in {
    val dataset = (0 to 1000).map(nr => (nr, s"number${nr}")).toDF("nr", "label")

    val rawOutput = "/tmp/dataset-json-raw"
    val compressedOutput = "/tmp/dataset-json-compressed"
    dataset.write.format("json").mode(SaveMode.Overwrite).save(rawOutput)
    dataset.write.format("json").option("compression", "gzip").mode(SaveMode.Overwrite).save(compressedOutput)

    val rawOutputFileSize = FileUtils.sizeOfDirectory(new File(rawOutput))
    val compressedOutputFileSize = FileUtils.sizeOfDirectory(new File(compressedOutput))
    println(s"raw=${rawOutputFileSize} vs compressed=${compressedOutputFileSize}")
    rawOutputFileSize should be > compressedOutputFileSize
  }
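
The dependency on repetitive patterns can be checked without Spark. The following is a minimal sketch based on the JDK's java.util.zip.GZIPOutputStream; the object name and the two sample inputs are made up for the illustration. It compresses a highly repetitive input and a pseudo-random input of the same length.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

object CompressionSketch {
  // Compresses the bytes in memory with gzip and returns the compressed bytes.
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val gzipStream = new GZIPOutputStream(buffer)
    // close() finishes the gzip stream, so it must be called before reading the buffer
    try gzipStream.write(bytes) finally gzipStream.close()
    buffer.toByteArray
  }

  // Highly repetitive input: the same word repeated 1000 times
  val repetitive: Array[Byte] = ("number" * 1000).getBytes(StandardCharsets.UTF_8)

  // Pseudo-random input of the same length; the fixed seed keeps the run reproducible
  val random: Array[Byte] = {
    val bytes = new Array[Byte](repetitive.length)
    new scala.util.Random(42).nextBytes(bytes)
    bytes
  }
}
```

CompressionSketch.gzip(CompressionSketch.repetitive) is a small fraction of the input size, whereas CompressionSketch.gzip(CompressionSketch.random) barely shrinks, if at all, because random bytes contain no patterns to exploit.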

Dataset reduction is a helpful pattern that can solve a lot of problems. You will encounter these problems especially when working with modern managed services, which very often limit the size of the manipulated data. The reduction will also have a positive impact on your long-term storage and, therefore, on the costs. To put it in place you can use one of the methods listed in the second section of the post, the easiest of which consists of simply compressing the data.