Apache Spark 2.4.0 features - Avro data source

Versions: Apache Spark 2.4.0 https://github.com/bartosz25/spark-...e/sql/avro/AvroDataSourceTest.scala

Apache Avro has become one of the serialization standards, among others because of its use in Apache Kafka's schema registry. Previously, to work with Avro files in Apache Spark we needed Databricks' external package. That's no longer the case starting from the 2.4.0 release, where Avro became a first-class data source.

In this next post about the new Apache Spark features I'll show the integration of Apache Avro with the Apache Spark SQL module. The first part briefly recalls the state prior to the 2.4.0 release. The second section shows what we can do with Avro files, while the last one focuses on some internal details.

Avro data source integration

Prior to Apache Spark 2.4.0, the Avro module was an external package maintained by Databricks. However, not every Apache Spark newcomer is aware of the existence of external packages, and the lack of built-in Avro support could discourage some of them. The decision to move the external package into the main Apache Spark project was taken with these newcomers in mind, but also with the users of the Structured Streaming module, where a lot of pipelines are Apache Kafka-based. Just to recall, Apache Avro is widely used for data serialization in Apache Kafka, notably through the schema registry.

However, the integration task didn't consist only of moving the project but also of adding some missing features, such as the support for some Avro data types. It was also an occasion to refactor the code. The final result was the integration of the Apache Avro data source as a built-in external module of the Apache Spark project. Thanks to that it can be imported as a part of Apache Spark:

libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"
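
Since the module is not bundled in the default Spark distribution, it can also be declared at submission time with the --packages flag. The artifact suffix has to match the Scala version of your Spark build (2.11 assumed here) and my_avro_job.jar stands for a hypothetical application jar:

spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 my_avro_job.jar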

Avro data source examples

Using Apache Avro in Apache Spark SQL doesn't differ a lot from using any other data source. To load the data, we can simply call the format("avro") method and point to the files written with Apache Avro:

  behavior of "Avro data source"

  it should "read Apache Avro data not written by Apache Spark SQL" in {
    val orders = TestedSparkSession.read.format("avro").load(file.getAbsolutePath)

    val collectedOrders = orders.collect().map(row => s"${row.getAs[Int]("id")}: ${row.getAs[Double]("amount")}")
    collectedOrders should have size 1
    collectedOrders(0) shouldEqual "1: 39.55"
  }

By default, the module supports reading files without the .avro extension. But we can disable this behavior by setting the ignoreExtension option to false:

  it should "fail on reading Avro with ignoreExtension disabled" in {
    val fileWithoutExtension = new File("./sample_avro")
    fileWithoutExtension.deleteOnExit()
    fileWithoutExtension.createNewFile()

    val exception = intercept[FileNotFoundException] {
      val orders = TestedSparkSession.read.format("avro").option("ignoreExtension", false).load(fileWithoutExtension.getAbsolutePath)
      orders.show(3)
    }

    exception.getMessage should include("No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
  }

As with any other data source, the Apache Avro integration also supports a user-defined schema, passed with the avroSchema option. And unsurprisingly, when this schema mismatches the schema used to write the data, the reading operation fails:

  it should "fail on reading Avro data with mismatching schema" in {
    val mismatchingSchema = OrderSchema.replace("amount", "amount_mismatch")

    val exception = intercept[SparkException] {
      TestedSparkSession.read.format("avro").option("avroSchema", mismatchingSchema).load(fileNoExt.getAbsolutePath).count()
    }

    exception.getMessage should include("Found order, expecting order, missing required field amount_mismatch")
  }

Finally, the Apache Avro module supports not only the loading but also the writing of data:

  it should "read the data written by Avro writer" in {
    val savedAvroDataset = new File("./avro_dataset.avro")
    savedAvroDataset.deleteOnExit()
    val orders = Seq((1L, "order#1"), (2L, "order#2")).toDF("id", "label")

    orders.write.format("avro").mode(SaveMode.Overwrite).save(savedAvroDataset.getAbsolutePath)

    val ordersFromWrittenAvro = TestedSparkSession.read.format("avro").load(savedAvroDataset.getAbsolutePath)

    val collectedOrders = ordersFromWrittenAvro.collect().map(row => s"${row.getAs[Long]("id")}: ${row.getAs[String]("label")}")
    collectedOrders should have size 2
    collectedOrders should contain allOf("1: order#1", "2: order#2")
  }

During the writing stage we can specify some write-only options, like recordName to indicate Avro's schema name attribute, recordNamespace to define the record's namespace, or compression to define the used compression codec, which is one of: uncompressed, snappy (the default), deflate, bzip2 or xz. We can also specify the avroSchema option, exactly as for reading:

  it should "write data with customized Avro options" in {
    val savedAvroDataset = new File("./avro_dataset")
    val orders = Seq((1, 20.99d)).toDF("id", "amount")

    orders.write.format("avro").mode(SaveMode.Overwrite).option("recordName", "orderFromSpark")
      .option("recordNamespace", "com.waitingforcode.spark.avro").option("compression", "bzip2")
      .save(savedAvroDataset.getAbsolutePath)

    // read the written dataset back with the plain Avro API to verify the applied options
    val dataFile = savedAvroDataset.listFiles().toSeq.find(file => file.getName.startsWith("part")).get
    val datumReader = new GenericDatumReader[GenericData.Record]()
    val dataFileReader = new DataFileReader[GenericData.Record](dataFile, datumReader)
    val readOrder = dataFileReader.next()
    readOrder.toString shouldEqual "{\"id\": 1, \"amount\": 20.99}"
    datumReader.getSchema.getName shouldEqual "orderFromSpark"
    datumReader.getSchema.getNamespace shouldEqual "com.waitingforcode.spark.avro"
    dataFileReader.getMetaString("avro.codec") shouldEqual "bzip2"
  }

Avro integration internals

The conversion between Apache Spark and Apache Avro data types is implemented in the AvroDataToCatalyst and CatalystDataToAvro expressions. However, they only define the workflow of the conversion. The conversion itself is actually made by 2 other classes: AvroDeserializer and AvroSerializer.
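
These two expressions are what the user-facing from_avro and to_avro functions of the org.apache.spark.sql.avro package rely on. The following sketch, written in the same test style as the previous snippets (and reusing the same toDF implicits), shows a binary round trip through them; the test name and the order record schema are illustrative:

  it should "round trip a struct through to_avro and from_avro" in {
    import org.apache.spark.sql.avro.{from_avro, to_avro}
    import org.apache.spark.sql.functions.{col, struct}

    // illustrative reader schema; "label" is declared as a ["string", "null"] union
    // because toDF marks String columns as nullable, which to_avro mirrors in its Avro schema
    val orderRecordSchema = """{"type": "record", "name": "order", "fields": [
                              |{"name": "id", "type": "long"}, {"name": "label", "type": ["string", "null"]}]}""".stripMargin
    val orders = Seq((1L, "order#1")).toDF("id", "label")

    // to_avro goes through CatalystDataToAvro, from_avro through AvroDataToCatalyst
    val roundTrip = orders.select(to_avro(struct(col("id"), col("label"))).as("value"))
      .select(from_avro(col("value"), orderRecordSchema).as("order"))
      .select("order.id", "order.label")

    val collectedOrders = roundTrip.collect().map(row => s"${row.getAs[Long]("id")}: ${row.getAs[String]("label")}")
    collectedOrders should have size 1
    collectedOrders(0) shouldEqual "1: order#1"
  }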

The serialization/deserialization implementation was improved in the 2.4.0 release. Previously it involved 3 transformation steps: Avro format row → Spark's Row → Spark's InternalRow (the opposite direction for serialization). The new implementation gets rid of the intermediate format and works directly on the Avro and InternalRow formats.

All the logic responsible for translating Avro schemas into Spark SQL ones is included in the org.apache.spark.sql.avro.SchemaConverters object. As often in similar Scala code, the translation uses pattern matching:

  def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
    avroSchema.getType match {
      case INT => avroSchema.getLogicalType match {
        case _: Date => SchemaType(DateType, nullable = false)
        case _ => SchemaType(IntegerType, nullable = false)
      }
      case STRING => SchemaType(StringType, nullable = false)
      case BOOLEAN => SchemaType(BooleanType, nullable = false)
// ...
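
We can observe this translation from a test as well. A minimal sketch, assuming SchemaConverters and its toSqlType entry point are reachable from the test code and using an illustrative record schema:

  it should "translate an Avro record schema into a Spark SQL type" in {
    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

    // illustrative record schema with 2 non-nullable fields
    val avroSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "order", "fields": [
        |{"name": "id", "type": "int"}, {"name": "amount", "type": "double"}]}""".stripMargin)

    val schemaType = SchemaConverters.toSqlType(avroSchema)

    schemaType.nullable shouldBe false
    val structType = schemaType.dataType.asInstanceOf[StructType]
    structType.fieldNames shouldEqual Array("id", "amount")
    structType("id").dataType shouldEqual IntegerType
    structType("amount").dataType shouldEqual DoubleType
  }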

Integrating the Apache Avro data source into the main Apache Spark project was an important step for all users. Avro is now considered a first-class citizen of the Apache Spark ecosystem. The integration task was also the moment of some important refactorings, like the one related to serialization/deserialization, which is now made without the intermediary Row representation.