Apache Avro and Apache Spark compatibility

on waitingforcode.com

Apache Avro and Apache Spark compatibility

I'm very happy when the readers comment on my posts or tweets. A lot of such discussions are the topics of posts. It's the case of this one where I try to figure out whether Apache Spark SQL Avro source is compatible with other applications using this serialization format.

To analyze this question, I will use the following Apache Avro schema:

{
  "namespace": "com.waitingforcode.sql.avro",
  "type": "record", "name": "Order",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "creationDate", "type": {"type": "string", "logicalType": "timestamp-millis"}},
    {"name": "promotionalCode", "type": ["null", "int"], "aliases" : ["promoCode"]},
    {"name": "products", "type": {"type": "array", "items": {
      "name": "Product", "type": "record", "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "unitaryPrice", "type": "double"},
        {"name": "productsInBasket", "type": "int"}
      ]
    }}},
    {"name": "extraInfo", "type": {"type": "map", "values": ["null", "string"]}},
    {"name": "customer", "type": {
      "type": "record", "name": "Customer", "fields": [
        {"name": "customerType", "type": {"type": "enum", "name": "CustomerTypes", "symbols": ["BUSINESS", "PRIVATE"]}},
        {"name": "address", "type": {
          "name": "Address", "type": "record", "fields": [
            {"name": "postalCode", "type": "int"}, {"name": "city", "type": "string"}
          ]
        }}
      ]
    }}
  ]
}

As you can see, the schema represents an order entity. During its conception, I tried to cover the most of features, like nullable fields, aliases, logical and complex types. They're all mapped to simple Java POJO classes whose definition you can find in this Github package. The goal is to simplify test assertions and also check whether records defined from Scala classes can be processed by Java reader.

In the next 4 sections I will provide several examples to prove Avro - Spark compatibility or incompatibility.

Avro to Spark, Spark to Avro

In the first test I verified whether files can be easily written by one producer and read by a different consumer. The part from Avro to Spark was easy and I implemented it without problems:

  private val avroSchema = new Schema.Parser().parse(new File(schemaPath))

  it should "read files produced by Avro Java writer" in {
    val order1 = Order.valueOf(100L, "2019-05-10T05:55:00.093Z", null, util.Arrays.asList(
      Product.valueOf(1L, "milk", 0.99d, 1)
    ), new util.HashMap[String, String](), new Customer(CustomerTypes.BUSINESS, new Customer.Address(33000, "Bordeaux")))
    val extraInfo2 = new util.HashMap[String, String]()
    extraInfo2.put("first_order", "true")
    extraInfo2.put("bonus_points", "30")
    val order2 = Order.valueOf(101L, "2019-05-10T15:55:00.093Z", 123, util.Arrays.asList(
      Product.valueOf(1L, "milk", 0.99d, 2), Product.valueOf(2L, "coffee", 3.99d, 4),
      Product.valueOf(3L, "chocolate", 1.99d, 2)
    ), extraInfo2, new Customer(CustomerTypes.PRIVATE, new Customer.Address(75001, "Paris")))
    val datumWriter = new ReflectDatumWriter[Order](avroSchema)
    val dataFileWriter = new DataFileWriter[Order](datumWriter)
    val inputFilePath = s"${baseDir}/test_avro_writer_spark_reader.avro"
    new File(inputFilePath).delete()
    dataFileWriter.create(avroSchema, new File(inputFilePath))
    Seq(order1, order2).foreach(order => dataFileWriter.append(order))
    dataFileWriter.close()

    val orders = TestedSparkSession.read.format("avro").load(inputFilePath)
      .map(row => row.mkString("; "))
      .collect()

    orders should have size 2
    orders should contain allOf("100; 2019-05-10T05:55:00.093Z; null; WrappedArray([1,milk,0.99,1]); " +
      "Map(); [BUSINESS,[33000,Bordeaux]]",
    "101; 2019-05-10T15:55:00.093Z; 123; WrappedArray([1,milk,0.99,2], [2,coffee,3.99,4], [3,chocolate,1.99,2]); " +
      "Map(first_order -> true, bonus_points -> 30); [PRIVATE,[75001,Paris]]")
  }

