Regression tests with Apache Spark SQL joins

on waitingforcode.com


Regressions are one of the risks of our profession. Fortunately, we can limit that risk thanks to different testing strategies. One of them is regression testing, which checks whether the modified data processing logic introduced a regression, simply by comparing two datasets.

In this post I will show a simple implementation of regression tests applied to JSON files generated with Apache Spark SQL. The first part presents different testing approaches that we can use in a data context. The second section focuses on one of them, based on SQL JOINs.

Regression tests in data context

The primary goal of regression tests is to ensure that a previously developed feature doesn't break after code changes. Regression tests sit at a higher level than unit tests, which should remain the main testing strategy of every software project, including data projects. Regression tests should rather be considered their complement.

A regression test in a data-centric system can be a simple comparison of 2 datasets. The first dataset is considered the reference dataset because it was generated by the currently running, and therefore previously validated, version of the system. The second one is produced by the release candidate version.
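Independently of the processing technology, this keyed comparison of two datasets can be sketched in plain Scala. The `Order` class, its fields and the method names below are illustrative only:

```scala
// Hypothetical sketch: a regression check is, at its core, a keyed comparison
// of a reference dataset against a candidate dataset.
case class Order(id: Long, amount: Double)

object RegressionCheckSketch {
  // Returns the ids of missing, mismatched and extra records, in that order.
  def compare(reference: Seq[Order], candidate: Seq[Order]): (Set[Long], Set[Long], Set[Long]) = {
    val referenceById = reference.map(order => order.id -> order).toMap
    val candidateById = candidate.map(order => order.id -> order).toMap
    // present in the reference but absent from the candidate
    val missing = referenceById.keySet.diff(candidateById.keySet)
    // present in both, but with diverging values
    val mismatched = referenceById.keySet.intersect(candidateById.keySet)
      .filter(id => referenceById(id) != candidateById(id))
    // produced by the candidate but never present in the reference
    val extra = candidateById.keySet.diff(referenceById.keySet)
    (missing, mismatched, extra)
  }
}
```

The SQL JOINs shown later in this post express exactly these three set operations, only at a scale where the datasets don't fit on a single machine.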

To build your regression testing system you can use different strategies:

  • build a test dataset - it's quite a common approach. It uses a dataset built specifically for test purposes. This static dataset guarantees that the result of the tests will be predictable, and that's something we always want in testing. On the other hand, the real data may evolve independently of our data department. For instance, some external producers can add extra fields without warning us - unfortunately, it happens, and the communication between teams is often a bigger problem than the technical issues.

    Applying the regression tests on an unchanged dataset will make them pass, and only when the new version is deployed to production will you see that these new fields cause problems. Aside from the static character of the dataset, another problem may come from the maintenance. Without the help of other people, creating and maintaining a dataset can be a hard task.
  • compare with real-time data - in this approach the reference dataset comes directly from the data processed by your application. It guarantees better freshness of the data and avoids the risk of input modifications. It's also less painful to maintain.

    But on the other hand, this solution may bring some false negatives, especially if you encounter data issues (I bet you do), like late data. To illustrate that, let's take the case of a batch job triggered on an hourly basis. The job reads the data stored on some distributed file system, processes it and writes it to another place on the same file system. If you have data issues, you cannot guarantee that the input used for the real-time generation will be the same as the input used for the regression tests. Because of that, you can get a few false-negative results that may make the interpretation of the results harder.
  • generate the datasets with 2 versions of the application - here, instead of a static comparison with an already generated dataset, we take some real input data and process it with both the previous and the new version of the application. It simplifies the workflow because you don't need to copy the data, but on the other hand, it requires deploying and processing the data twice, which slows down the overall process and therefore gives you the feedback later. And in testing, the sooner you get the feedback, the more reactive you are. To mitigate that latency issue, you can always process the data simultaneously, but it may complicate your testing logic.

Among the discussed solutions, regression tests applied on real data are good not only because of the increased correctness but also because they can help to compare the execution times of both versions of the code.

But as you can see, there is no one-size-fits-all solution. It's up to you and your team to weigh the advantages and disadvantages, and apply them to your application context. For the illustration purposes of this post, I will opt for the solution using a dataset from production.

JOIN-based regression tests

To represent regression tests in the code I will use SQL JOINs. This operator comes quite naturally to mind if we consider regression tests as a comparison between 2 datasets. In the following paragraphs you can find the different types of JOINs and their purpose in the regression tests context.

The first JOIN type, which by the way covers almost all of my testing needs, is the LEFT JOIN, where the left side is the reference dataset. With the help of a LEFT JOIN you will be able to detect missing records in the tested dataset and also make some fine-grained checks on the data. Depending on your use case, you can later export the invalid records to another place in order to do some ad-hoc querying, or simply count them to get the error ratio:

  val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL JOIN-based regression test")
    .master("local[*]").getOrCreate()
  import sparkSession.implicits._

  private val referenceDataset = Seq(
    RegressionTestOrder(1L, 39.99d, Set(1L, 2L)),
    RegressionTestOrder(2L, 41.25d, Set(1L)),
    RegressionTestOrder(3L, 100d, Set(1L, 2L, 3L)),
    RegressionTestOrder(4L, 120d, Set(1L))
  ).toDF.cache() // cache it in order to avoid the recomputation
  private val generatedDataset = Seq(
    RegressionTestOrder(1L, 39.99d, Set(1L, 2L)),
    RegressionTestOrder(2L, 41.25d, Set.empty),
    RegressionTestOrder(3L, 200d, Set(1L, 2L, 3L)),
    RegressionTestOrder(100L, 200d, Set(1L, 2L, 3L))
  ).toDF.select($"id".as("generated_id"), $"amount".as("generated_amount"), $"itemIds".as("generated_itemIds")).cache()


  "LEFT JOIN with a custom comparator" should "be used to detect missing data" in {
    val allReferenceDataWithOptionalMatches =
      referenceDataset.join(generatedDataset, referenceDataset("id") === generatedDataset("generated_id"), "left")

    // getAs[Long] unboxes a missing value to 0, so check nullability explicitly
    val notGeneratedReferenceData = allReferenceDataWithOptionalMatches
      .filter(row => row.isNullAt(row.fieldIndex("generated_id")))
      .count()
    val commonEntries = allReferenceDataWithOptionalMatches.filter(row => !row.isNullAt(row.fieldIndex("generated_id")))
    commonEntries.cache()
    val invalidAmountGeneratedData = commonEntries
      .filter(row => row.getAs[Double]("generated_amount") != row.getAs[Double]("amount"))
    val invalidItemIdsGeneratedData = commonEntries
      // Spark exposes array-like columns (here the Set[Long]) as a Seq at runtime
      .filter(row => row.getAs[Seq[Long]]("generated_itemIds") != row.getAs[Seq[Long]]("itemIds"))

    // Please notice that I'm using the .count() as an action but you can use any other valid action, like materializing
    // not matching data in order to investigate the inconsistencies later.
    notGeneratedReferenceData shouldEqual 1
    invalidAmountGeneratedData.count() shouldEqual 1
    invalidItemIdsGeneratedData.count() shouldEqual 1
  }

case class RegressionTestOrder(id: Long, amount: Double, itemIds: Set[Long])
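As mentioned above, counting the invalid records gives an error ratio. A minimal sketch of that last step, assuming the counts were already brought back to the driver with `.count()` (the `ErrorRatio` helper is hypothetical):

```scala
// Hypothetical helper: turns the raw counts returned by .count() into an
// error ratio that can be compared against an accepted regression threshold.
object ErrorRatio {
  def of(invalidRecords: Long, totalRecords: Long): Double = {
    require(totalRecords > 0, "the reference dataset cannot be empty")
    invalidRecords.toDouble / totalRecords
  }
}
```

For the reference dataset of 4 orders above, 3 invalid records would give `ErrorRatio.of(3, 4)`, i.e. 0.75.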

The second useful type is the LEFT ANTI JOIN, which can be used to detect the records from the new dataset that are not included in the reference dataset:

  "LEFT ANTI JOIN" should "be used to detect the data missing in the reference dataset" in {
    val extraGeneratedData = generatedDataset
      .join(referenceDataset, referenceDataset("id") === generatedDataset("generated_id"), "leftanti").count()

    extraGeneratedData shouldEqual 1
  }
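The LEFT ANTI JOIN has a direct equivalent on plain Scala collections, which can help to reason about the expected count before running the Spark job. A quick sketch with the ids from the example datasets:

```scala
val referenceIds = Set(1L, 2L, 3L, 4L)   // ids present in the reference dataset
val generatedIds = Set(1L, 2L, 3L, 100L) // ids present in the generated dataset
// LEFT ANTI JOIN semantics: keep the generated ids without a match in the reference
val extraGenerated = generatedIds.diff(referenceIds)
println(extraGenerated) // Set(100)
```

The single remaining id (100) matches the `extraGeneratedData shouldEqual 1` assertion of the test above.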

Of course, you could do both checks with a FULL OUTER JOIN, which is a valid solution too, but it requires a little bit of conditional logic in the code:

  "FULL OUTER JOIN" should "be used to detect all errors with a single join" in {
    val allReferenceDataWithOptionalMatches =
      referenceDataset.join(generatedDataset, referenceDataset("id") === generatedDataset("generated_id"), "full_outer")

    // as previously, check nullability with isNullAt rather than getAs[Long] == null
    val notGeneratedReferenceData = allReferenceDataWithOptionalMatches
      .filter(row => row.isNullAt(row.fieldIndex("generated_id")))
      .count()
    val commonEntries = allReferenceDataWithOptionalMatches.filter(row => !row.isNullAt(row.fieldIndex("id")) &&
      !row.isNullAt(row.fieldIndex("generated_id")))
    commonEntries.cache()
    val invalidAmountGeneratedData = commonEntries
      .filter(row => row.getAs[Double]("generated_amount") != row.getAs[Double]("amount"))
    val invalidItemIdsGeneratedData = commonEntries
      .filter(row => row.getAs[Seq[Long]]("generated_itemIds") != row.getAs[Seq[Long]]("itemIds"))
    val extraGeneratedData = allReferenceDataWithOptionalMatches.filter(row => !row.isNullAt(row.fieldIndex("generated_id")) &&
      row.isNullAt(row.fieldIndex("id"))).count()

    notGeneratedReferenceData shouldEqual 1
    invalidAmountGeneratedData.count() shouldEqual 1
    invalidItemIdsGeneratedData.count() shouldEqual 1
    extraGeneratedData shouldEqual 1
  }

Regression tests, or more globally any specific tests applied on big volumes of data, are not an easy piece of cake. The execution time, the maintenance effort and the complexity may be scary at first glance. However, with some simple practices like the ones discussed in this post, all these negative points can be mitigated. The tested dataset can be dynamic, and the complexity of the code can be reduced to SQL JOINs and some attribute-focused comparison logic.
