Apache Spark SQL and unit tests

Versions: Apache Spark 2.4.0

Some time ago I was involved in a discussion about testing Apache Spark SQL code. In this post, I would like to share my observations about this topic.

I will develop these observations in 5 sections. Let's begin with the first of them, which covers a golden rule.

Think isolation

To write testable code, think isolation. If you put your whole Apache Spark application in a single method, it will be hard to test. Not only will you need to create a complete dataset covering all the possible use cases, but you will also need to use the same input source as at runtime. And sometimes setting up all these elements for small unit tests will be complicated.

For instance, the following snippet will be hard to test:

import org.apache.spark.sql.SparkSession

object TestClass {

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().appName("Spark SQL JDBC options")
      .master("local[*]").getOrCreate()
    // needed for the implicit Encoder[CustomUser] used by the map below
    import sparkSession.implicits._

    val visitors = sparkSession.read.format("jdbc").option("url", "jdbc://")
      .option("dbtable", "visitors")
      .option("user", "root")
      .option("password", "root")
      .load()
    
    visitors.filter("is_confirmed = TRUE")
      .filter("inscription_age > 10")
      .map(row => {
        val rowLogin = row.getAs[String]("login")
        val login = if (rowLogin.startsWith(".")) {
          rowLogin.substring(1)
        } else {
          rowLogin
        }
        CustomUser(login, row.getAs[Int]("inscription_age"))
      }).write.json("/tmp/my_files")
  }
 
}


case class CustomUser(login: String, inscriptionAge: Int)

As you can see, to test the logic you must set up a database reachable through JDBC, fill the visitors table with rows covering all the tested cases, run the whole main method and finally read back and assert on the JSON files written to /tmp/my_files.

That's a lot of steps and, IMO, they will discourage you from writing the tests. On the other hand, if you isolate the steps of your pipeline, you will be able to test them much more easily - even though not all of them must be tested (see the last section):

import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}

object Filters {

  def isConfirmed(dataset: DataFrame): DataFrame = {
    dataset.filter("is_confirmed = TRUE")
  }

  def isMajorSubscriber(dataset: DataFrame): DataFrame = {
    dataset.filter("inscription_age > 10")
  }

}

object Mappers {

  def mapRawUserToCustomUser(row: Row): CustomUser = {
    val rowLogin = row.getAs[String]("login")
    val login = if (rowLogin.startsWith(".")) {
      rowLogin.substring(1)
    } else {
      rowLogin
    }
    CustomUser(login, row.getAs[Int]("inscription_age"))
  }

}

object AppLogic {

  def createCustomUsers(inputDataset: DataFrame): Dataset[CustomUser] = {
    // the encoder is passed explicitly because no SparkSession implicits are in scope here
    Filters.isConfirmed(Filters.isMajorSubscriber(inputDataset))
      .map(row => Mappers.mapRawUserToCustomUser(row))(Encoders.product[CustomUser])
  }

}


object TestClassRefactored {

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().appName("Spark SQL JDBC options")
      .master("local[*]").getOrCreate()

    val visitors = sparkSession.read.format("jdbc").option("url", "jdbc://")
      .option("dbtable", "visitors")
      .option("user", "root")
      .option("password", "root")
      .load()

    val customUsers = AppLogic.createCustomUsers(visitors)
    
    customUsers.write.json("/tmp/my_files")
  }

}

As you can see, with that separation you can test every component of the pipeline, from the simple filters and maps up to the combined logic. And you do not need to set up a database since the input dataset can be constructed in any way you want.
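To illustrate, a test for the isolated parts could look like the following minimal sketch. It assumes ScalaTest as a test dependency and reuses the Filters, Mappers and CustomUser definitions from above:

import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

class IsolatedLogicTest extends FlatSpec with Matchers {

  private val sparkSession = SparkSession.builder()
    .appName("Isolated logic tests").master("local[*]").getOrCreate()

  import sparkSession.implicits._

  "isConfirmed" should "keep only the confirmed visitors" in {
    // the input dataset is built in memory - no database involved
    val visitors = Seq(("user_a", true, 20), ("user_b", false, 20))
      .toDF("login", "is_confirmed", "inscription_age")

    val confirmedVisitors = Filters.isConfirmed(visitors).collect()

    confirmedVisitors should have size 1
    confirmedVisitors(0).getAs[String]("login") shouldEqual "user_a"
  }

  "mapRawUserToCustomUser" should "remove the leading dot from the login" in {
    val row = Seq((".user_a", 20)).toDF("login", "inscription_age").head()

    val mappedUser = Mappers.mapRawUserToCustomUser(row)

    mappedUser shouldEqual CustomUser("user_a", 20)
  }

}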

Write the code

Very often the volume of the test code will be twice (or more) the volume of your runtime code. The tests won't contain only the testing logic but also some extra components, like helper methods to create a dataset or to write assertions less painfully. And you should not be scared of that. The unit tests layer is another layer of your application, so do not hesitate to write the code that will make your life easier. For instance, if you need to create a test dataset in multiple places, you can either create a simple DSL or use helper methods to do it smoothly. If you are writing repetitive assertions, you can use fluent interfaces to define them in a more convenient way.
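For instance, a dataset builder and a small fluent assertion could look like the sketch below. The names and the exact shape are hypothetical and only illustrate the idea:

import org.apache.spark.sql.{DataFrame, SparkSession}

object TestDatasetBuilder {

  // builds the visitors DataFrame used across the tests from the essential fields only
  def visitors(sparkSession: SparkSession, rows: (String, Boolean, Int)*): DataFrame = {
    import sparkSession.implicits._
    rows.toDF("login", "is_confirmed", "inscription_age")
  }

}

// a small fluent wrapper making repetitive assertions on the mapped users more readable
case class CustomUsersAssertion(users: Seq[CustomUser]) {

  def hasSize(expectedSize: Int): CustomUsersAssertion = {
    assert(users.size == expectedSize)
    this
  }

  def containsLogin(expectedLogin: String): CustomUsersAssertion = {
    assert(users.exists(user => user.login == expectedLogin))
    this
  }

}

A test can then chain the checks, for example CustomUsersAssertion(AppLogic.createCustomUsers(visitors).collect()).hasSize(1).containsLogin("user_a").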

Also, like for your application logic, when you write your testing logic, focus only on the essential parts. Do not waste your time writing code that is ignored by the application logic. For instance, if you convert a Dataset into another object and one of its fields is simply passed to the new object without any transformation, maybe it isn't worth exposing this field as a parameter when you create your testing dataset. Maybe it's better to hardcode it and change it only when your application logic does something more with it?

If you use a helper method to create the rows of your tested Dataset, you can also add default values for the parameters. It will avoid a lot of boilerplate code while keeping the helper open to changes if needed. You can find an example in the following snippet with the country field:

object TestHelpers {

  def createUser(id: Int, login: String, country: String = "Unknown"): CustomUser = CustomUser(id, login, country)

}


case class CustomUser(id: Int, login: String, country: String)

With a method like this you can, if you really want, override the country field. But as said before, in most of the cases you won't do that and will rather focus on the properties impacting your processing logic, like id or login.
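For instance, a test could use the helper like this:

// most tests only rely on the default country...
val defaultCountryUser = TestHelpers.createUser(1, "user_a")
// ...but it can still be overridden when a test really cares about it
val frenchUser = TestHelpers.createUser(2, "user_b", country = "France")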

Share the code

The code you're writing may be useful for your colleagues. For instance, if you created a generic DSL to build test datasets, it may be a good candidate to expose as a dependency. It can be a way to accelerate the development and to avoid solving problems already solved by others. The dependency can then be declared with the "test" scope and therefore does not impact the runtime code.
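For example, assuming the shared helpers are published as a spark-test-dsl artifact (the organization, name and version below are placeholders) and the project is built with sbt, the consumer could declare it like this:

// build.sbt
libraryDependencies += "com.mycompany" %% "spark-test-dsl" % "1.0.0" % Test

With the Test scope the artifact is only present on the test classpath and is never shipped with the runtime application.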

Code smells

Be attentive when you write your testing logic. Every time you need to do something unusual, like mocking a Scala object, mocking Apache Spark's reader or writer, or writing a 100-line test method, you should wake up and ask yourself whether it's normal.

Maybe instead of mocking an object hardcoded inside a method, you should prefer to pass the object as a parameter to gain more flexibility in your tests? Maybe instead of mocking a Spark class, you should prefer to pass whatever you need as a parameter or inject it during the instance's construction? Rather than duplicating your testing logic and having 100-line test methods, maybe it's better to create some helper objects and simplify the maintenance?
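To illustrate the injection idea, the loading function of the pipeline could be passed at construction time instead of being hardcoded. The class and names below are hypothetical:

import org.apache.spark.sql.DataFrame

// the loading logic is injected, so the tests can pass any in-memory DataFrame
// and never have to mock Spark's reader
class CustomUsersPipeline(loadVisitors: () => DataFrame) {

  def execute(outputPath: String): Unit = {
    AppLogic.createCustomUsers(loadVisitors()).write.json(outputPath)
  }

}

At runtime the real JDBC reader is injected, while a unit test simply passes a function returning a locally constructed DataFrame.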

These are only a few of the code smells you should be careful about. As a rule of thumb, you should always stop when you feel you're doing something strange and check whether what you're doing is normal or not.

Apache Spark code was already tested

Remember that the idea of unit testing is not to test everything. A good example of code that doesn't need to (can't?) be tested is the logic of the framework itself. For instance, if you create an Apache Kafka topic reader, you don't need to test whether you're really reading the data from this topic. It was already tested by the Apache Spark committers.

If you really want to test the parts provided by the framework, you can add a layer of integration tests. It will let you validate not only the input and output parts but also the global behavior of the application.

Writing code is an art and unit tests are an intrinsic part of good software. Not only do they help to document the behavior of the application's logic, but they also help to detect regressions earlier. Unfortunately, there is no one-size-fits-all solution because every data pipeline is different. Fortunately, with some best practice rules you should be able to define your testing logic more easily and, with practice, write it faster.