Nothing special to notice here. Avro files are created the usual way and Apache Spark reader has no problems to deal with them. But the opposite is quite different. Let's start with a case that doesn't work and analyze it throughout next sections:

  "first Spark to Avro test" should "fail when reading created Avro files" in {
    val ordersDataset = Seq(
      ScalaOrder(102L, "2019-05-10T13:55:00.093Z", None, Seq(
        ScalaProduct(1L, "milk", 0.99d, 2), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(39000, "xx"))),
      ScalaOrder(101L, "2019-05-10T15:55:00.093Z" , Some(123), Seq(
        ScalaProduct(1L, "milk", 0.99d, 2), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(39000, "xx")))
    ).toDF
    val outputDir = s"${baseDir}/spark_producer_failure_1"

    ordersDataset.repartition(1).write.format("avro").mode(SaveMode.Overwrite)
      .save(outputDir)

    intercept[AvroTypeException] {
      new File(outputDir).listFiles.toSeq.filter(file => file.getName.endsWith(".avro"))
        .foreach(avroFile => {
          val genericReader = new GenericDatumReader[GenericData.Record](avroSchema)
          val dataFileReader = DataFileReader.openReader(avroFile, genericReader)
          // Consume just one record to see if it works
          dataFileReader.next()
        })
    }
  }

case class ScalaOrder(id: Long, creationDate: String, promotionalCode: Option[Int], products: Seq[ScalaProduct],
                      extraInfo: Map[String, String], customer: ScalaCustomer)
case class ScalaProduct(id: Long, name: String, unitaryPrice: Double, productsInBasket: Int)
case class ScalaCustomer(customerType: String, address: ScalaCustomerAddress)
case class ScalaCustomerAddress(postalCode: Int, city: String)

Problem 1: enums

The last snippet doesn't work because of this exception:

Found string, expecting com.waitingforcode.sql.avro.CustomerTypes
org.apache.avro.AvroTypeException: Found string, expecting com.waitingforcode.sql.avro.CustomerTypes
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:254)
    at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:238)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:176)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

If you analyze the stack trace, you will see that Avro reader considers one of the fields as an enum, exactly as we specified it in the schema, whereas Spark generates it as a string. And there is nothing strange since Apache Spark knows nothing about enums. It simply converts its internal types to corresponding Apache Avro ones:

// AvroSerializer
  private def newConverter(catalystType: DataType, avroType: Schema): Converter = {
    (catalystType, avroType.getType) match {
      case (NullType, NULL) =>
        (getter, ordinal) => null
      case (BooleanType, BOOLEAN) =>
        (getter, ordinal) => getter.getBoolean(ordinal)
      case (ByteType, INT) =>
        (getter, ordinal) => getter.getByte(ordinal).toInt
// ...

And Avro schema is inferred directly from Dataframe schema by Apache Spark in AvroFileFormat's prepareWrite method:

  override def prepareWrite(
      spark: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())

    val outputAvroSchema: Schema = parsedOptions.schema
      .map(new Schema.Parser().parse)
      .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
        parsedOptions.recordName, parsedOptions.recordNamespace))

If you take a look at SchemaConverters#toAvroType( catalystType: DataType, nullable: Boolean = false, recordName: String = "topLevelRecord", nameSpace: String = "") method, you will see that Apache Spark never converts one of its columns into Avro's enum type (and that's logical, Spark doesn't have an enum type!).

After seeing all this code, you can figure out the solution. Yes, to make it work, we must define an explicit Avro schema to avoid Apache Spark conversion to string type:

  "the second Spark to Avro test" should "fail even writing files with schema" in {
    val ordersDataset = Seq(
      ScalaOrder(102L, "2019-05-10T13:55:00.093Z", None, Seq(
        ScalaProduct(1L, "milk", 0.99d, 2), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(39000, "xx"))),
      ScalaOrder(101L, "2019-05-10T15:55:00.093Z" , Some(123), Seq(
        ScalaProduct(1L, "milk", 0.99d, 2), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(39000, "xx")))
    ).toDF
    val outputDir = s"${baseDir}/spark_producer_failure_2"

    intercept[SparkException] {
      ordersDataset.repartition(1).write.format("avro").option("avroSchema", schemaJson)
        .mode(SaveMode.Overwrite)
        .save(outputDir)
    }
  }

