What's new in Apache Spark 3.0 - binary data source

Versions: Apache Spark 3.0.0

I remember my first days with Apache Spark and my analysis of the available RDD data sources. Since then, I have used a lot of them, except the binary data one, whose support is a new addition to Apache Spark SQL in release 3.0.

The post starts with a general presentation of this new data source. Its second part covers more in-depth aspects.

Binary data source

This feature is one of the oldest ones in the release because the community implemented it back in April 2019. Its main goal is to enable the processing of binary data, such as images, audio or video, which you can then use in your Deep Learning algorithms.

The implementation added a new file format, represented by BinaryFileFormat. It exposes the binary data with the following columns:

  private[binaryfile] val PATH = "path"
  private[binaryfile] val MODIFICATION_TIME = "modificationTime"
  private[binaryfile] val LENGTH = "length"
  private[binaryfile] val CONTENT = "content"
  private[binaryfile] val BINARY_FILE = "binaryFile"

  val schema = StructType(
    StructField(PATH, StringType, false) ::
    StructField(MODIFICATION_TIME, TimestampType, false) ::
    StructField(LENGTH, LongType, false) ::
    StructField(CONTENT, BinaryType, true) :: Nil)

Except for path and content, all these fields can be used in predicate pushdown, that is, to filter out files at the data source level. The buildReader method of the FileFormat interface has a parameter called filters: Seq[Filter] representing "a set of filters than can optionally be used to reduce the number of rows output". In the case of the binary data source, these filters are converted into FileStatus predicates in BinaryFileFormat's companion object:

  private[binaryfile] def createFilterFunction(filter: Filter): FileStatus => Boolean = {
    filter match {
      case And(left, right) =>
        s => createFilterFunction(left)(s) && createFilterFunction(right)(s)
      case Or(left, right) =>
        s => createFilterFunction(left)(s) || createFilterFunction(right)(s)
      case Not(child) =>
        s => !createFilterFunction(child)(s)
      // ... cases for LENGTH and MODIFICATION_TIME columns
      case _ => (_ => true)
    }
  }
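
To make the recursive composition easier to follow, here is a minimal, self-contained sketch of the same idea in plain Scala. It does not use Spark's classes: FileMeta and the small Filter hierarchy are hypothetical stand-ins for Hadoop's FileStatus and Spark's Filter, and the length and modificationTime cases only illustrate what the elided part could look like, not what Spark actually does.

```scala
object FilterSketch {
  // Hypothetical stand-in for org.apache.hadoop.fs.FileStatus
  final case class FileMeta(path: String, length: Long, modificationTimeMs: Long)

  // Hypothetical miniature of Spark's Filter hierarchy
  sealed trait Filter
  final case class And(left: Filter, right: Filter) extends Filter
  final case class Or(left: Filter, right: Filter) extends Filter
  final case class Not(child: Filter) extends Filter
  final case class LessThan(attribute: String, value: Long) extends Filter
  final case class GreaterThan(attribute: String, value: Long) extends Filter
  final case class StringContains(attribute: String, value: String) extends Filter

  // Turns a filter tree into a predicate evaluated on file metadata only,
  // so a file can be skipped before its content is ever opened.
  def createFilterFunction(filter: Filter): FileMeta => Boolean = filter match {
    case And(left, right) =>
      s => createFilterFunction(left)(s) && createFilterFunction(right)(s)
    case Or(left, right) =>
      s => createFilterFunction(left)(s) || createFilterFunction(right)(s)
    case Not(child) =>
      s => !createFilterFunction(child)(s)
    case LessThan("length", value) => _.length < value
    case GreaterThan("length", value) => _.length > value
    case LessThan("modificationTime", value) => _.modificationTimeMs < value
    case GreaterThan("modificationTime", value) => _.modificationTimeMs > value
    // Anything else (for example a predicate on path) is not evaluated here
    // and lets every file through.
    case _ => (_ => true)
  }
}
```

For instance, And(LessThan("length", 1000L), Not(GreaterThan("modificationTime", 1500L))) keeps only the files smaller than 1000 bytes and modified no later than timestamp 1500.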

Let's now see a very simple example of the binary data source that reads an Apache Spark logo:

  "an Apache Spark logo" should "be correctly read with a binary data source" in {
    val path = getClass.getResource("/binary_files").getPath
    val sparkLogo = sparkSession.read.format("binaryFile")
        .load(path).as[SparkLogoFields]

    val files = sparkLogo.collect()
    files should have size 1
    val logo = files(0)
    logo.path should endWith("binary_files/apache_spark_logo.png")
    logo.modificationTime shouldNot be(null)
    logo.length shouldEqual 67362
    logo.content shouldEqual FileUtils.readFileToByteArray(new File(s"${path}/apache_spark_logo.png"))
  }

case class SparkLogoFields(path: String, modificationTime: Timestamp, length: Long, content: Array[Byte])

You can also control the maximum allowed size of a binary file by changing the spark.sql.sources.binaryFile.maxLength property.
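
The sketch below puts the size limit and a metadata filter together. It is only an illustration: the /tmp/binary_files directory, the 10 MB limit, the 100000-byte threshold and the application name are assumptions made for the example. pathGlobFilter is the generic file source option for selecting files by name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object BinaryFileFilterSketch extends App {
  val spark = SparkSession.builder()
    .appName("binary-file-sketch") // illustrative name
    .master("local[*]")
    // Illustrative limit of 10 MB for the content of a single file
    .config("spark.sql.sources.binaryFile.maxLength", 10L * 1024 * 1024)
    .getOrCreate()

  val smallPngs = spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.png") // keep only the PNG files
    .load("/tmp/binary_files")         // hypothetical input directory
    // A predicate on length is a candidate for pushdown to the data source
    .filter(col("length") < 100000L)
    .select("path", "length", "modificationTime")

  smallPngs.show(truncate = false)
  spark.stop()
}
```

Since the query selects only metadata columns, it is a convenient way to inspect a directory before deciding which files are worth loading with their content.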

Data reading

The data is read with the org.apache.hadoop.fs.FileSystem implementation corresponding to the file system that stores the given file. Opening the file returns an instance of FSDataInputStream, whose whole content is written to the content column:

  val stream = fs.open(status.getPath)
  try {
    writer.write(i, ByteStreams.toByteArray(stream))
  } finally {
    Closeables.close(stream, true)
  }

At the end, the stream is closed quietly, so that a possible IOException raised by close() is logged as a warning rather than thrown to the caller.
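
The same read-everything-then-close-quietly pattern can be sketched with the JDK alone. This is an analogue, not Spark's code: readFully is a hypothetical helper, readAllBytes (Java 9+) plays the role of Guava's ByteStreams.toByteArray, and the inner try/catch mimics Closeables.close(stream, true) by reporting the close failure instead of rethrowing it.

```scala
import java.io.{IOException, InputStream}

object ReadFullySketch {
  // Hypothetical helper: reads the whole stream into memory and closes it
  // without letting a failure of close() reach the caller.
  def readFully(open: () => InputStream): Array[Byte] = {
    val stream = open()
    try {
      stream.readAllBytes() // requires Java 9 or newer
    } finally {
      try {
        stream.close()
      } catch {
        case e: IOException =>
          // Swallowed on purpose: the bytes were already read successfully
          System.err.println(s"WARN: failed to close the stream: ${e.getMessage}")
      }
    }
  }
}
```

Note that an exception thrown while reading still propagates; only the failure of close() is swallowed, which matches the behavior described above.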

The binary data source is thus a new data source available in Apache Spark SQL. One thing to notice, though: I'm talking about a data source because, as of this writing, the data sink is not implemented yet, and if you try to use it, you will encounter the "Write is not supported for binary file data source" error.