Defining schemas in Apache Spark SQL with builder design pattern

on waitingforcode.com

Schemas are one of the key parts of Apache Spark SQL and one of its main distinctions from the old RDD-based API. When we deal with data coming from a structured data source, such as a relational database or a schema-based file format, we can let the framework resolve the schema for us. But things get more complicated when we work with semi-structured data such as JSON and must define the schema by hand.

This post is divided into 2 parts presenting 2 approaches to dealing with schemas. The first section presents an imaginary schema defined with the classical solution. The second one shows an alternative proposal using the builder design pattern.

Defining complex schema by hand

Working with schemas is not hard, even when we need to define them by hand. However, schemas differ. In some cases, the manual definition is easy and we won't even notice the time spent writing it. In other cases, it can be harder, especially when the schema has a lot of nested levels with dozens of fields at each of them. A complicated schema may look like this:

{
  "title": "Scala for the Impatient",
  "authors": [
    {"first_name": "Cay S.", "last_name": "Horstmann"}
  ],
  "comments": [
    {"content": "OK", "note": 5, "author": {"login": "XYZ"}}
  ],
  "metadata": {
    "tags": ["tag1", "tag2", "tag3"],
    "categories": {
      "name": "category1",
      "books_number": 30
    }
  }
}

I omitted "the dozens" of fields to improve readability. But you can imagine that our book record has many more fields and a more complicated structure. Obviously, it would be easier to flatten the structure, but that's not always possible - especially if our system must deal with data generated by legacy but still very popular producers.

The Apache Spark schema for the above record could be written like this:

  import org.apache.spark.sql.types._

  behavior of "book schema"

  it should "be defined manually" in {
    val bookSchema = StructType(
      Array(
        StructField("title", StringType, false),
        StructField("authors",
          ArrayType(StructType(Array(StructField("first_name", StringType, false),
            StructField("last_name", StringType, false))), false), false
        ),
        StructField("metadata", StructType(Array(
          StructField("tags", ArrayType(StringType, false), true),
          StructField("categories", StructType(Array(
            StructField("name", StringType, false), StructField("books_number", IntegerType, false)
          )), true)
        )), true)
      )
    )

    bookSchema.toDDL shouldEqual "`title` STRING,`authors` ARRAY<STRUCT<`first_name`: STRING, `last_name`: STRING>>," +
      "`metadata` STRUCT<`tags`: ARRAY<STRING>, `categories`: STRUCT<`name`: STRING, `books_number`: INT>>"
  }

The schema could also be resolved automatically by the framework, but let's recall that we suppose here that the data is not homogeneous, and by resolving it automatically we could lose some information.
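To make that trade-off concrete, here is a minimal sketch of applying a hand-written schema when reading JSON. The `books.json` path, the session settings and the truncated schema are illustrative assumptions, not part of the original example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object ManualSchemaRead {
  // Truncated version of the book schema, for illustration only
  val bookSchema: StructType = StructType(Array(
    StructField("title", StringType, false),
    StructField("metadata", StructType(Array(
      StructField("tags", ArrayType(StringType, false), true)
    )), true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("manual-schema").getOrCreate()
    // Passing the schema explicitly avoids the inference pass over the data
    // and keeps the declared types even for non-homogeneous records
    val books = spark.read.schema(bookSchema).json("books.json")
    books.printSchema()
    spark.stop()
  }
}
```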

Defining complex schema with builder

The definition with classical StructField manual declarations is fine. The only thing I don't like very much is the verbosity of the parameters in the case of nesting. With poor indentation, a few commits later the schema may become unreadable. I don't have a magical solution for that, but simply an alternative that you may adapt to your needs. This alternative uses the builder design pattern and hence fluent interfaces. Thanks to that, we can easily chain the declarations and, with a more meaningful syntax, make them more readable than the original static definition. Once again, the original definition is fine and fits most cases, but it's good to have alternatives.

Aside from the builder design pattern, the alternative discussed here uses currying in Scala. As you can see in the snippet from the previous section, the declaration of the fields is very similar. Unless you use complex data structures such as structs or arrays, it always consists of a name, a type and a nullable flag. If you want, you can also define the metadata, but for the sake of simplicity I omit it here.
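Before moving to the builder itself, here is a quick Spark-free sketch of the currying idea: fix the type first, and supply the name and the nullable flag later. `SField` is a hypothetical stand-in for Spark's StructField, used only to keep the example self-contained:

```scala
// Hypothetical stand-in for org.apache.spark.sql.types.StructField
case class SField(name: String, dataType: String, nullable: Boolean)

object CurriedFields {
  // Curried creator: the type is fixed first, name and nullability come later
  private def baseField(dataType: String)(name: String, nullable: Boolean): SField =
    SField(name, dataType, nullable)

  // Partially applied helpers, one per type
  val string: (String, Boolean) => SField = baseField("STRING")
  val integer: (String, Boolean) => SField = baseField("INT")
}
```

With such helpers, `CurriedFields.string("title", false)` yields `SField("title", "STRING", false)` - exactly the `(String, Boolean) => StructField` shape of the creators the builder expects in its maps.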

The schema builder using both concepts looks like this:

import org.apache.spark.sql.types._

class StructBuilder() {

  private var structFields = Array.empty[StructField]

  // Curried: fix the nullable flag here, supply the field creators later
  def withRequiredFields = withFields(false) _

  def withOptionalFields = withFields(true) _

  private def withFields(nullable: Boolean)(creators: Map[String, (String, Boolean) => StructField]) = {
    val mappedFields = creators.map {
      case (fieldName, fieldGenerator) => fieldGenerator(fieldName, nullable)
    }
    structFields = mappedFields.toArray ++ structFields
    this
  }

  def build: Array[StructField] = structFields

  def buildSchema: StructType = StructType(structFields)

  // Wraps the accumulated fields into a struct field creator, for nesting
  def toField = fields.struct(structFields)
}

object fields {
  private def baseField(fieldType: DataType)(fieldName: String, nullable: Boolean) =
    new StructField(fieldName, fieldType, nullable)
  def string = baseField(StringType) _
  def array(fieldType: DataType, nullableContent: Boolean) = baseField(ArrayType(fieldType, nullableContent)) _
  def struct(fields: Array[StructField]) = baseField(StructType(fields)) _
  def long = baseField(LongType) _
  def integer = baseField(IntegerType) _
  def double = baseField(DoubleType) _

  def newStruct = new StructBuilder()

}

As promised, we use the fluent interface and curried functions to reduce the redundancy of the declarations. More concretely, we can now create our schema with the following:

import com.waitingforcode.sql.fields.{array, integer, newStruct, string}

  it should "be defined with an alternative builder" in {
    val schema = newStruct
      .withRequiredFields(Map(
        "title" -> string, "authors" -> array(
          newStruct.withOptionalFields(Map("first_name" -> string, "last_name" -> string)).buildSchema, false
        )
      ))
      .withOptionalFields(Map(
        "metadata" -> newStruct.withRequiredFields(Map(
          "tags" -> array(StringType, false),
          "categories" -> newStruct.withOptionalFields(Map("name" -> string, "books_number" -> integer)).toField
        )).toField
      )).buildSchema

    schema.toDDL shouldEqual "`metadata` STRUCT<`tags`: ARRAY<STRING>, `categories`: STRUCT<`name`: STRING, " +
      "`books_number`: INT>>,`title` STRING,`authors` ARRAY<STRUCT<`first_name`: STRING, `last_name`: STRING>>"
  }

As you can see, the declaration is different. Not better, not worse, but different. Here we can notice that the main accent is put on the field names, while the parts about the types (except arrays) and their nullability are hidden in the builder and helper methods.

Even though you will probably never need to write a builder to create schemas in Apache Spark SQL, knowing this design pattern with a concrete use case in mind can be useful in daily programming life. As you can see in this short post, the builder lets us hide some redundant declarations and put the reading accent on something else - in our case, the composition of the structure. Thanks to the builder's context, we can easily figure out which fields are required and which are not, and of what types they are.