But as you can see in the test, it's still not working. Why?

Problem 2: nullability

This time the exact exception was:

Caused by: org.apache.avro.AvroRuntimeException: Not a union: {"type":"string","logicalType":"timestamp-millis"}
    at org.apache.avro.Schema.getTypes(Schema.java:299)
    at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
    at org.apache.spark.sql.avro.AvroSerializer.(AvroSerializer.scala:51)
    at org.apache.spark.sql.avro.AvroOutputWriter.serializer$lzycompute(AvroOutputWriter.scala:42)

The error says that we're trying to write a nullable field into a not nullable field definition. As you can see in the message, it happens for creationDate attribute. It looks like both, Spark and Avro, schemas are incompatible now, right? Let's take a look at the schema produced by Apache Spark:

# ordersDataset.printSchema()
root
 |-- id: long (nullable = false)
 |-- creationDate: string (nullable = true)
 |-- promotionalCode: integer (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = false)
 |    |    |-- name: string (nullable = true)
 |    |    |-- unitaryPrice: double (nullable = false)
 |    |    |-- productsInBasket: integer (nullable = false)
 |-- extraInfo: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- customer: struct (nullable = true)
 |    |-- customerType: string (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- postalCode: integer (nullable = false)
 |    |    |-- city: string (nullable = true)

Indeed, the schema resolved by Apache Spark contains plenty of nullable attributes. You can check nullability rules for in-memory datasets in org.apache.spark.sql.catalyst.ScalaReflection#schemaFor(tpe: `Type`) but generally, all non primitive types will be marked as nullable. So, to fix the issue, we enforce the schema:

  "the third Spark to Avro test" should "work with explicit schema" in {
    val ordersDataset = Seq(
      ScalaOrder(102L, "2019-05-10T13:55:00.093Z", None, Seq(
        ScalaProduct(1L, "milk", 0.99d, 2), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(11000, "xx"))),
      ScalaOrder(101L, "2019-05-10T15:55:00.093Z" , Some(123), Seq(
        ScalaProduct(1L, "milk", 0.99d, 1), ScalaProduct(2L, "coffee", 3.99d, 4)
      ), Map.empty, ScalaCustomer("PRIVATE", ScalaCustomerAddress(39000, "xx")))
    ).toDF
    val outputDir = s"${baseDir}/spark_producer_success_with_schema"
    val ordersWithExplicitSchema = ordersDataset.sparkSession.createDataFrame(ordersDataset.rdd, orderSchema)

    ordersWithExplicitSchema.repartition(1).write.format("avro")
      .option("avroSchema", schemaJson).mode(SaveMode.Overwrite)
      .save(outputDir)

    val records = new File(outputDir).listFiles.toSeq.filter(file => file.getName.endsWith(".avro"))
      .flatMap(avroFile => {
        val genericReader = new GenericDatumReader[GenericData.Record](avroSchema)
        val dataFileReader = DataFileReader.openReader(avroFile, genericReader)
        import scala.collection.JavaConverters._
        dataFileReader.iterator().asScala
      })

    records should have size 2
    records.map(genericRecord => genericRecord.get("id")) should contain allOf(101, 102)
    records.map(genericRecord => genericRecord.get("creationDate").toString) should contain allOf(
      "2019-05-10T13:55:00.093Z", "2019-05-10T15:55:00.093Z"
    )
    records.map(genericRecord => genericRecord.get("promotionalCode")) should contain allOf(null, 123)
    records.map(genericRecord => genericRecord.get("products").toString) should contain allOf(
      "[{\"id\": 1, \"name\": \"milk\", \"unitaryPrice\": 0.99, \"productsInBasket\": 2}, {\"id\": 2, \"name\": \"coffee\", \"unitaryPrice\": 3.99, \"productsInBasket\": 4}]",
      "[{\"id\": 1, \"name\": \"milk\", \"unitaryPrice\": 0.99, \"productsInBasket\": 1}, {\"id\": 2, \"name\": \"coffee\", \"unitaryPrice\": 3.99, \"productsInBasket\": 4}]"
    )
    records.map(genericRecord => genericRecord.get("extraInfo").toString) should contain only("{}")
    records.map(genericRecord => genericRecord.get("customer").toString) should contain allOf(
      "{\"customerType\": \"PRIVATE\", \"address\": {\"postalCode\": 11000, \"city\": \"xx\"}}",
      "{\"customerType\": \"PRIVATE\", \"address\": {\"postalCode\": 39000, \"city\": \"xx\"}}"
    )
  }

As you can see through the assertions, it worked. But you must know that it won't cover all use cases. Let's cover it in the next section.

Convert to Avro

The examples you saw before are working on one axis Avro - Spark. But what happens if we try to convert a dataset from one format to Avro? I did a try with JSON → Avro conversion:

  "Avro conversion" should "fail for JSON file" in {
    val extraOrders =
      """
        |{"id": 200, "creationDate": "2019-04-10T19:55:00.093Z", "promotionalCode": null, "products": [{"id": 5, "name": "water", "unitaryPrice": 0.49, "productsInBasket": 3}, {"id": 6, "name": "sugar", "unitaryPrice": 0.99, "productsInBasket": 1}], "extraInfo": {"add_notice": "true"}, "customer": {"customerType": "PRIVATE", "address": {"postalCode": 57000, "city": "Metz"}}}
        |{"id": 201, "creationDate": "2019-04-10T11:55:00.093Z", "promotionalCode": null, "products": [{"id": 5, "name": "water", "unitaryPrice": 0.49, "productsInBasket": 3}, {"id": 6, "name": "sugar", "unitaryPrice": 0.99, "productsInBasket": 1}], "extraInfo": {"add_notice": "true"}, "customer": {"customerType": "PRIVATE", "address": {"postalCode": 57000, "city": "Metz"}}}
      """.stripMargin
    val ordersToAddFile = s"${baseDir}/extra_avro_data.json"
    FileUtils.writeStringToFile(new File(ordersToAddFile), extraOrders, false)
    val ordersFromJson = TestedSparkSession.read.schema(orderSchema)
      .option("mode", "permissive")
      .json(ordersToAddFile)

    ordersFromJson.printSchema()
    ordersFromJson.show()

    intercept[SparkException] {
      ordersFromJson.repartition(1).write.option("avroSchema", schemaJson)
        .format("avro").mode(SaveMode.Overwrite)
        .save(s"${baseDir}/failure_json")
    }
  }

Here too, the conversion fails because of:

Caused by: org.apache.avro.AvroRuntimeException: Not a union: "long"
    at org.apache.avro.Schema.getTypes(Schema.java:299)
    at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)

Strange? Indeed, after all we defined the schema for JSON and for Avro writer and both are consistent. Unfortunately, the nullability of JSON schema is not respected:

# ordersFromJson.printSchema()
root
 |-- id: long (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- promotionalCode: integer (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- unitaryPrice: double (nullable = true)
 |    |    |-- productsInBasket: integer (nullable = true)
 |-- extraInfo: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- customer: struct (nullable = true)
 |    |-- customerType: string (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- postalCode: integer (nullable = true)
 |    |    |-- city: string (nullable = true)

How is it possible? To answer this question, we have to go to org.apache.spark.sql.execution.datasources.DataSource#resolveRelation(checkFilesExist: Boolean = true) method, and more exactly to the line number 377:

        HadoopFsRelation(
          fileCatalog,
          partitionSchema = partitionSchema,
          dataSchema = dataSchema.asNullable,
          bucketSpec = bucketSpec,
          format,
          caseInsensitiveOptions)(sparkSession)

// StructType
  override private[spark] def asNullable: StructType = {
    val newFields = fields.map {
      case StructField(name, dataType, nullable, metadata) =>
        StructField(name, dataType.asNullable, nullable = true, metadata)
    }

    StructType(newFields)
  }

As you can see there, the schema is created with all nullable fields. Hopefully, this rule is defined only for non streaming file-based sources.

A lot of problems described in this post are natural. If you define a schema 'A' for Apache Spark dataset and want to write in a subtle different schema 'B' of Apache Avro, it's good that the framework doesn't let you do so. On the other side, there are still some difficult points, like the last one when the input schema is modified by Apache Spark.

If you liked it, you should read: Apache Spark 2.4.0 features - Avro data source

Share, like or comment this post on Twitter:

Share on: